Implement outbound contact request protocol
This commit is contained in:
parent
0b19ae4b66
commit
11cdfeaad8
102
core/contact.go
102
core/contact.go
|
@ -5,6 +5,7 @@ import (
|
||||||
"github.com/ricochet-im/ricochet-go/core/utils"
|
"github.com/ricochet-im/ricochet-go/core/utils"
|
||||||
"github.com/ricochet-im/ricochet-go/rpc"
|
"github.com/ricochet-im/ricochet-go/rpc"
|
||||||
protocol "github.com/s-rah/go-ricochet"
|
protocol "github.com/s-rah/go-ricochet"
|
||||||
|
channels "github.com/s-rah/go-ricochet/channels"
|
||||||
connection "github.com/s-rah/go-ricochet/connection"
|
connection "github.com/s-rah/go-ricochet/connection"
|
||||||
"golang.org/x/net/context"
|
"golang.org/x/net/context"
|
||||||
"log"
|
"log"
|
||||||
|
@ -376,8 +377,6 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connect
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX-protocol Also move the "this is an outbound request" logic here to kill the silly flag and
|
|
||||||
// move those out of a scary mutexed path? XXX
|
|
||||||
if !known && !isRequest {
|
if !known && !isRequest {
|
||||||
log.Printf("Outbound connection to contact says we are not a known contact for %v", c)
|
log.Printf("Outbound connection to contact says we are not a known contact for %v", c)
|
||||||
// XXX Should move to rejected status, stop attempting connections.
|
// XXX Should move to rejected status, stop attempting connections.
|
||||||
|
@ -389,6 +388,21 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connect
|
||||||
} else if known && isRequest {
|
} else if known && isRequest {
|
||||||
log.Printf("Contact request implicitly accepted for outbound connection by contact %v", c)
|
log.Printf("Contact request implicitly accepted for outbound connection by contact %v", c)
|
||||||
c.UpdateContactRequest("Accepted")
|
c.UpdateContactRequest("Accepted")
|
||||||
|
isRequest = false
|
||||||
|
}
|
||||||
|
|
||||||
|
if isRequest {
|
||||||
|
// Need to send a contact request; this will block until the peer accepts or rejects,
|
||||||
|
// the connection fails, or the context is cancelled (which also closes the connection).
|
||||||
|
if err := c.sendContactRequest(oc, ctx); err != nil {
|
||||||
|
log.Printf("Outbound contact request connection closed: %s", err)
|
||||||
|
if err := connector.Backoff(ctx); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
} else {
|
||||||
|
log.Printf("Outbound contact request accepted, assigning connection")
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Printf("Assigning outbound connection to contact")
|
log.Printf("Assigning outbound connection to contact")
|
||||||
|
@ -397,6 +411,77 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connect
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type requestChannelHandler struct {
|
||||||
|
Response chan string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (r *requestChannelHandler) ContactRequest(name, message string) string {
|
||||||
|
log.Printf("BUG: inbound ContactRequest handler called for outbound channel")
|
||||||
|
return "Error"
|
||||||
|
}
|
||||||
|
func (r *requestChannelHandler) ContactRequestRejected() { r.Response <- "Rejected" }
|
||||||
|
func (r *requestChannelHandler) ContactRequestAccepted() { r.Response <- "Accepted" }
|
||||||
|
func (r *requestChannelHandler) ContactRequestError() { r.Response <- "Error" }
|
||||||
|
|
||||||
|
// sendContactRequest synchronously delivers a contact request to an authenticated
|
||||||
|
// outbound connection and waits for a final (yes/no) reply. This may be cancelled
|
||||||
|
// by closing the connection. Once a reply is received, it's passed to
|
||||||
|
// UpdateContactRequest to update the status and this function will return. nil is
|
||||||
|
// returned for an accepted request when the connection is still established. In all
|
||||||
|
// other cases, an error is returned and the connection will be closed.
|
||||||
|
func (c *Contact) sendContactRequest(conn *connection.Connection, ctx context.Context) error {
|
||||||
|
log.Printf("Sending request to outbound contact %v", c)
|
||||||
|
ach := &connection.AutoConnectionHandler{}
|
||||||
|
ach.Init()
|
||||||
|
|
||||||
|
processChan := make(chan error)
|
||||||
|
responseChan := make(chan string)
|
||||||
|
|
||||||
|
// No timeouts on outbound contact request; wait forever for a final reply
|
||||||
|
go func() {
|
||||||
|
processChan <- conn.Process(ach)
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := conn.Do(func() error {
|
||||||
|
_, err := conn.RequestOpenChannel("im.ricochet.contact.request",
|
||||||
|
&channels.ContactRequestChannel{
|
||||||
|
Handler: &requestChannelHandler{Response: responseChan},
|
||||||
|
Name: c.data.Request.MyNickname, // XXX mutex
|
||||||
|
Message: c.data.Request.Message,
|
||||||
|
})
|
||||||
|
return err
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
// Close and end Process, resulting in an error to processChan and return when done
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case err := <-processChan:
|
||||||
|
// Should not get nil (via Break) return values here; prevent them
|
||||||
|
if err == nil {
|
||||||
|
closeUnhandledConnection(conn)
|
||||||
|
err = fmt.Errorf("unknown connection break")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
|
||||||
|
case response := <-responseChan:
|
||||||
|
c.UpdateContactRequest(response)
|
||||||
|
if response == "Accepted" {
|
||||||
|
conn.Break()
|
||||||
|
return <-processChan // nil if connection is still alive
|
||||||
|
} else {
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-ctx.Done():
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// considerUsingConnection takes a newly established connection and decides whether
|
// considerUsingConnection takes a newly established connection and decides whether
|
||||||
// the new connection is valid and acceptable, and whether to replace or keep an
|
// the new connection is valid and acceptable, and whether to replace or keep an
|
||||||
// existing connection. To handle race cases when peers are connecting to eachother,
|
// existing connection. To handle race cases when peers are connecting to eachother,
|
||||||
|
@ -454,20 +539,11 @@ func (c *Contact) considerUsingConnection(conn *connection.Connection) error {
|
||||||
// Assumes c.mutex is held.
|
// Assumes c.mutex is held.
|
||||||
func (c *Contact) onConnectionStateChanged() {
|
func (c *Contact) onConnectionStateChanged() {
|
||||||
if c.connection != nil {
|
if c.connection != nil {
|
||||||
if c.data.Request.Pending {
|
if c.data.Request.Pending && c.connection.IsInbound {
|
||||||
if !c.connection.IsInbound {
|
|
||||||
// Outbound connection for contact request; send request message
|
|
||||||
log.Printf("Sending outbound contact request to %v", c)
|
|
||||||
// XXX-protocol ooooohhhhh no you don't. Cannot interact w/ protocol here, because process may
|
|
||||||
// not have started yet. Maybe this one needs to go w/ outbound auth in pre-connection also
|
|
||||||
// honestly, it would not be a bad thing to have outbound unaccepted requests _not_ be
|
|
||||||
// considered active connections, as long as they get handled properly.
|
|
||||||
// c.connection.SendContactRequest(5, c.data.Request.MyNickname, c.data.Request.Message)
|
|
||||||
} else {
|
|
||||||
// Inbound connection implicitly accepts the contact request and can continue as a contact
|
// Inbound connection implicitly accepts the contact request and can continue as a contact
|
||||||
|
// Outbound request logic is all handled by connectOutbound.
|
||||||
log.Printf("Contact request implicitly accepted by contact %v", c)
|
log.Printf("Contact request implicitly accepted by contact %v", c)
|
||||||
c.updateContactRequest("Accepted")
|
c.updateContactRequest("Accepted")
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
c.status = ricochet.Contact_ONLINE
|
c.status = ricochet.Contact_ONLINE
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue