// ricochet-go/core/contact.go
package core
import (
"fmt"
protocol "github.com/s-rah/go-ricochet"
"github.com/special/notricochet/core/utils"
"github.com/special/notricochet/rpc"
"golang.org/x/net/context"
"log"
"strconv"
"strings"
"sync"
"time"
)
// XXX There is generally a lot of duplication and boilerplate between
// Contact, ConfigContact, and rpc.Contact. This should be reduced somehow.
// Contact represents a single configured contact and owns its protocol
// connection lifecycle. All mutable fields are guarded by mutex except
// connection, which is written only by the contactConnection goroutine
// (see the XXX note on Connection()).
type Contact struct {
	core *Ricochet
	// id is the immutable integer identifier for this contact.
	id int
	// data mirrors the persisted ConfigContact record; updated under mutex.
	data ConfigContact
	// status is the contact's presence state (e.g. ONLINE/OFFLINE).
	status ricochet.Contact_Status
	mutex sync.Mutex
	// events publishes ContactEvent updates to subscribers.
	events *utils.Publisher
	// connection is the active protocol connection, or nil when offline.
	connection *protocol.OpenConnection
	// connChannel delivers newly authenticated connections (or nil to
	// restart outbound attempts); closing it ends the connection loop.
	connChannel chan *protocol.OpenConnection
	// connClosedChannel signals that the active connection was closed.
	connClosedChannel chan struct{}
	// conversation is created lazily by Conversation().
	conversation *Conversation
}
// ContactFromConfig creates a Contact for an existing ConfigContact record
// and starts its connection-management goroutine.
//
// It returns an error when id is negative or when data.Hostname is not a
// 22-character ".onion" address. Validation now happens before any
// allocation or goroutine spawn, and the hostname error message's closing
// quote (previously "'%s") has been fixed.
func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) {
	if id < 0 {
		return nil, fmt.Errorf("Invalid contact ID '%d'", id)
	}
	if len(data.Hostname) != 22 || !strings.HasSuffix(data.Hostname, ".onion") {
		return nil, fmt.Errorf("Invalid contact hostname '%s'", data.Hostname)
	}
	contact := &Contact{
		core:              core,
		id:                id,
		data:              data,
		events:            events,
		connChannel:       make(chan *protocol.OpenConnection),
		connClosedChannel: make(chan struct{}),
	}
	// XXX Should have some global trigger that starts all contact connections
	// at the right time
	go contact.contactConnection()
	return contact, nil
}
// Id returns the contact's immutable integer identifier. No lock is needed
// because id is never written after construction.
func (c *Contact) Id() int {
	return c.id
}
// Nickname returns the contact's configured nickname.
func (c *Contact) Nickname() string {
	c.mutex.Lock()
	nickname := c.data.Nickname
	c.mutex.Unlock()
	return nickname
}
// Address returns the contact's "ricochet:" address, built from the first
// 16 characters of the onion hostname.
func (c *Contact) Address() string {
	c.mutex.Lock()
	onionID := c.data.Hostname[:16]
	c.mutex.Unlock()
	return "ricochet:" + onionID
}
// Hostname returns the contact's full .onion hostname.
func (c *Contact) Hostname() string {
	c.mutex.Lock()
	hostname := c.data.Hostname
	c.mutex.Unlock()
	return hostname
}
// LastConnected returns the time of the last successful connection, parsed
// from the stored RFC 3339 string. A missing or malformed value yields the
// zero time; the parse error is deliberately ignored.
func (c *Contact) LastConnected() time.Time {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	// Renamed from `time` to avoid shadowing the time package.
	t, _ := time.Parse(time.RFC3339, c.data.LastConnected)
	return t
}
// WhenCreated returns the time the contact was created, parsed from the
// stored RFC 3339 string. A missing or malformed value yields the zero
// time; the parse error is deliberately ignored.
func (c *Contact) WhenCreated() time.Time {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	// Renamed from `time` to avoid shadowing the time package.
	t, _ := time.Parse(time.RFC3339, c.data.WhenCreated)
	return t
}
// Status returns the contact's current presence status.
func (c *Contact) Status() ricochet.Contact_Status {
	c.mutex.Lock()
	status := c.status
	c.mutex.Unlock()
	return status
}
// Data returns a freshly-built RPC representation of this contact,
// snapshotting its current state under the lock.
func (c *Contact) Data() *ricochet.Contact {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	return &ricochet.Contact{
		Id:            int32(c.id),
		Address:       "ricochet:" + c.data.Hostname[:16],
		Nickname:      c.data.Nickname,
		WhenCreated:   c.data.WhenCreated,
		LastConnected: c.data.LastConnected,
		Status:        c.status,
	}
}
// Conversation returns the conversation for this contact, creating it
// lazily on first use.
func (c *Contact) Conversation() *Conversation {
	c.mutex.Lock()
	defer c.mutex.Unlock()
	if c.conversation != nil {
		return c.conversation
	}
	entity := &ricochet.Entity{
		ContactId: int32(c.id),
		Address:   "ricochet:" + c.data.Hostname[:16],
	}
	c.conversation = NewConversation(c.core, c, entity)
	return c.conversation
}
// Connection returns the active protocol connection, or nil when offline.
// XXX Thread safety disaster
func (c *Contact) Connection() *protocol.OpenConnection {
	c.mutex.Lock()
	conn := c.connection
	c.mutex.Unlock()
	return conn
}
// Goroutine to handle the protocol connection for a contact.
// Responsible for making outbound connections and taking over authenticated
// inbound connections, running protocol handlers on the active connection, and
// reacting to connection loss. Nothing else may write Contact.connection.
func (c *Contact) contactConnection() {
// XXX Should the protocol continue handling its own goroutines?
// I'm thinking the protocol API design I want is:
// - "handler" assigned per-connection
// - each inbound listener has its own handler also, assigns for conns
// - so, non-authed connection has a handler that _can't_ do anything other than auth
// - inbound contact req connection gets a special handler for that case
// - authenticated contact conns get handler changed here
// It's probably more sensible to never break the conn read loop, because of buffering etc
// So likely we don't want to change that goroutine. Could still use a channel to pass it
// to handler for parsing, which could let it go on any goroutine we want, if it's desirable
// to put it on e.g. this routine. Hmm.
for {
// If there is no active connection, spawn an outbound connector.
// A successful connection is returned via connChannel; otherwise, it will keep trying.
var outboundCtx context.Context
outboundCancel := func() {}
if c.connection == nil {
outboundCtx, outboundCancel = context.WithCancel(context.Background())
go c.connectOutbound(outboundCtx, c.connChannel)
}
select {
case conn, ok := <-c.connChannel:
outboundCancel()
if !ok {
// Closing connChannel exits this connection routine, for contact
// deletion, exit, or some other case.
break
} else if conn == nil {
// Signal used to restart outbound connection attempts
continue
}
// Received a new connection. The connection is already authenticated and ready to use.
// This connection may be inbound or outbound. setConnection will decide whether to
// replace an existing connection.
// XXX Tweak setConnection logic if needed
// Logic when keeping connection, need to make sure protocol is going, etc...
if err := c.setConnection(conn); err != nil {
log.Printf("Discarded new contact %s connection: %s", c.Address(), err)
if !conn.Closed && conn != c.connection {
conn.Close()
}
continue
}
case <-c.connClosedChannel:
outboundCancel()
c.clearConnection(nil)
}
}
log.Printf("Exiting contact connection loop for %s", c.Address())
c.clearConnection(nil)
}
// Attempt an outbound connection to the contact, retrying automatically using OnionConnector.
// This function _must_ send something to connChannel before returning, unless the context has
// been cancelled.
func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protocol.OpenConnection) {
c.mutex.Lock()
connector := OnionConnector{
Network: c.core.Network,
NeverGiveUp: true,
}
hostname := c.data.Hostname
c.mutex.Unlock()
for {
conn, err := connector.Connect(hostname+":9878", ctx)
if err != nil {
// The only failure here should be context, because NeverGiveUp
// is set, but be robust anyway.
if ctx.Err() != nil {
return
}
log.Printf("Contact connection failure: %s", err)
continue
}
log.Printf("Successful outbound connection to contact %s", hostname)
oc, err := c.core.Protocol.ConnectOpen(conn, hostname[0:16])
if err != nil {
log.Printf("Contact connection protocol failure: %s", err)
oc.Close()
// XXX These failures are probably not worth retrying so much,
// but that would need to be investigated. For now, just do the
// same backoff behavior.
if err := connector.Backoff(ctx); err != nil {
return
}
continue
} else {
log.Printf("Protocol connection open: %v", oc)
// XXX Protocol API needs to be reworked; see notes in
// contactConnection. Ideally we should authenticate here and
// pass the conneciton back, but for now this can do nothing:
// the connection will either succeed and come in via the
// protocol handler, or will be closed and signalled via
// OnConnectionClosed. Alternatively, it will break because this
// is fragile and dumb.
// XXX BUG: This means no backoff for authentication failure
return
}
}
}
// setConnection takes ownership of an authenticated connection (inbound or
// outbound) and installs it as the contact's active connection, replacing
// an existing one only when shouldReplaceConnection allows. On any error
// the caller's conn is closed here (except in the duplicate case) and an
// error describing the rejection is returned. On success the contact is
// marked ONLINE, LastConnected is persisted, and a contact UPDATE event is
// published.
func (c *Contact) setConnection(conn *protocol.OpenConnection) error {
	// conn.Client is true for connections we initiated (outbound).
	if conn.Client {
		log.Printf("Contact %s has a new outbound connection", c.Address())
	} else {
		log.Printf("Contact %s has a new inbound connection", c.Address())
	}
	c.mutex.Lock()
	// Reject assigning the connection we already hold; don't close it,
	// since it is still the active connection.
	if conn == c.connection {
		c.mutex.Unlock()
		return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c)
	}
	// Only authenticated, open connections may be assigned.
	if !conn.IsAuthed || conn.Closed {
		c.mutex.Unlock()
		conn.Close()
		return fmt.Errorf("Connection %v is not in a valid state to assign to contact %v", conn, c)
	}
	// The peer's hostname must match this contact's configured onion ID
	// (first 16 characters of the .onion hostname).
	if c.data.Hostname[0:16] != conn.OtherHostname {
		c.mutex.Unlock()
		conn.Close()
		return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.OtherHostname, c.data.Hostname[0:16])
	}
	// If there's already an active connection, decide which one survives.
	if c.connection != nil {
		if c.shouldReplaceConnection(conn) {
			// XXX Signal state change for connection loss?
			c.connection.Close()
			c.connection = nil
		} else {
			c.mutex.Unlock()
			conn.Close()
			return fmt.Errorf("Using existing connection")
		}
	}
	// If this connection is inbound and there's an outbound attempt, keep this
	// connection and cancel outbound if we haven't sent authentication yet, or
	// if the outbound connection will lose the fallback comparison above.
	// XXX implement this
	c.connection = conn
	c.status = ricochet.Contact_ONLINE
	log.Printf("Assigned connection %v to contact %v", c.connection, c)
	// XXX implicit accept contact requests
	// Update LastConnected time
	config := c.core.Config.OpenWrite()
	c.data.LastConnected = time.Now().Format(time.RFC3339)
	config.Contacts[strconv.Itoa(c.id)] = c.data
	config.Save()
	c.mutex.Unlock()
	// Build and publish the event after unlocking: c.Data() re-acquires
	// the mutex and would deadlock if called while it is held.
	event := ricochet.ContactEvent{
		Type: ricochet.ContactEvent_UPDATE,
		Subject: &ricochet.ContactEvent_Contact{
			Contact: c.Data(),
		},
	}
	c.events.Publish(event)
	return nil
}
// Close and clear state related to the active contact connection.
// If ifConn is non-nil, the active connection is only cleared if
// it is the same as ifConn. Returns true if cleared.
//
// On clearing, the contact is marked OFFLINE, LastConnected is persisted,
// the connection is closed if still open, and a contact UPDATE event is
// published.
func (c *Contact) clearConnection(ifConn *protocol.OpenConnection) bool {
	c.mutex.Lock()
	// Nothing to do if there's no active connection, or if ifConn was
	// given and no longer matches the active connection.
	if c.connection == nil || (ifConn != nil && c.connection != ifConn) {
		c.mutex.Unlock()
		return false
	}
	conn := c.connection
	c.connection = nil
	c.status = ricochet.Contact_OFFLINE
	// XXX eww, and also potentially deadlockable?
	// Record the disconnect time in the persisted config while the
	// mutex is still held.
	config := c.core.Config.OpenWrite()
	c.data.LastConnected = time.Now().Format(time.RFC3339)
	config.Contacts[strconv.Itoa(c.id)] = c.data
	config.Save()
	c.mutex.Unlock()
	// Close outside the lock; conn is non-nil here (checked above), so
	// this guard is defensive.
	if conn != nil && !conn.Closed {
		conn.Close()
	}
	// c.Data() re-acquires the mutex, so the event must be built after
	// unlocking.
	event := ricochet.ContactEvent{
		Type: ricochet.ContactEvent_UPDATE,
		Subject: &ricochet.ContactEvent_Contact{
			Contact: c.Data(),
		},
	}
	c.events.Publish(event)
	return true
}
// Decide whether to replace the existing connection with conn.
// Assumes mutex is held.
//
// Rewritten as guard clauses: the original's dead `else if false` placeholder
// branch and the unreachable trailing `return false` have been removed, with
// the placeholder's intent preserved as an XXX comment. Behavior is unchanged.
func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool {
	if c.connection == nil {
		return true
	}
	if c.connection.Closed {
		log.Printf("Replacing dead connection %v for contact %v", c.connection, c)
		return true
	}
	if c.connection.Client == conn.Client {
		// If the existing connection is in the same direction, always use the new one
		log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c)
		return true
	}
	// XXX If the existing connection is more than 30 seconds old, prefer the
	// new one. Not yet implemented.
	// Fall back to string comparison of hostnames for a stable resolution
	if preferOutbound := conn.MyHostname < conn.OtherHostname; preferOutbound == conn.Client {
		// New connection wins
		log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c)
		return true
	}
	// Old connection wins fallback
	log.Printf("Keeping existing connection %v instead of new connection %v for contact %v according to fallback order", c.connection, conn, c)
	return false
}
// OnConnectionAuthenticated hands a freshly authenticated connection to the
// contactConnection goroutine via connChannel. Blocks until that goroutine
// receives it.
// XXX also will go away during protocol API rework
func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection) {
	c.connChannel <- conn
}
// OnConnectionClosed notifies the contactConnection goroutine when the
// contact's active connection has closed. Closures of any other connection
// are ignored.
// XXX rework connection close to have a proper notification instead of this "find contact" mess.
func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) {
	c.mutex.Lock()
	isActive := c.connection == conn
	c.mutex.Unlock()
	if !isActive {
		return
	}
	c.connClosedChannel <- struct{}{}
}