diff --git a/core/contact.go b/core/contact.go index f5233e4..8cdbc14 100644 --- a/core/contact.go +++ b/core/contact.go @@ -5,6 +5,7 @@ import ( "github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/rpc" protocol "github.com/s-rah/go-ricochet" + channels "github.com/s-rah/go-ricochet/channels" connection "github.com/s-rah/go-ricochet/connection" "golang.org/x/net/context" "log" @@ -376,8 +377,6 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connect continue } - // XXX-protocol Also move the "this is an outbound request" logic here to kill the silly flag and - // move those out of a scary mutexed path? XXX if !known && !isRequest { log.Printf("Outbound connection to contact says we are not a known contact for %v", c) // XXX Should move to rejected status, stop attempting connections. @@ -389,6 +388,21 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connect } else if known && isRequest { log.Printf("Contact request implicitly accepted for outbound connection by contact %v", c) c.UpdateContactRequest("Accepted") + isRequest = false + } + + if isRequest { + // Need to send a contact request; this will block until the peer accepts or rejects, + // the connection fails, or the context is cancelled (which also closes the connection). + if err := c.sendContactRequest(oc, ctx); err != nil { + log.Printf("Outbound contact request connection closed: %s", err) + if err := connector.Backoff(ctx); err != nil { + return + } + continue + } else { + log.Printf("Outbound contact request accepted, assigning connection") + } } log.Printf("Assigning outbound connection to contact") @@ -397,6 +411,77 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connect } } +type requestChannelHandler struct { + Response chan string +} + +func (r *requestChannelHandler) ContactRequest(name, message string) string { + log.Printf("BUG: inbound ContactRequest handler called for outbound channel") + return "Error" +} +func (r *requestChannelHandler) ContactRequestRejected() { r.Response <- "Rejected" } +func (r *requestChannelHandler) ContactRequestAccepted() { r.Response <- "Accepted" } +func (r *requestChannelHandler) ContactRequestError() { r.Response <- "Error" } + +// sendContactRequest synchronously delivers a contact request to an authenticated +// outbound connection and waits for a final (yes/no) reply. This may be cancelled +// by closing the connection. Once a reply is received, it's passed to +// UpdateContactRequest to update the status and this function will return. nil is +// returned for an accepted request when the connection is still established. In all +// other cases, an error is returned and the connection will be closed. +func (c *Contact) sendContactRequest(conn *connection.Connection, ctx context.Context) error { + log.Printf("Sending request to outbound contact %v", c) + ach := &connection.AutoConnectionHandler{} + ach.Init() + + processChan := make(chan error) + responseChan := make(chan string) + + // No timeouts on outbound contact request; wait forever for a final reply + go func() { + processChan <- conn.Process(ach) + }() + + err := conn.Do(func() error { + _, err := conn.RequestOpenChannel("im.ricochet.contact.request", + &channels.ContactRequestChannel{ + Handler: &requestChannelHandler{Response: responseChan}, + Name: c.data.Request.MyNickname, // XXX mutex + Message: c.data.Request.Message, + }) + return err + }) + if err != nil { + // Close and end Process, resulting in an error to processChan and return when done + conn.Conn.Close() + return <-processChan + } + + select { + case err := <-processChan: + // Should not get nil (via Break) return values here; prevent them + if err == nil { + closeUnhandledConnection(conn) + err = fmt.Errorf("unknown connection break") + } + return err + + case response := <-responseChan: + c.UpdateContactRequest(response) + if response == "Accepted" { + conn.Break() + return <-processChan // nil if connection is still alive + } else { + conn.Conn.Close() + return <-processChan + } + + case <-ctx.Done(): + conn.Conn.Close() + return <-processChan + } +} + // considerUsingConnection takes a newly established connection and decides whether // the new connection is valid and acceptable, and whether to replace or keep an // existing connection. To handle race cases when peers are connecting to eachother, @@ -454,20 +539,11 @@ func (c *Contact) considerUsingConnection(conn *connection.Connection) error { // Assumes c.mutex is held. func (c *Contact) onConnectionStateChanged() { if c.connection != nil { - if c.data.Request.Pending { - if !c.connection.IsInbound { - // Outbound connection for contact request; send request message - log.Printf("Sending outbound contact request to %v", c) - // XXX-protocol ooooohhhhh no you don't. Cannot interact w/ protocol here, because process may - // not have started yet. Maybe this one needs to go w/ outbound auth in pre-connection also - // honestly, it would not be a bad thing to have outbound unaccepted requests _not_ be - // considered active connections, as long as they get handled properly. - // c.connection.SendContactRequest(5, c.data.Request.MyNickname, c.data.Request.Message) - } else { - // Inbound connection implicitly accepts the contact request and can continue as a contact - log.Printf("Contact request implicitly accepted by contact %v", c) - c.updateContactRequest("Accepted") - } + if c.data.Request.Pending && c.connection.IsInbound { + // Inbound connection implicitly accepts the contact request and can continue as a contact + // Outbound request logic is all handled by connectOutbound. + log.Printf("Contact request implicitly accepted by contact %v", c) + c.updateContactRequest("Accepted") } else { c.status = ricochet.Contact_ONLINE }