From f56204116c58458db2fc0f80ee147dba5429bc90 Mon Sep 17 00:00:00 2001 From: John Brooks Date: Thu, 29 Sep 2016 22:13:55 -0700 Subject: [PATCH] core: Make outbound contact connections Refactor the connection management on contacts to have a goroutine responsible for tracking the state of a contact's connection, launching and canceling outbound attempts when appropriate, etc. --- core/contact.go | 203 +++++++++++++++++++++++++++++++++++++++++------ core/network.go | 2 + core/protocol.go | 31 +++++++- 3 files changed, 207 insertions(+), 29 deletions(-) diff --git a/core/contact.go b/core/contact.go index d198460..8827929 100644 --- a/core/contact.go +++ b/core/contact.go @@ -5,6 +5,7 @@ import ( protocol "github.com/s-rah/go-ricochet" "github.com/special/notricochet/core/utils" "github.com/special/notricochet/rpc" + "golang.org/x/net/context" "log" "strconv" "strings" @@ -25,15 +26,19 @@ type Contact struct { mutex sync.Mutex events *utils.Publisher - connection *protocol.OpenConnection + connection *protocol.OpenConnection + connChannel chan *protocol.OpenConnection + connClosedChannel chan struct{} } func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils.Publisher) (*Contact, error) { contact := &Contact{ - core: core, - id: id, - data: data, - events: events, + core: core, + id: id, + data: data, + events: events, + connChannel: make(chan *protocol.OpenConnection), + connClosedChannel: make(chan struct{}), } if id < 0 { @@ -42,6 +47,10 @@ func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils return nil, fmt.Errorf("Invalid contact hostname '%s", data.Hostname) } + // XXX Should have some global trigger that starts all contact connections + // at the right time + go contact.contactConnection() + return contact, nil } @@ -101,7 +110,128 @@ func (c *Contact) Data() *ricochet.Contact { return data } -func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { +// Goroutine to handle the protocol connection for a contact. +// Responsible for making outbound connections and taking over authenticated +// inbound connections, running protocol handlers on the active connection, and +// reacting to connection loss. Nothing else may write Contact.connection. +func (c *Contact) contactConnection() { + // XXX Should the protocol continue handling its own goroutines? + // I'm thinking the protocol API design I want is: + // - "handler" assigned per-connection + // - each inbound listener has its own handler also, assigns for conns + // - so, non-authed connection has a handler that _can't_ do anything other than auth + // - inbound contact req connection gets a special handler for that case + // - authenticated contact conns get handler changed here + // It's probably more sensible to never break the conn read loop, because of buffering etc + // So likely we don't want to change that goroutine. Could still use a channel to pass it + // to handler for parsing, which could let it go on any goroutine we want, if it's desirable + // to put it on e.g. this routine. Hmm. + + for { + // If there is no active connection, spawn an outbound connector. + // A successful connection is returned via connChannel; otherwise, it will keep trying. + var outboundCtx context.Context + outboundCancel := func() {} + if c.connection == nil { + outboundCtx, outboundCancel = context.WithCancel(context.Background()) + go c.connectOutbound(outboundCtx, c.connChannel) + } + + select { + case conn, ok := <-c.connChannel: + outboundCancel() + if !ok { + // Closing connChannel exits this connection routine, for contact + // deletion, exit, or some other case. + break + } else if conn == nil { + // Signal used to restart outbound connection attempts + continue + } + + // Received a new connection. The connection is already authenticated and ready to use. + // This connection may be inbound or outbound. setConnection will decide whether to + // replace an existing connection. + + // XXX Tweak setConnection logic if needed + // Logic when keeping connection, need to make sure protocol is going, etc... + if err := c.setConnection(conn); err != nil { + log.Printf("Discarded new contact %s connection: %s", c.Address(), err) + if !conn.Closed && conn != c.connection { + conn.Close() + } + continue + } + + case <-c.connClosedChannel: + outboundCancel() + c.clearConnection(nil) + } + } + + log.Printf("Exiting contact connection loop for %s", c.Address()) + c.clearConnection(nil) +} + +// Attempt an outbound connection to the contact, retrying automatically using OnionConnector. +// This function _must_ send something to connChannel before returning, unless the context has +// been cancelled. +func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protocol.OpenConnection) { + c.mutex.Lock() + connector := OnionConnector{ + Network: c.core.Network, + NeverGiveUp: true, + } + hostname := c.data.Hostname + c.mutex.Unlock() + + for { + conn, err := connector.Connect(hostname+":9878", ctx) + if err != nil { + // The only failure here should be context, because NeverGiveUp + // is set, but be robust anyway. + if ctx.Err() != nil { + return + } + + log.Printf("Contact connection failure: %s", err) + continue + } + + log.Printf("Successful outbound connection to contact %s", hostname) + oc, err := c.core.Protocol.ConnectOpen(conn, hostname[0:16]) + if err != nil { + log.Printf("Contact connection protocol failure: %s", err) + oc.Close() + // XXX These failures are probably not worth retrying so much, + // but that would need to be investigated. For now, just do the + // same backoff behavior. + if err := connector.Backoff(ctx); err != nil { + return + } + continue + } else { + log.Printf("Protocol connection open: %v", oc) + // XXX Protocol API needs to be reworked; see notes in + // contactConnection. Ideally we should authenticate here and + // pass the conneciton back, but for now this can do nothing: + // the connection will either succeed and come in via the + // protocol handler, or will be closed and signalled via + // OnConnectionClosed. Alternatively, it will break because this + // is fragile and dumb. + // XXX BUG: This means no backoff for authentication failure + return + } + } +} + +func (c *Contact) setConnection(conn *protocol.OpenConnection) error { + if conn.Client { + log.Printf("Contact %s has a new outbound connection", c.Address()) + } else { + log.Printf("Contact %s has a new inbound connection", c.Address()) + } + c.mutex.Lock() if conn == c.connection { @@ -138,7 +268,6 @@ func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { // if the outbound connection will lose the fallback comparison above. // XXX implement this - // XXX react to connection state changes c.connection = conn c.status = ricochet.Contact_ONLINE log.Printf("Assigned connection %v to contact %v", c.connection, c) @@ -164,6 +293,41 @@ func (c *Contact) SetConnection(conn *protocol.OpenConnection) error { return nil } +// Close and clear state related to the active contact connection. +// If ifConn is non-nil, the active connection is only cleared if +// it is the same as ifConn. Returns true if cleared. +func (c *Contact) clearConnection(ifConn *protocol.OpenConnection) bool { + c.mutex.Lock() + if c.connection == nil || (ifConn != nil && c.connection != ifConn) { + c.mutex.Unlock() + return false + } + + conn := c.connection + c.connection = nil + c.status = ricochet.Contact_OFFLINE + + // XXX eww, and also potentially deadlockable? + config := c.core.Config.OpenWrite() + c.data.LastConnected = time.Now().Format(time.RFC3339) + config.Contacts[strconv.Itoa(c.id)] = c.data + config.Save() + c.mutex.Unlock() + + if conn != nil && !conn.Closed { + conn.Close() + } + + event := ricochet.ContactEvent{ + Type: ricochet.ContactEvent_UPDATE, + Subject: &ricochet.ContactEvent_Contact{ + Contact: c.Data(), + }, + } + c.events.Publish(event) + return true +} + // Decide whether to replace the existing connection with conn. // Assumes mutex is held. func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { @@ -192,29 +356,18 @@ func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { return false } +// XXX also will go away during protocol API rework +func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection) { + c.connChannel <- conn +} + +// XXX rework connection close to have a proper notification instead of this "find contact" mess. func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) { c.mutex.Lock() - if c.connection != conn { c.mutex.Unlock() return } - - c.connection = nil - c.status = ricochet.Contact_OFFLINE - - config := c.core.Config.OpenWrite() - c.data.LastConnected = time.Now().Format(time.RFC3339) - config.Contacts[strconv.Itoa(c.id)] = c.data - config.Save() - c.mutex.Unlock() - - event := ricochet.ContactEvent{ - Type: ricochet.ContactEvent_UPDATE, - Subject: &ricochet.ContactEvent_Contact{ - Contact: c.Data(), - }, - } - c.events.Publish(event) + c.connClosedChannel <- struct{}{} } diff --git a/core/network.go b/core/network.go index 94f8192..a85ce2d 100644 --- a/core/network.go +++ b/core/network.go @@ -16,6 +16,8 @@ import ( "time" ) +// XXX Network disconnect should kill open connections ... somehow + type Network struct { // Connection settings; can only change while stopped controlAddress string diff --git a/core/protocol.go b/core/protocol.go index 8d81b2c..58ed1cd 100644 --- a/core/protocol.go +++ b/core/protocol.go @@ -36,8 +36,8 @@ func (p *Protocol) ServeListener(listener net.Listener) { // Strangely, ServeListener starts a background routine that watches a channel // on p.service for new connections and dispatches their events to the handler // for the listener. API needs a little work here. -func (p *Protocol) Connect(address string) (*protocol.OpenConnection, error) { - oc, err := p.service.Connect(address) +func (p *Protocol) ConnectOpen(conn net.Conn, host string) (*protocol.OpenConnection, error) { + oc, err := p.service.ConnectOpen(conn, host) if err != nil { return nil, err } @@ -104,13 +104,36 @@ func (handler *protocolHandler) OnAuthenticationProof(oc *protocol.OpenConnectio log.Printf("protocol: OnAuthenticationProof, result: %v, contact: %v", result, contact) if result && contact != nil { - contact.SetConnection(oc) + contact.OnConnectionAuthenticated(oc) } } func (handler *protocolHandler) OnAuthenticationResult(oc *protocol.OpenConnection, channelID int32, result bool, isKnownContact bool) { - log.Printf("protocol: OnAuthenticationResult, result: %v, known: %v", result, isKnownContact) oc.IsAuthed = result + oc.CloseChannel(channelID) + + if !result { + log.Printf("protocol: Outbound connection authentication to %s failed", oc.OtherHostname) + oc.Close() + return + } + + // XXX Contact request, removed cases + if !isKnownContact { + log.Printf("protocol: Outbound connection authentication to %s succeeded, but we are not a known contact", oc.OtherHostname) + oc.Close() + return + } + + contact := handler.p.core.Identity.ContactList().ContactByAddress("ricochet:" + oc.OtherHostname) + if contact == nil { + log.Printf("protocol: Outbound connection authenticated to %s succeeded, but no matching contact found", oc.OtherHostname) + oc.Close() + return + } + + log.Printf("protocol: Outbound connection to %s authenticated", oc.OtherHostname) + contact.OnConnectionAuthenticated(oc) } // Contact Management