From 7fe23638011d087d652fb2c01a46ed93a94e17fb Mon Sep 17 00:00:00 2001 From: John Brooks Date: Sat, 15 Oct 2016 18:04:19 -0600 Subject: [PATCH] core: More of the conversation implementation Improved parts of the conversation implementation, moved the conversation event monitor to Identity, added event monitor population, and other minor changes. --- backend/rpc.go | 33 ++++++++- core/contact.go | 8 ++- core/conversation.go | 156 ++++++++++++++++++++++++++++++------------- core/identity.go | 5 +- core/ricochet.go | 19 ++++++ 5 files changed, 169 insertions(+), 52 deletions(-) diff --git a/backend/rpc.go b/backend/rpc.go index 635e32f..951274e 100644 --- a/backend/rpc.go +++ b/backend/rpc.go @@ -150,10 +150,37 @@ func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *rpc.ContactRe } func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, stream rpc.RicochetCore_MonitorConversationsServer) error { - monitor := ricochet.ConversationEventMonitor().Subscribe(100) - defer ricochet.ConversationEventMonitor().Unsubscribe(monitor) + // XXX Technically there is a race between starting to monitor + // and the list and state of messages used to populate, that could + // result in duplicate messages or other weird behavior. + // Same problem exists for other places this pattern is used. + monitor := s.core.Identity.ConversationStream.Subscribe(100) + defer s.core.Identity.ConversationStream.Unsubscribe(monitor) - // XXX should populate + { + // Populate with existing conversations + contacts := s.core.Identity.ContactList().Contacts() + for _, contact := range contacts { + messages := contact.Conversation().Messages() + for _, message := range messages { + event := rpc.ConversationEvent{ + Type: rpc.ConversationEvent_POPULATE, + Msg: message, + } + if err := stream.Send(&event); err != nil { + return err + } + } + } + + // End population with an empty populate event + event := rpc.ConversationEvent{ + Type: rpc.ConversationEvent_POPULATE, + } + if err := stream.Send(&event); err != nil { + return err + } + } for { event, ok := (<-monitor).(rpc.ConversationEvent) diff --git a/core/contact.go b/core/contact.go index acb396f..35bf725 100644 --- a/core/contact.go +++ b/core/contact.go @@ -120,7 +120,7 @@ func (c *Contact) Conversation() *Conversation { ContactId: int32(c.id), Address: "ricochet:" + c.data.Hostname[0:16], } - c.conversation = NewConversation(c.core, c, entity) + c.conversation = NewConversation(c, entity, c.core.Identity.ConversationStream) } return c.conversation } @@ -319,6 +319,12 @@ func (c *Contact) setConnection(conn *protocol.OpenConnection) error { } c.events.Publish(event) + // Send any queued messages + sent := c.Conversation().SendQueuedMessages() + if sent > 0 { + log.Printf("Sent %d queued messages to contact", sent) + } + return nil } diff --git a/core/conversation.go b/core/conversation.go index b02ec65..db1f775 100644 --- a/core/conversation.go +++ b/core/conversation.go @@ -1,41 +1,59 @@ package core import ( + "errors" + protocol "github.com/s-rah/go-ricochet" "github.com/special/notricochet/core/utils" "github.com/special/notricochet/rpc" "log" + "math/rand" "sync" "time" ) -// XXX threading model.. this one isn't great +// XXX should have limits on backlog size/duration -// XXX Should probably be under core or identity -var conversationStream *utils.Publisher = utils.CreatePublisher() +// XXX populate will bring clients up to date, but they have no way +// of knowing which messages have been seen before. Likewise, if multiple +// clients are attached, there is no way to sync unread state between +// them. Implies that it makes sense for clients to report to backend +// when a message is "seen". type Conversation struct { - Core *Ricochet Contact *Contact mutex sync.Mutex - localEntity *ricochet.Entity - remoteEntity *ricochet.Entity - messages []*ricochet.Message + localEntity *ricochet.Entity + remoteEntity *ricochet.Entity + messages []*ricochet.Message + lastSentMessageId uint32 + + events *utils.Publisher } -func NewConversation(core *Ricochet, contact *Contact, remoteEntity *ricochet.Entity) *Conversation { +func NewConversation(contact *Contact, remoteEntity *ricochet.Entity, eventStream *utils.Publisher) *Conversation { return &Conversation{ - Core: core, Contact: contact, localEntity: &ricochet.Entity{IsSelf: true}, remoteEntity: remoteEntity, messages: make([]*ricochet.Message, 0), + events: eventStream, } } -func ConversationEventMonitor() utils.Subscribable { - return conversationStream +func (c *Conversation) EventMonitor() utils.Subscribable { + return c.events +} + +func (c *Conversation) Messages() []*ricochet.Message { + c.mutex.Lock() + defer c.mutex.Unlock() + re := make([]*ricochet.Message, 0, len(c.messages)) + for _, message := range c.messages { + re = append(re, message) + } + return re } func (c *Conversation) Receive(id uint64, timestamp int64, text string) { @@ -47,26 +65,24 @@ func (c *Conversation) Receive(id uint64, timestamp int64, text string) { Status: ricochet.Message_RECEIVED, Text: text, } - // XXX container - // XXX limit backlog/etc - c.mutex.Lock() - c.messages = append(c.messages, message) - log.Printf("Conversation received message: %v", message) - c.mutex.Unlock() - // XXX Technically these aren't guaranteed to be in order (because - // the lock has been released) or to all arrive (because of publisher's - // dropping behavior)... + c.mutex.Lock() + defer c.mutex.Unlock() + + c.messages = append(c.messages, message) event := ricochet.ConversationEvent{ Type: ricochet.ConversationEvent_RECEIVE, Msg: message, } - conversationStream.Publish(event) + c.events.Publish(event) } func (c *Conversation) UpdateSentStatus(id uint64, success bool) { c.mutex.Lock() - for _, message := range c.messages { + defer c.mutex.Unlock() + + for i := len(c.messages) - 1; i >= 0; i-- { + message := c.messages[i] if message.Status != ricochet.Message_SENDING || message.Identifier != id { continue } @@ -77,58 +93,104 @@ func (c *Conversation) UpdateSentStatus(id uint64, success bool) { message.Status = ricochet.Message_ERROR } - c.mutex.Unlock() event := ricochet.ConversationEvent{ Type: ricochet.ConversationEvent_UPDATE, Msg: message, } - conversationStream.Publish(event) + c.events.Publish(event) return } - c.mutex.Unlock() + + log.Printf("Ignoring ack for unknown message id %d", id) } func (c *Conversation) Send(text string) (*ricochet.Message, error) { - // XXX protocol - // XXX check that text is ok, get identifier, etc - // XXX decide whether sending or queued based on state + if len(text) == 0 { + return nil, errors.New("Message text is empty") + } else if len(text) > 2000 { + return nil, errors.New("Message is too long") + } + + c.mutex.Lock() + defer c.mutex.Unlock() + + if c.lastSentMessageId == 0 { + // Rand is seeded by Ricochet.Init + c.lastSentMessageId = rand.Uint32() + } + if c.lastSentMessageId++; c.lastSentMessageId == 0 { + c.lastSentMessageId++ + } + message := &ricochet.Message{ Sender: c.localEntity, Recipient: c.remoteEntity, Timestamp: time.Now().Unix(), - Identifier: 0, // XXX + Identifier: uint64(c.lastSentMessageId), Status: ricochet.Message_QUEUED, Text: text, } - // XXX witness thread disaster + // XXX threading mess, and probably deadlockable. Need better API for conn. conn := c.Contact.Connection() if conn != nil { - // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed - channelId := int32(7) - if !conn.Client { - channelId++ - } - // XXX no error handling - if conn.GetChannelType(channelId) != "im.ricochet.chat" { - conn.OpenChatChannel(channelId) - } - - // XXX no message IDs, no acks - conn.SendMessage(channelId, text) + sendMessageToConnection(conn, message) message.Status = ricochet.Message_SENDING } - c.mutex.Lock() c.messages = append(c.messages, message) - log.Printf("Conversation sent message: %v", message) - c.mutex.Unlock() - event := ricochet.ConversationEvent{ Type: ricochet.ConversationEvent_SEND, Msg: message, } - conversationStream.Publish(event) + c.events.Publish(event) return message, nil } + +// Send all messages in the QUEUED state to the contact, if +// a connection is available. Should be called after a new +// connection is established. +func (c *Conversation) SendQueuedMessages() int { + c.mutex.Lock() + defer c.mutex.Unlock() + + conn := c.Contact.Connection() + if conn == nil { + return 0 + } + + sent := 0 + for _, message := range c.messages { + if message.Status != ricochet.Message_QUEUED { + continue + } + + sendMessageToConnection(conn, message) + message.Status = ricochet.Message_SENDING + sent++ + + event := ricochet.ConversationEvent{ + Type: ricochet.ConversationEvent_UPDATE, + Msg: message, + } + c.events.Publish(event) + } + + return sent +} + +func sendMessageToConnection(conn *protocol.OpenConnection, message *ricochet.Message) { + // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed + channelId := int32(7) + if !conn.Client { + channelId++ + } + // XXX no error handling + if conn.GetChannelType(channelId) != "im.ricochet.chat" { + conn.OpenChatChannel(channelId) + } + + // XXX no message IDs + conn.SendMessage(channelId, message.Text) +} diff --git a/core/identity.go b/core/identity.go index 05c8814..594a932 100644 --- a/core/identity.go +++ b/core/identity.go @@ -21,11 +21,14 @@ type Identity struct { address string privateKey *rsa.PrivateKey contactList *ContactList + + ConversationStream *utils.Publisher } func CreateIdentity(core *Ricochet) (*Identity, error) { me := &Identity{ - core: core, + core: core, + ConversationStream: utils.CreatePublisher(), } if err := me.loadIdentity(); err != nil { diff --git a/core/ricochet.go b/core/ricochet.go index 839e54c..38db7c5 100644 --- a/core/ricochet.go +++ b/core/ricochet.go @@ -1,5 +1,13 @@ package core +import ( + cryptorand "crypto/rand" + "log" + "math" + "math/big" + "math/rand" +) + type Ricochet struct { Config *Config Network *Network @@ -7,6 +15,8 @@ type Ricochet struct { } func (core *Ricochet) Init(conf *Config) error { + initRand() + var err error core.Config = conf core.Network = CreateNetwork() @@ -17,3 +27,12 @@ func (core *Ricochet) Init(conf *Config) error { return nil } + +func initRand() { + n, err := cryptorand.Int(cryptorand.Reader, big.NewInt(math.MaxInt64)) + if err != nil { + log.Panicf("rng failed: %v", err) + } + + rand.Seed(n.Int64()) +}