Port core to the new go-ricochet API

This includes connection management and enough to compile, but doesn't
reimplement most of the protocol messages and functionality yet. There
are problems noted throughout the code, also.
This commit is contained in:
John Brooks 2017-08-13 20:04:35 -06:00
parent 9ae0eac4f3
commit 0c187e72a0
7 changed files with 388 additions and 509 deletions

49
core/contact-protocol.go Normal file
View File

@ -0,0 +1,49 @@
package core
import (
"github.com/s-rah/go-ricochet/channels"
"github.com/s-rah/go-ricochet/connection"
"log"
"time"
)
type ContactProtocolHandler struct {
connection.AutoConnectionHandler
conn *connection.Connection
contact *Contact
}
func NewContactProtocolHandler(contact *Contact, conn *connection.Connection) *ContactProtocolHandler {
handler := &ContactProtocolHandler{
conn: conn,
contact: contact,
}
handler.Init(nil, conn.RemoteHostname)
handler.RegisterChannelHandler("im.ricochet.chat", func() channels.Handler {
chat := new(channels.ChatChannel)
chat.Handler = handler
return chat
})
// XXX Somebody needs to be calling Process, nobody is yet, need that rework in contact
return handler
}
// Implement ChatChannelHandler for im.ricochet.chat
func (handler *ContactProtocolHandler) ChatMessage(messageID uint32, when time.Time, message string) bool {
// XXX sanity checks, message contents, etc
log.Printf("chat message: %d %d %v %s", messageID, when, message)
conversation := handler.contact.Conversation()
conversation.Receive(uint64(messageID), when.Unix(), message)
return true
}
func (handler *ContactProtocolHandler) ChatMessageAck(messageID uint32) {
// XXX no success field
log.Printf("chat ack: %d", messageID)
conversation := handler.contact.Conversation()
conversation.UpdateSentStatus(uint64(messageID), true)
}

View File

@ -5,6 +5,7 @@ import (
"github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/core/utils"
"github.com/ricochet-im/ricochet-go/rpc" "github.com/ricochet-im/ricochet-go/rpc"
protocol "github.com/s-rah/go-ricochet" protocol "github.com/s-rah/go-ricochet"
connection "github.com/s-rah/go-ricochet/connection"
"golang.org/x/net/context" "golang.org/x/net/context"
"log" "log"
"strconv" "strconv"
@ -29,15 +30,13 @@ type Contact struct {
events *utils.Publisher events *utils.Publisher
connEnabled bool connEnabled bool
connection *protocol.OpenConnection connection *connection.Connection
connChannel chan *protocol.OpenConnection connChannel chan *connection.Connection
connClosedChannel chan struct{} connEnabledSignal chan bool
connStopped chan struct{} connectionOnce sync.Once
timeConnected time.Time timeConnected time.Time
outboundConnAuthKnown bool
conversation *Conversation conversation *Conversation
} }
@ -47,6 +46,8 @@ func ContactFromConfig(core *Ricochet, id int, data ConfigContact, events *utils
id: id, id: id,
data: data, data: data,
events: events, events: events,
connChannel: make(chan *connection.Connection),
connEnabledSignal: make(chan bool),
} }
if id < 0 { if id < 0 {
@ -153,41 +154,31 @@ func (c *Contact) Conversation() *Conversation {
return c.conversation return c.conversation
} }
// XXX Thread safety disaster func (c *Contact) Connection() *connection.Connection {
func (c *Contact) Connection() *protocol.OpenConnection {
c.mutex.Lock() c.mutex.Lock()
defer c.mutex.Unlock() defer c.mutex.Unlock()
return c.connection return c.connection
} }
// StartConnection enables inbound and outbound connections for this contact, if other
// conditions permit them. This function is safe to call repeatedly.
func (c *Contact) StartConnection() { func (c *Contact) StartConnection() {
c.mutex.Lock() c.connectionOnce.Do(func() {
defer c.mutex.Unlock() go c.contactConnection()
})
if c.connEnabled {
return
}
c.connEnabled = true c.connEnabled = true
c.connChannel = make(chan *protocol.OpenConnection) c.connEnabledSignal <- true
c.connClosedChannel = make(chan struct{})
c.connStopped = make(chan struct{})
go c.contactConnection()
} }
func (c *Contact) StopConnection() { func (c *Contact) StopConnection() {
c.mutex.Lock() // Must be running to consume connEnabledSignal
if !c.connEnabled { c.connectionOnce.Do(func() {
c.mutex.Unlock() go c.contactConnection()
return })
}
stopped := c.connStopped c.connEnabled = false
close(c.connChannel) c.connEnabledSignal <- false
c.connChannel = nil
c.connClosedChannel = nil
c.connStopped = nil
c.mutex.Unlock()
<-stopped
} }
func (c *Contact) shouldMakeOutboundConnections() bool { func (c *Contact) shouldMakeOutboundConnections() bool {
@ -202,85 +193,149 @@ func (c *Contact) shouldMakeOutboundConnections() bool {
return c.connEnabled return c.connEnabled
} }
// closeUnhandledConnection takes a connection without an active Process routine
// and ensures that it is fully closed and destroyed. It is safe to call on
// a connection that has already been closed and on any connection in any
// state, as long as Process() is not currently running.
func closeUnhandledConnection(conn *connection.Connection) {
conn.Conn.Close()
nullHandler := &connection.AutoConnectionHandler{}
nullHandler.Init()
conn.Process(nullHandler)
}
// Goroutine to handle the protocol connection for a contact. // Goroutine to handle the protocol connection for a contact.
// Responsible for making outbound connections and taking over authenticated // Responsible for making outbound connections and taking over authenticated
// inbound connections, running protocol handlers on the active connection, and // inbound connections, running protocol handlers on the active connection, and
// reacting to connection loss. Nothing else may write Contact.connection. // reacting to connection loss. Nothing else may write Contact.connection.
//
// This goroutine is started by the first call to StartConnection or StopConnection
// and persists for the lifetime of the contact. When connections are stopped, it
// consumes connChannel and closes all (presumably inbound) connections.
// XXX Need a hard kill for destroying contacts
func (c *Contact) contactConnection() { func (c *Contact) contactConnection() {
// XXX Should the protocol continue handling its own goroutines? // Signalled when the active connection is closed
// I'm thinking the protocol API design I want is: connClosedChannel := make(chan struct{})
// - "handler" assigned per-connection connectionsEnabled := false
// - each inbound listener has its own handler also, assigns for conns
// - so, non-authed connection has a handler that _can't_ do anything other than auth
// - inbound contact req connection gets a special handler for that case
// - authenticated contact conns get handler changed here
// It's probably more sensible to never break the conn read loop, because of buffering etc
// So likely we don't want to change that goroutine. Could still use a channel to pass it
// to handler for parsing, which could let it go on any goroutine we want, if it's desirable
// to put it on e.g. this routine. Hmm.
connChannel := c.connChannel
connClosedChannel := c.connClosedChannel
stopped := c.connStopped
connectionLoop:
for { for {
// If there is no active connection, spawn an outbound connector. if !connectionsEnabled {
// A successful connection is returned via connChannel; otherwise, it will keep trying. // Reject all connections on connChannel and wait for start signal
select {
case conn := <-c.connChannel:
if conn != nil {
log.Printf("Discarded connection to %s because connections are disabled", c.Address())
go closeUnhandledConnection(conn)
// XXX-protocol doing this here instead of during auth means they'll keep trying endlessly. Doing it in
// auth means they'll never try again. Both are sometimes wrong. Hmm.
}
case enable := <-c.connEnabledSignal:
if enable {
log.Printf("Contact %s connections are enabled", c.Address())
connectionsEnabled = true
}
// XXX hard kill
}
continue
}
// If there is no active connection, spawn an outbound connector. A successful connection
// is returned via connChannel, and otherwise it will keep trying until cancelled via
// the context.
var outboundCtx context.Context var outboundCtx context.Context
outboundCancel := func() {} outboundCancel := func() {}
if c.connection == nil && c.shouldMakeOutboundConnections() { if c.connection == nil && c.shouldMakeOutboundConnections() {
outboundCtx, outboundCancel = context.WithCancel(context.Background()) outboundCtx, outboundCancel = context.WithCancel(context.Background())
go c.connectOutbound(outboundCtx, connChannel) go c.connectOutbound(outboundCtx, c.connChannel)
} }
select { select {
case conn, ok := <-connChannel: case conn := <-c.connChannel:
outboundCancel() outboundCancel()
if !ok { if conn == nil {
// Closing connChannel exits this connection routine, for contact
// deletion, exit, or some other case.
break connectionLoop
} else if conn == nil {
// Signal used to restart outbound connection attempts // Signal used to restart outbound connection attempts
continue continue
} }
// Received a new connection. The connection is already authenticated and ready to use. c.mutex.Lock()
// This connection may be inbound or outbound. setConnection will decide whether to // Decide whether to keep this connection; if this returns an error, conn is
// replace an existing connection. // already closed. If there was an existing connection and this returns nil,
// the old connection is closed but c.connection has not been reset.
// XXX Tweak setConnection logic if needed if err := c.considerUsingConnection(conn); err != nil {
// Logic when keeping connection, need to make sure protocol is going, etc... log.Printf("Discarded new contact %s connection: %s", c.data.Hostname, err)
if err := c.setConnection(conn); err != nil { go closeUnhandledConnection(conn)
log.Printf("Discarded new contact %s connection: %s", c.Address(), err) c.mutex.Unlock()
if !conn.Closed && conn != c.connection {
conn.Close()
}
continue continue
} }
replacingConn := c.connection != nil
c.connection = conn
if replacingConn {
// Wait for old handleConnection to return
c.mutex.Unlock()
<-connClosedChannel
c.mutex.Lock()
}
go c.handleConnection(conn, connClosedChannel)
c.onConnectionStateChanged()
c.mutex.Unlock()
case <-connClosedChannel: case <-connClosedChannel:
outboundCancel() outboundCancel()
c.clearConnection(nil) c.mutex.Lock()
c.connection = nil
c.onConnectionStateChanged()
c.mutex.Unlock()
case enable := <-c.connEnabledSignal:
outboundCancel()
if !enable {
connectionsEnabled = false
log.Printf("Contact %s connections are disabled", c.Address())
}
} }
} }
log.Printf("Exiting contact connection loop for %s", c.Address()) log.Printf("Exiting contact connection loop for %s", c.Address())
c.clearConnection(nil) c.mutex.Lock()
close(stopped) if c.connection != nil {
c.connection.Conn.Close()
c.connection = nil
c.onConnectionStateChanged()
c.mutex.Unlock()
<-connClosedChannel
} else {
c.mutex.Unlock()
}
}
// Goroutine to maintain an open contact connection, calls Process and reports when closed.
func (c *Contact) handleConnection(conn *connection.Connection, closedChannel chan struct{}) {
// Connection does not outlive this function
defer func() {
conn.Conn.Close()
closedChannel <- struct{}{}
}()
log.Printf("Contact connection for %s ready", conn.RemoteHostname)
handler := NewContactProtocolHandler(c, conn)
err := conn.Process(handler)
if err == nil {
// Somebody called Break?
err = fmt.Errorf("Connection handler interrupted unexpectedly")
}
log.Printf("Contact connection for %s closed: %s", conn.RemoteHostname, err)
} }
// Attempt an outbound connection to the contact, retrying automatically using OnionConnector. // Attempt an outbound connection to the contact, retrying automatically using OnionConnector.
// This function _must_ send something to connChannel before returning, unless the context has // This function _must_ send something to connChannel before returning, unless the context has
// been cancelled. // been cancelled.
func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protocol.OpenConnection) { func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *connection.Connection) {
c.mutex.Lock() c.mutex.Lock()
connector := OnionConnector{ connector := OnionConnector{
Network: c.core.Network, Network: c.core.Network,
NeverGiveUp: true, NeverGiveUp: true,
} }
hostname := c.data.Hostname hostname := c.data.Hostname
isRequest := c.data.Request.Pending
c.mutex.Unlock() c.mutex.Unlock()
for { for {
@ -296,110 +351,131 @@ func (c *Contact) connectOutbound(ctx context.Context, connChannel chan *protoco
continue continue
} }
// XXX-protocol Ideally this should all take place under ctx also; easy option is a goroutine
// blocked on ctx that kills the connection.
log.Printf("Successful outbound connection to contact %s", hostname) log.Printf("Successful outbound connection to contact %s", hostname)
oc, err := protocol.Open(conn, hostname[0:16]) oc, err := protocol.NegotiateVersionOutbound(conn, hostname[0:16])
if err != nil { if err != nil {
log.Printf("Contact connection protocol failure: %s", err) log.Printf("Outbound connection version negotiation failed: %v", err)
if oc != nil { conn.Close()
oc.Close()
}
if err := connector.Backoff(ctx); err != nil { if err := connector.Backoff(ctx); err != nil {
return return
} }
continue continue
} else {
log.Printf("Protocol connection open: %v", oc)
// XXX Protocol API needs to be reworked; see notes in
// contactConnection. Ideally we should authenticate here and
// pass the conneciton back, but for now this can do nothing:
// the connection will either succeed and come in via the
// protocol handler, or will be closed and signalled via
// OnConnectionClosed. Alternatively, it will break because this
// is fragile and dumb.
// XXX BUG: This means no backoff for authentication failure
handler := &ProtocolConnection{
Core: c.core,
Conn: oc,
Contact: c,
MyHostname: c.core.Identity.Address()[9:],
PrivateKey: c.core.Identity.PrivateKey(),
} }
go oc.Process(handler)
log.Printf("Outbound connection negotiated version; authenticating")
privateKey := c.core.Identity.PrivateKey()
known, err := connection.HandleOutboundConnection(oc).ProcessAuthAsClient(&privateKey)
if err != nil {
log.Printf("Outbound connection authentication failed: %v", err)
closeUnhandledConnection(oc)
if err := connector.Backoff(ctx); err != nil {
return return
} }
continue
}
// XXX-protocol Also move the "this is an outbound request" logic here to kill the silly flag and
// move those out of a scary mutexed path? XXX
if !known && !isRequest {
log.Printf("Outbound connection to contact says we are not a known contact for %v", c)
// XXX Should move to rejected status, stop attempting connections.
closeUnhandledConnection(oc)
if err := connector.Backoff(ctx); err != nil {
return
}
continue
} else if known && isRequest {
log.Printf("Contact request implicitly accepted for outbound connection by contact %v", c)
c.UpdateContactRequest("Accepted")
}
log.Printf("Assigning outbound connection to contact")
c.AssignConnection(oc)
break
} }
} }
func (c *Contact) setConnection(conn *protocol.OpenConnection) error { // considerUsingConnection takes a newly established connection and decides whether
if conn.Client { // the new connection is valid and acceptable, and whether to replace or keep an
log.Printf("Contact %s has a new outbound connection", c.Address()) // existing connection. To handle race cases when peers are connecting to eachother,
// a particular set of rules is followed for replacing an existing connection.
//
// considerUsingConnection returns nil if the new connection is valid and should be
// used. If this function returns nil, the existing connection has been closed (but
// c.connection is unmodified, and the process routine may still be executing). If
// this function returns an error, conn has been closed.
//
// Assumes that c.mutex is held.
func (c *Contact) considerUsingConnection(conn *connection.Connection) error {
killConn := conn
defer func() {
if killConn != nil {
killConn.Conn.Close()
}
}()
if conn.IsInbound {
log.Printf("Contact %s has a new inbound connection", c.data.Hostname)
} else { } else {
log.Printf("Contact %s has a new inbound connection", c.Address()) log.Printf("Contact %s has a new outbound connection", c.data.Hostname)
} }
c.mutex.Lock()
if conn == c.connection { if conn == c.connection {
c.mutex.Unlock()
return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c) return fmt.Errorf("Duplicate assignment of connection %v to contact %v", conn, c)
} }
if !conn.IsAuthed || conn.Closed { if !conn.Authentication["im.ricochet.auth.hidden-service"] {
c.mutex.Unlock() return fmt.Errorf("Connection %v is not authenticated", conn)
conn.Close()
return fmt.Errorf("Connection %v is not in a valid state to assign to contact %v", conn, c)
} }
if c.data.Hostname[0:16] != conn.OtherHostname { if c.data.Hostname[0:16] != conn.RemoteHostname {
c.mutex.Unlock() return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.RemoteHostname, c.data.Hostname[0:16])
conn.Close()
return fmt.Errorf("Connection hostname %s doesn't match contact hostname %s when assigning connection", conn.OtherHostname, c.data.Hostname[0:16])
} }
if conn.Client && !c.outboundConnAuthKnown && !c.data.Request.Pending { if c.connection != nil && !c.shouldReplaceConnection(conn) {
log.Printf("Outbound connection to contact says we are not a known contact for %v", c)
// XXX Should move to rejected status, stop attempting connections.
c.mutex.Unlock()
conn.Close()
return fmt.Errorf("Outbound connection says we are not a known contact")
}
if c.connection != nil {
if c.shouldReplaceConnection(conn) {
// XXX Signal state change for connection loss?
c.connection.Close()
c.connection = nil
} else {
c.mutex.Unlock()
conn.Close()
return fmt.Errorf("Using existing connection") return fmt.Errorf("Using existing connection")
} }
}
// If this connection is inbound and there's an outbound attempt, keep this // If this connection is inbound and there's an outbound attempt, keep this
// connection and cancel outbound if we haven't sent authentication yet, or // connection and cancel outbound if we haven't sent authentication yet, or
// if the outbound connection will lose the fallback comparison above. // if the outbound connection will lose the fallback comparison above.
// XXX implement this // XXX implement this; currently outbound is always cancelled when an inbound
// connection succeeds.
c.connection = conn // We will keep conn, close c.connection instead if there was one
log.Printf("Assigned connection %v to contact %v", c.connection, c) killConn = c.connection
return nil
}
// onConnectionStateChanged is called by the connection loop when the c.connection
// is changed, which can be a transition to online or offline or a replacement.
// Assumes c.mutex is held.
func (c *Contact) onConnectionStateChanged() {
if c.connection != nil {
if c.data.Request.Pending { if c.data.Request.Pending {
if conn.Client && !c.outboundConnAuthKnown { if !c.connection.IsInbound {
// Outbound connection for contact request; send request message // Outbound connection for contact request; send request message
// XXX hardcoded channel ID
log.Printf("Sending outbound contact request to %v", c) log.Printf("Sending outbound contact request to %v", c)
conn.SendContactRequest(5, c.data.Request.MyNickname, c.data.Request.Message) // XXX-protocol ooooohhhhh no you don't. Cannot interact w/ protocol here, because process may
// not have started yet. Maybe this one needs to go w/ outbound auth in pre-connection also
// honestly, it would not be a bad thing to have outbound unaccepted requests _not_ be
// considered active connections, as long as they get handled properly.
// c.connection.SendContactRequest(5, c.data.Request.MyNickname, c.data.Request.Message)
} else { } else {
// Inbound connection or outbound connection with a positive // Inbound connection implicitly accepts the contact request and can continue as a contact
// 'isKnownContact' response implicitly accepts the contact request
// and can continue as a contact
log.Printf("Contact request implicitly accepted by contact %v", c) log.Printf("Contact request implicitly accepted by contact %v", c)
c.updateContactRequest("Accepted") c.updateContactRequest("Accepted")
} }
} else { } else {
c.status = ricochet.Contact_ONLINE c.status = ricochet.Contact_ONLINE
} }
} else {
if c.status == ricochet.Contact_ONLINE {
c.status = ricochet.Contact_OFFLINE
}
}
// Update LastConnected time // Update LastConnected time
c.timeConnected = time.Now() c.timeConnected = time.Now()
@ -409,8 +485,8 @@ func (c *Contact) setConnection(conn *protocol.OpenConnection) error {
config.Contacts[strconv.Itoa(c.id)] = c.data config.Contacts[strconv.Itoa(c.id)] = c.data
config.Save() config.Save()
// _really_ assumes c.mutex was held
c.mutex.Unlock() c.mutex.Unlock()
event := ricochet.ContactEvent{ event := ricochet.ContactEvent{
Type: ricochet.ContactEvent_UPDATE, Type: ricochet.ContactEvent_UPDATE,
Subject: &ricochet.ContactEvent_Contact{ Subject: &ricochet.ContactEvent_Contact{
@ -419,59 +495,24 @@ func (c *Contact) setConnection(conn *protocol.OpenConnection) error {
} }
c.events.Publish(event) c.events.Publish(event)
if c.connection != nil {
// Send any queued messages // Send any queued messages
sent := c.Conversation().SendQueuedMessages() sent := c.Conversation().SendQueuedMessages()
if sent > 0 { if sent > 0 {
log.Printf("Sent %d queued messages to contact", sent) log.Printf("Sent %d queued messages to contact", sent)
} }
return nil
} }
// Close and clear state related to the active contact connection.
// If ifConn is non-nil, the active connection is only cleared if
// it is the same as ifConn. Returns true if cleared.
func (c *Contact) clearConnection(ifConn *protocol.OpenConnection) bool {
c.mutex.Lock() c.mutex.Lock()
if c.connection == nil || (ifConn != nil && c.connection != ifConn) {
c.mutex.Unlock()
return false
}
conn := c.connection
c.connection = nil
c.status = ricochet.Contact_OFFLINE
// XXX eww, and also potentially deadlockable?
config := c.core.Config.OpenWrite()
c.data.LastConnected = time.Now().Format(time.RFC3339)
config.Contacts[strconv.Itoa(c.id)] = c.data
config.Save()
c.mutex.Unlock()
if conn != nil && !conn.Closed {
conn.Close()
}
event := ricochet.ContactEvent{
Type: ricochet.ContactEvent_UPDATE,
Subject: &ricochet.ContactEvent_Contact{
Contact: c.Data(),
},
}
c.events.Publish(event)
return true
} }
// Decide whether to replace the existing connection with conn. // Decide whether to replace the existing connection with conn.
// Assumes mutex is held. // Assumes mutex is held.
func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool { func (c *Contact) shouldReplaceConnection(conn *connection.Connection) bool {
myHostname, _ := PlainHostFromAddress(c.core.Identity.Address())
if c.connection == nil { if c.connection == nil {
return true return true
} else if c.connection.Closed { } else if c.connection.IsInbound == conn.IsInbound {
log.Printf("Replacing dead connection %v for contact %v", c.connection, c)
return true
} else if c.connection.Client == conn.Client {
// If the existing connection is in the same direction, always use the new one // If the existing connection is in the same direction, always use the new one
log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c) log.Printf("Replacing existing same-direction connection %v with new connection %v for contact %v", c.connection, conn, c)
return true return true
@ -479,7 +520,7 @@ func (c *Contact) shouldReplaceConnection(conn *protocol.OpenConnection) bool {
// If the existing connection is more than 30 seconds old, use the new one // If the existing connection is more than 30 seconds old, use the new one
log.Printf("Replacing existing %v old connection %v with new connection %v for contact %v", time.Since(c.timeConnected), c.connection, conn, c) log.Printf("Replacing existing %v old connection %v with new connection %v for contact %v", time.Since(c.timeConnected), c.connection, conn, c)
return true return true
} else if preferOutbound := conn.MyHostname < conn.OtherHostname; preferOutbound == conn.Client { } else if preferOutbound := myHostname < conn.RemoteHostname; preferOutbound != conn.IsInbound {
// Fall back to string comparison of hostnames for a stable resolution // Fall back to string comparison of hostnames for a stable resolution
// New connection wins // New connection wins
log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c) log.Printf("Replacing existing connection %v with new connection %v for contact %v according to fallback order", c.connection, conn, c)
@ -552,29 +593,13 @@ func (c *Contact) updateContactRequest(status string) bool {
return re return re
} }
// XXX also will go away during protocol API rework // AssignConnection takes new connections, inbound or outbound, to this contact, and
func (c *Contact) OnConnectionAuthenticated(conn *protocol.OpenConnection, knownContact bool) { // asynchronously decides whether to keep or close them.
c.mutex.Lock() func (c *Contact) AssignConnection(conn *connection.Connection) {
if c.connChannel == nil { c.connectionOnce.Do(func() {
log.Printf("Inbound connection from contact, but connections are not enabled for contact %v", c) go c.contactConnection()
c.mutex.Unlock() })
conn.Close()
}
// XXX this is ugly
if conn.Client {
c.outboundConnAuthKnown = knownContact
}
c.connChannel <- conn
c.mutex.Unlock()
}
// XXX rework connection close to have a proper notification instead of this "find contact" mess. // If connections are disabled, this connection will be closed by contactConnection
func (c *Contact) OnConnectionClosed(conn *protocol.OpenConnection) { c.connChannel <- conn
c.mutex.Lock()
if c.connection != conn || c.connClosedChannel == nil {
c.mutex.Unlock()
return
}
c.connClosedChannel <- struct{}{}
c.mutex.Unlock()
} }

View File

@ -208,6 +208,9 @@ func (this *ContactList) RemoveContact(contact *Contact) error {
return errors.New("Not in contact list") return errors.New("Not in contact list")
} }
// XXX How do we safely make sure that the contact has stopped everything, and that
// nobody is going to block on it or keep referencing it..? This is insufficient, it
// leaves a goroutine up among other things.
contact.StopConnection() contact.StopConnection()
config := this.core.Config.OpenWrite() config := this.core.Config.OpenWrite()

View File

@ -4,7 +4,7 @@ import (
"errors" "errors"
"github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/core/utils"
"github.com/ricochet-im/ricochet-go/rpc" "github.com/ricochet-im/ricochet-go/rpc"
protocol "github.com/s-rah/go-ricochet" connection "github.com/s-rah/go-ricochet/connection"
"log" "log"
"math/rand" "math/rand"
"sync" "sync"
@ -182,10 +182,13 @@ func (c *Conversation) SendQueuedMessages() int {
return sent return sent
} }
func sendMessageToConnection(conn *protocol.OpenConnection, message *ricochet.Message) { func sendMessageToConnection(conn *connection.Connection, message *ricochet.Message) {
// XXX
panic("sendMessageToConnection needs implementing for new protocol API")
/*
// XXX hardcoded channel IDs, also channel IDs shouldn't be exposed // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed
channelId := int32(7) channelId := int32(7)
if !conn.Client { if conn.IsInbound {
channelId++ channelId++
} }
// XXX no error handling // XXX no error handling
@ -195,6 +198,7 @@ func sendMessageToConnection(conn *protocol.OpenConnection, message *ricochet.Me
// XXX no message IDs // XXX no message IDs
conn.SendMessage(channelId, message.Text) conn.SendMessage(channelId, message.Text)
*/
} }
// XXX This is inefficient -- it'll usually only be marking the last message // XXX This is inefficient -- it'll usually only be marking the last message

View File

@ -6,8 +6,10 @@ import (
"errors" "errors"
"github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/core/utils"
protocol "github.com/s-rah/go-ricochet" protocol "github.com/s-rah/go-ricochet"
connection "github.com/s-rah/go-ricochet/connection"
"github.com/yawning/bulb/utils/pkcs1" "github.com/yawning/bulb/utils/pkcs1"
"log" "log"
"net"
"sync" "sync"
) )
@ -103,34 +105,6 @@ func (me *Identity) setPrivateKey(key *rsa.PrivateKey) error {
return nil return nil
} }
type identityService struct {
Identity *Identity
MyHostname string
}
func (is *identityService) OnNewConnection(oc *protocol.OpenConnection) {
log.Printf("Inbound connection accepted")
oc.MyHostname = is.MyHostname
// XXX Should have pre-auth handling, timeouts
identity := is.Identity
handler := &ProtocolConnection{
Core: identity.core,
Conn: oc,
GetContactByHostname: func(hostname string) *Contact {
address, ok := AddressFromPlainHost(hostname)
if !ok {
return nil
}
return identity.ContactList().ContactByAddress(address)
},
}
go oc.Process(handler)
}
func (is *identityService) OnFailedConnection(err error) {
log.Printf("Inbound connection failed: %v", err)
}
// BUG(special): No error handling for failures under publishService // BUG(special): No error handling for failures under publishService
func (me *Identity) publishService(key *rsa.PrivateKey) { func (me *Identity) publishService(key *rsa.PrivateKey) {
// This call will block until a control connection is available and the // This call will block until a control connection is available and the
@ -160,17 +134,72 @@ func (me *Identity) publishService(key *rsa.PrivateKey) {
} }
log.Printf("Identity service published, accepting connections") log.Printf("Identity service published, accepting connections")
is := &identityService{ for {
Identity: me, conn, err := listener.Accept()
MyHostname: me.Address()[9:],
}
err = protocol.Serve(listener, is)
if err != nil { if err != nil {
log.Printf("Identity listener failed: %v", err) log.Printf("Identity listener failed: %v", err)
// XXX handle // XXX handle
return return
} }
// Handle connection in a separate goroutine and continue listening
go me.handleInboundConnection(conn)
}
}
func (me *Identity) handleInboundConnection(conn net.Conn) error {
defer func() {
// Close conn on return unless explicitly cleared
if conn != nil {
conn.Close()
}
}()
contactByHostname := func(hostname string) (*Contact, error) {
address, ok := AddressFromPlainHost(hostname)
if !ok {
// This would be a bug
return nil, errors.New("invalid authenticated hostname")
}
return me.contactList.ContactByAddress(address), nil
}
lookupContactAuth := func(hostname string, publicKey rsa.PublicKey) (bool, bool) {
contact, err := contactByHostname(hostname)
if err != nil {
return false, false
}
// allowed, known
return true, contact != nil
}
rc, err := protocol.NegotiateVersionInbound(conn)
if err != nil {
log.Printf("Inbound connection failed: %v", err)
return err
}
authHandler := connection.HandleInboundConnection(rc)
err = authHandler.ProcessAuthAsServer(me.privateKey, lookupContactAuth)
if err != nil {
log.Printf("Inbound connection auth failed: %v", err)
return err
}
contact, err := contactByHostname(rc.RemoteHostname)
if err != nil {
log.Printf("Inbound connection lookup failed: %v", err)
return err
}
if contact != nil {
// Known contact, pass the new connection to Contact
contact.AssignConnection(rc)
conn = nil
return nil
}
// XXX-protocol Unknown contact, should pass to handler for contact requests
log.Printf("Inbound connection is a contact request, but that's not implemented yet")
return errors.New("inbound contact request connections aren't implemented")
} }
func (me *Identity) Address() string { func (me *Identity) Address() string {

View File

@ -3,7 +3,7 @@ package core
import ( import (
"errors" "errors"
"github.com/ricochet-im/ricochet-go/rpc" "github.com/ricochet-im/ricochet-go/rpc"
protocol "github.com/s-rah/go-ricochet" connection "github.com/s-rah/go-ricochet/connection"
"log" "log"
"sync" "sync"
"time" "time"
@ -13,7 +13,7 @@ type InboundContactRequest struct {
core *Ricochet core *Ricochet
mutex sync.Mutex mutex sync.Mutex
data ricochet.ContactRequest data ricochet.ContactRequest
conn *protocol.OpenConnection conn *connection.Connection
channelID int32 channelID int32
Address string Address string
@ -41,14 +41,14 @@ func CreateInboundContactRequest(core *Ricochet, address, nickname, message stri
return cr return cr
} }
// XXX There should be stricter management & a timeout for this connection // XXX-protocol There should be stricter management & a timeout for this connection
func (cr *InboundContactRequest) SetConnection(conn *protocol.OpenConnection, channelID int32) { func (cr *InboundContactRequest) SetConnection(conn *connection.Connection, channelID int32) {
cr.mutex.Lock() cr.mutex.Lock()
defer cr.mutex.Unlock() defer cr.mutex.Unlock()
if cr.conn != nil && cr.conn != conn { if cr.conn != nil && cr.conn != conn {
log.Printf("Replacing connection on an inbound contact request") log.Printf("Replacing connection on an inbound contact request")
cr.conn.Close() cr.conn.Conn.Close()
} }
cr.conn = conn cr.conn = conn
cr.channelID = channelID cr.channelID = channelID
@ -56,7 +56,7 @@ func (cr *InboundContactRequest) SetConnection(conn *protocol.OpenConnection, ch
func (cr *InboundContactRequest) CloseConnection() { func (cr *InboundContactRequest) CloseConnection() {
if cr.conn != nil { if cr.conn != nil {
cr.conn.Close() cr.conn.Conn.Close()
cr.conn = nil cr.conn = nil
} }
} }
@ -131,10 +131,10 @@ func (cr *InboundContactRequest) AcceptWithContact(contact *Contact) error {
cr.core.Identity.ContactList().RemoveInboundContactRequest(cr) cr.core.Identity.ContactList().RemoveInboundContactRequest(cr)
// Pass the open connection to the new contact // Pass the open connection to the new contact
if cr.conn != nil && !cr.conn.Closed { if cr.conn != nil {
cr.conn.AckContactRequest(cr.channelID, "Accepted") // XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Accepted")
cr.conn.CloseChannel(cr.channelID) // XXX-protocol cr.conn.CloseChannel(cr.channelID)
contact.OnConnectionAuthenticated(cr.conn, true) contact.AssignConnection(cr.conn)
cr.conn = nil cr.conn = nil
} }
@ -157,10 +157,10 @@ func (cr *InboundContactRequest) Reject() {
cr.StatusChanged(cr) cr.StatusChanged(cr)
} }
if cr.conn != nil && !cr.conn.Closed { if cr.conn != nil {
cr.conn.AckContactRequest(cr.channelID, "Rejected") // XXX-protocol cr.conn.AckContactRequest(cr.channelID, "Rejected")
cr.conn.CloseChannel(cr.channelID) // XXX-protocol cr.conn.CloseChannel(cr.channelID)
cr.conn.Close() cr.conn.Conn.Close()
cr.conn = nil cr.conn = nil
// The request can be removed once a protocol response is sent // The request can be removed once a protocol response is sent

View File

@ -1,231 +0,0 @@
package core
import (
"crypto/rsa"
"encoding/asn1"
protocol "github.com/s-rah/go-ricochet"
"log"
"time"
)
type ProtocolConnection struct {
Core *Ricochet
Conn *protocol.OpenConnection
Contact *Contact
// Client-side authentication
MyHostname string
PrivateKey rsa.PrivateKey
// Service-side authentication
GetContactByHostname func(hostname string) *Contact
}
func (pc *ProtocolConnection) OnReady(oc *protocol.OpenConnection) {
if pc.Conn != nil && pc.Conn != oc {
log.Panicf("ProtocolConnection is already assigned connection %v, but OnReady called for connection %v", pc.Conn, oc)
}
pc.Conn = oc
if pc.Conn.Client {
log.Printf("Connected to %s", pc.Conn.OtherHostname)
pc.Conn.MyHostname = pc.MyHostname
pc.Conn.IsAuthed = true // Outbound connections are authenticated
pc.Conn.Authenticate(1)
}
}
func (pc *ProtocolConnection) OnDisconnect() {
log.Printf("protocol: OnDisconnect: %v", pc)
if pc.Contact != nil {
pc.Contact.OnConnectionClosed(pc.Conn)
}
}
// Authentication Management
func (pc *ProtocolConnection) OnAuthenticationRequest(channelID int32, clientCookie [16]byte) {
log.Printf("protocol: OnAuthenticationRequest")
pc.Conn.ConfirmAuthChannel(channelID, clientCookie)
}
func (pc *ProtocolConnection) OnAuthenticationChallenge(channelID int32, serverCookie [16]byte) {
log.Printf("protocol: OnAuthenticationChallenge")
publicKeyBytes, _ := asn1.Marshal(pc.PrivateKey.PublicKey)
pc.Conn.SendProof(1, serverCookie, publicKeyBytes, &pc.PrivateKey)
}
func (pc *ProtocolConnection) OnAuthenticationProof(channelID int32, publicKey []byte, signature []byte) {
result := pc.Conn.ValidateProof(channelID, publicKey, signature)
if result {
if len(pc.Conn.OtherHostname) != 16 {
log.Printf("protocol: Invalid format for hostname '%s' in authentication proof", pc.Conn.OtherHostname)
result = false
} else {
pc.Contact = pc.GetContactByHostname(pc.Conn.OtherHostname)
}
}
isKnownContact := (pc.Contact != nil)
pc.Conn.SendAuthenticationResult(channelID, result, isKnownContact)
pc.Conn.IsAuthed = result
pc.Conn.CloseChannel(channelID)
log.Printf("protocol: OnAuthenticationProof, result: %v, contact: %v", result, pc.Contact)
if result && pc.Contact != nil {
pc.Contact.OnConnectionAuthenticated(pc.Conn, true)
}
}
func (pc *ProtocolConnection) OnAuthenticationResult(channelID int32, result bool, isKnownContact bool) {
pc.Conn.IsAuthed = result
pc.Conn.CloseChannel(channelID)
if !result {
log.Printf("protocol: Outbound connection authentication to %s failed", pc.Conn.OtherHostname)
pc.Conn.Close()
return
}
log.Printf("protocol: Outbound connection to %s authenticated", pc.Conn.OtherHostname)
if pc.Contact != nil {
pc.Contact.OnConnectionAuthenticated(pc.Conn, isKnownContact)
}
}
// Contact Management
func (pc *ProtocolConnection) OnContactRequest(channelID int32, nick string, message string) {
if pc.Conn.Client || !pc.Conn.IsAuthed || pc.Contact != nil {
pc.Conn.CloseChannel(channelID)
return
}
address, ok := AddressFromPlainHost(pc.Conn.OtherHostname)
if !ok {
pc.Conn.CloseChannel(channelID)
return
}
if len(nick) > 0 && !IsNicknameAcceptable(nick) {
log.Printf("protocol: Stripping unacceptable nickname from inbound request; encoded: %x", []byte(nick))
nick = ""
}
if len(message) > 0 && !IsMessageAcceptable(message) {
log.Printf("protocol: Stripping unacceptable message from inbound request; len: %d, encoded: %x", len(message), []byte(message))
message = ""
}
contactList := pc.Core.Identity.ContactList()
request, contact := contactList.AddOrUpdateInboundContactRequest(address, nick, message)
if contact != nil {
// Accepted immediately
pc.Conn.AckContactRequestOnResponse(channelID, "Accepted")
pc.Conn.CloseChannel(channelID)
contact.OnConnectionAuthenticated(pc.Conn, true)
} else if request != nil && !request.IsRejected() {
// Pending
pc.Conn.AckContactRequestOnResponse(channelID, "Pending")
request.SetConnection(pc.Conn, channelID)
} else {
// Rejected
pc.Conn.AckContactRequestOnResponse(channelID, "Rejected")
pc.Conn.CloseChannel(channelID)
pc.Conn.Close()
if request != nil {
contactList.RemoveInboundContactRequest(request)
}
}
}
func (pc *ProtocolConnection) OnContactRequestAck(channelID int32, status string) {
if !pc.Conn.Client || pc.Contact == nil {
pc.Conn.CloseChannel(channelID)
return
}
if !pc.Contact.UpdateContactRequest(status) {
pc.Conn.CloseChannel(channelID)
return
}
}
func (pc *ProtocolConnection) IsKnownContact(hostname string) bool {
// All uses of this are for authenticated contacts, so it's sufficient to check pc.Contact
if pc.Contact != nil {
contactHostname, _ := PlainHostFromOnion(pc.Contact.Hostname())
if hostname != contactHostname {
log.Panicf("IsKnownContact called for unexpected hostname '%s'", hostname)
}
return true
}
return false
}
// Managing Channels
func (pc *ProtocolConnection) OnOpenChannelRequest(channelID int32, channelType string) {
log.Printf("open channel request: %v %v", channelID, channelType)
pc.Conn.AckOpenChannel(channelID, channelType)
}
func (pc *ProtocolConnection) OnOpenChannelRequestSuccess(channelID int32) {
log.Printf("open channel request success: %v", channelID)
}
func (pc *ProtocolConnection) OnChannelClosed(channelID int32) {
log.Printf("channel closed: %v", channelID)
}
// Chat Messages
// XXX messageID should be (at least) uint32
func (pc *ProtocolConnection) OnChatMessage(channelID int32, messageID int32, message string) {
// XXX no time delta?
// XXX sanity checks, message contents, etc
log.Printf("chat message: %d %d %s", channelID, messageID, message)
// XXX error case
if pc.Contact == nil {
pc.Conn.Close()
}
// XXX cache?
conversation := pc.Contact.Conversation()
conversation.Receive(uint64(messageID), time.Now().Unix(), message)
pc.Conn.AckChatMessage(channelID, messageID)
}
func (pc *ProtocolConnection) OnChatMessageAck(channelID int32, messageID int32) {
// XXX no success
log.Printf("chat ack: %d %d", channelID, messageID)
// XXX error case
if pc.Contact == nil {
pc.Conn.Close()
}
conversation := pc.Contact.Conversation()
conversation.UpdateSentStatus(uint64(messageID), true)
}
// Handle Errors
func (pc *ProtocolConnection) OnFailedChannelOpen(channelID int32, errorType string) {
log.Printf("failed channel open: %d %s", channelID, errorType)
pc.Conn.UnsetChannel(channelID)
}
func (pc *ProtocolConnection) OnGenericError(channelID int32) {
pc.Conn.RejectOpenChannel(channelID, "GenericError")
}
func (pc *ProtocolConnection) OnUnknownTypeError(channelID int32) {
pc.Conn.RejectOpenChannel(channelID, "UnknownTypeError")
}
func (pc *ProtocolConnection) OnUnauthorizedError(channelID int32) {
pc.Conn.RejectOpenChannel(channelID, "UnauthorizedError")
}
func (pc *ProtocolConnection) OnBadUsageError(channelID int32) {
pc.Conn.RejectOpenChannel(channelID, "BadUsageError")
}
func (pc *ProtocolConnection) OnFailedError(channelID int32) {
pc.Conn.RejectOpenChannel(channelID, "FailedError")
}