Implement inbound contact request protocol
This implements a handler for inbound contact request connections, taking advantage of the synchronous Process API. This is largely untested code at the moment.
This commit is contained in:
parent
11cdfeaad8
commit
b5301c2782
|
@ -298,7 +298,8 @@ func (cl *ContactList) RemoveInboundContactRequest(request *InboundContactReques
|
||||||
return errors.New("Request is not in contact list")
|
return errors.New("Request is not in contact list")
|
||||||
}
|
}
|
||||||
|
|
||||||
request.CloseConnection()
|
// Close connection asynchronously to avoid potential deadlocking
|
||||||
|
go request.CloseConnection()
|
||||||
// XXX Remove from config
|
// XXX Remove from config
|
||||||
|
|
||||||
delete(cl.inboundRequests, requestData.Address)
|
delete(cl.inboundRequests, requestData.Address)
|
||||||
|
|
|
@ -197,9 +197,12 @@ func (me *Identity) handleInboundConnection(conn net.Conn) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX-protocol Unknown contact, should pass to handler for contact requests
|
err = HandleInboundRequestConnection(rc, me.contactList)
|
||||||
log.Printf("Inbound connection is a contact request, but that's not implemented yet")
|
if err == nil {
|
||||||
return errors.New("inbound contact request connections aren't implemented")
|
// Connection now belongs to an accepted contact, don't close it
|
||||||
|
conn = nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (me *Identity) Address() string {
|
func (me *Identity) Address() string {
|
||||||
|
|
|
@ -3,6 +3,7 @@ package core
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"github.com/ricochet-im/ricochet-go/rpc"
|
"github.com/ricochet-im/ricochet-go/rpc"
|
||||||
|
channels "github.com/s-rah/go-ricochet/channels"
|
||||||
connection "github.com/s-rah/go-ricochet/connection"
|
connection "github.com/s-rah/go-ricochet/connection"
|
||||||
"log"
|
"log"
|
||||||
"sync"
|
"sync"
|
||||||
|
@ -10,17 +11,165 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type InboundContactRequest struct {
|
type InboundContactRequest struct {
|
||||||
core *Ricochet
|
core *Ricochet
|
||||||
mutex sync.Mutex
|
mutex sync.Mutex
|
||||||
data ricochet.ContactRequest
|
data ricochet.ContactRequest
|
||||||
conn *connection.Connection
|
Address string
|
||||||
channelID int32
|
|
||||||
Address string
|
// If non-nil, sent the new contact on accept or nil on reject.
|
||||||
|
// Used to signal back to active connections when request state changes.
|
||||||
|
contactResultChan chan *Contact
|
||||||
|
|
||||||
// Called when the request state is changed
|
// Called when the request state is changed
|
||||||
StatusChanged func(request *InboundContactRequest)
|
StatusChanged func(request *InboundContactRequest)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type inboundRequestHandler struct {
|
||||||
|
Name string
|
||||||
|
Message string
|
||||||
|
|
||||||
|
RequestReceivedChan chan struct{}
|
||||||
|
ResponseChan chan string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (irh *inboundRequestHandler) ContactRequestRejected() {}
|
||||||
|
func (irh *inboundRequestHandler) ContactRequestAccepted() {}
|
||||||
|
func (irh *inboundRequestHandler) ContactRequestError() {}
|
||||||
|
func (irh *inboundRequestHandler) ContactRequest(name, message string) string {
|
||||||
|
if len(name) > 0 && !IsNicknameAcceptable(name) {
|
||||||
|
log.Printf("protocol: Stripping unacceptable nickname from inbound request; encoded: %x", []byte(name))
|
||||||
|
name = ""
|
||||||
|
}
|
||||||
|
irh.Name = name
|
||||||
|
if len(message) > 0 && !IsMessageAcceptable(message) {
|
||||||
|
log.Printf("protocol: Stripping unacceptable message from inbound request; len: %d, encoded: %x", len(message), []byte(message))
|
||||||
|
message = ""
|
||||||
|
}
|
||||||
|
irh.Message = message
|
||||||
|
|
||||||
|
// Signal to the goroutine that a request was received
|
||||||
|
irh.RequestReceivedChan <- struct{}{}
|
||||||
|
// Wait for a response
|
||||||
|
response := <-irh.ResponseChan
|
||||||
|
return response
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleInboundRequestConnection takes an authenticated connection that does not
|
||||||
|
// associate to any known contact and handles inbound contact request channels.
|
||||||
|
// If no request is seen after a short timeout, the connection will be closed.
|
||||||
|
// If a valid request is received, the connection will stay open to wait for the
|
||||||
|
// user's reply.
|
||||||
|
//
|
||||||
|
// This function takes full ownership of the connection. On return, the connection
|
||||||
|
// will be either closed and a non-nil error returned, or will be assigned to an
|
||||||
|
// existing Contact along with a nil return value.
|
||||||
|
func HandleInboundRequestConnection(conn *connection.Connection, contactList *ContactList) error {
|
||||||
|
log.Printf("Handling inbound contact request connection")
|
||||||
|
ach := &connection.AutoConnectionHandler{}
|
||||||
|
ach.Init()
|
||||||
|
|
||||||
|
// There may only ever be one contact request channel on the connection
|
||||||
|
req := &inboundRequestHandler{
|
||||||
|
RequestReceivedChan: make(chan struct{}),
|
||||||
|
ResponseChan: make(chan string),
|
||||||
|
}
|
||||||
|
// XXX should close conn if the channel goes away...
|
||||||
|
ach.RegisterChannelHandler("im.ricochet.contact.request", func() channels.Handler {
|
||||||
|
return &channels.ContactRequestChannel{Handler: req}
|
||||||
|
})
|
||||||
|
|
||||||
|
processChan := make(chan error)
|
||||||
|
go func() {
|
||||||
|
processChan <- conn.Process(ach)
|
||||||
|
}()
|
||||||
|
|
||||||
|
address, ok := AddressFromPlainHost(conn.RemoteHostname)
|
||||||
|
if !ok {
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
|
||||||
|
// Expecting to receive request data within 15 seconds
|
||||||
|
select {
|
||||||
|
case <-req.RequestReceivedChan:
|
||||||
|
break
|
||||||
|
case err := <-processChan:
|
||||||
|
if err == nil {
|
||||||
|
conn.Conn.Close()
|
||||||
|
err = errors.New("unexpected break")
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
case <-time.After(15 * time.Second):
|
||||||
|
// Didn't receive a contact request fast enough
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
|
||||||
|
// Function to respond to the request; changed after the initial response
|
||||||
|
respond := func(status string) { req.ResponseChan <- status }
|
||||||
|
|
||||||
|
request, contact := contactList.AddOrUpdateInboundContactRequest(address, req.Name, req.Message)
|
||||||
|
if contact == nil && request != nil && !request.IsRejected() {
|
||||||
|
// Pending request; keep connection open and wait for a user response
|
||||||
|
respond("Pending")
|
||||||
|
contactChan := request.getContactResultChannel()
|
||||||
|
select {
|
||||||
|
case c, ok := <-contactChan:
|
||||||
|
if !ok {
|
||||||
|
// Replaced by a different connection or otherwise cancelled without reply
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
// Set contact and fall out to handle the reply
|
||||||
|
contact = c
|
||||||
|
break
|
||||||
|
case err := <-processChan:
|
||||||
|
request.clearContactResultChannel(contactChan)
|
||||||
|
for {
|
||||||
|
_, ok := <-contactChan
|
||||||
|
if !ok {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Change how future responses are sent
|
||||||
|
respond = func(status string) {
|
||||||
|
conn.Do(func() error {
|
||||||
|
channel := conn.Channel("im.ricochet.contact.request", channels.Inbound)
|
||||||
|
if channel == nil {
|
||||||
|
return errors.New("no channel")
|
||||||
|
}
|
||||||
|
channel.Handler.(*channels.ContactRequestChannel).SendResponse(status)
|
||||||
|
// Also close the channel; this was a final response
|
||||||
|
channel.CloseChannel()
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Have a response (either immediately or after pending)
|
||||||
|
if contact != nil {
|
||||||
|
// Accepted
|
||||||
|
respond("Accepted")
|
||||||
|
if err := conn.Break(); err != nil {
|
||||||
|
// Connection lost; but request was accepted anyway, so it'll get reconnected later
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
contact.AssignConnection(conn)
|
||||||
|
return nil
|
||||||
|
} else {
|
||||||
|
// Rejected
|
||||||
|
respond("Rejected")
|
||||||
|
if request != nil {
|
||||||
|
contactList.RemoveInboundContactRequest(request)
|
||||||
|
}
|
||||||
|
conn.Conn.Close()
|
||||||
|
return <-processChan
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// CreateInboundContactRequest constructs a new InboundContactRequest, usually from a newly
|
// CreateInboundContactRequest constructs a new InboundContactRequest, usually from a newly
|
||||||
// received request on an open connection. Requests are managed through the ContactList, so
|
// received request on an open connection. Requests are managed through the ContactList, so
|
||||||
// generally you should use ContactList.AddOrUpdateInboundContactRequest instead of calling
|
// generally you should use ContactList.AddOrUpdateInboundContactRequest instead of calling
|
||||||
|
@ -41,23 +190,35 @@ func CreateInboundContactRequest(core *Ricochet, address, nickname, message stri
|
||||||
return cr
|
return cr
|
||||||
}
|
}
|
||||||
|
|
||||||
// XXX-protocol There should be stricter management & a timeout for this connection
|
// getContactResultChannel returns a channel that will be sent a Contact if the request is
|
||||||
func (cr *InboundContactRequest) SetConnection(conn *connection.Connection, channelID int32) {
|
// accepted, nil if the request is rejected, or closed if the channel is no longer used.
|
||||||
|
// This is used to communciate with active connections for pending requests.
|
||||||
|
func (cr *InboundContactRequest) getContactResultChannel() chan *Contact {
|
||||||
cr.mutex.Lock()
|
cr.mutex.Lock()
|
||||||
defer cr.mutex.Unlock()
|
defer cr.mutex.Unlock()
|
||||||
|
|
||||||
if cr.conn != nil && cr.conn != conn {
|
if cr.contactResultChan != nil {
|
||||||
log.Printf("Replacing connection on an inbound contact request")
|
close(cr.contactResultChan)
|
||||||
cr.conn.Conn.Close()
|
}
|
||||||
|
cr.contactResultChan = make(chan *Contact)
|
||||||
|
return cr.contactResultChan
|
||||||
|
}
|
||||||
|
|
||||||
|
func (cr *InboundContactRequest) clearContactResultChannel(c chan *Contact) {
|
||||||
|
cr.mutex.Lock()
|
||||||
|
defer cr.mutex.Unlock()
|
||||||
|
if cr.contactResultChan == c {
|
||||||
|
close(cr.contactResultChan)
|
||||||
|
cr.contactResultChan = nil
|
||||||
}
|
}
|
||||||
cr.conn = conn
|
|
||||||
cr.channelID = channelID
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cr *InboundContactRequest) CloseConnection() {
|
func (cr *InboundContactRequest) CloseConnection() {
|
||||||
if cr.conn != nil {
|
cr.mutex.Lock()
|
||||||
cr.conn.Conn.Close()
|
defer cr.mutex.Unlock()
|
||||||
cr.conn = nil
|
if cr.contactResultChan != nil {
|
||||||
|
close(cr.contactResultChan)
|
||||||
|
cr.contactResultChan = nil
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -117,27 +278,32 @@ func (cr *InboundContactRequest) Accept() (*Contact, error) {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := cr.AcceptWithContact(contact); err != nil {
|
if err := cr.acceptWithContact(contact); err != nil {
|
||||||
return contact, err
|
return contact, err
|
||||||
}
|
}
|
||||||
return contact, nil
|
return contact, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cr *InboundContactRequest) AcceptWithContact(contact *Contact) error {
|
func (cr *InboundContactRequest) AcceptWithContact(contact *Contact) error {
|
||||||
|
cr.mutex.Lock()
|
||||||
|
defer cr.mutex.Unlock()
|
||||||
|
return cr.acceptWithContact(contact)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Assumes mutex
|
||||||
|
func (cr *InboundContactRequest) acceptWithContact(contact *Contact) error {
|
||||||
if contact.Address() != cr.data.Address {
|
if contact.Address() != cr.data.Address {
|
||||||
return errors.New("Contact address does not match request in accept")
|
return errors.New("Contact address does not match request in accept")
|
||||||
}
|
}
|
||||||
|
|
||||||
cr.core.Identity.ContactList().RemoveInboundContactRequest(cr)
|
// Send to active connection if present
|
||||||
|
if cr.contactResultChan != nil {
|
||||||
// Pass the open connection to the new contact
|
cr.contactResultChan <- contact
|
||||||
if cr.conn != nil {
|
close(cr.contactResultChan)
|
||||||
// XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Accepted")
|
cr.contactResultChan = nil
|
||||||
// XXX-protocol cr.conn.CloseChannel(cr.channelID)
|
|
||||||
contact.AssignConnection(cr.conn)
|
|
||||||
cr.conn = nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cr.core.Identity.ContactList().RemoveInboundContactRequest(cr)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -152,20 +318,19 @@ func (cr *InboundContactRequest) Reject() {
|
||||||
log.Printf("Rejecting contact request from %s", cr.data.Address)
|
log.Printf("Rejecting contact request from %s", cr.data.Address)
|
||||||
cr.data.Rejected = true
|
cr.data.Rejected = true
|
||||||
|
|
||||||
|
// Signal to the active connection
|
||||||
|
if cr.contactResultChan != nil {
|
||||||
|
cr.contactResultChan <- nil
|
||||||
|
close(cr.contactResultChan)
|
||||||
|
cr.contactResultChan = nil
|
||||||
|
}
|
||||||
|
|
||||||
// Signal update to the callback (probably from ContactList)
|
// Signal update to the callback (probably from ContactList)
|
||||||
if cr.StatusChanged != nil {
|
if cr.StatusChanged != nil {
|
||||||
cr.StatusChanged(cr)
|
cr.StatusChanged(cr)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cr.conn != nil {
|
cr.core.Identity.ContactList().RemoveInboundContactRequest(cr)
|
||||||
// XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Rejected")
|
|
||||||
// XXX-protocol cr.conn.CloseChannel(cr.channelID)
|
|
||||||
cr.conn.Conn.Close()
|
|
||||||
cr.conn = nil
|
|
||||||
|
|
||||||
// The request can be removed once a protocol response is sent
|
|
||||||
cr.core.Identity.ContactList().RemoveInboundContactRequest(cr)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cr *InboundContactRequest) Data() ricochet.ContactRequest {
|
func (cr *InboundContactRequest) Data() ricochet.ContactRequest {
|
||||||
|
|
Loading…
Reference in New Issue