core: More of the conversation implementation

Improved parts of the conversation implementation, moved the
conversation event monitor to Identity, added event monitor
population, and other minor changes.
This commit is contained in:
John Brooks 2016-10-15 18:04:19 -06:00
parent f9bc09c520
commit 7fe2363801
5 changed files with 169 additions and 52 deletions

View File

@ -150,10 +150,37 @@ func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *rpc.ContactRe
} }
func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, stream rpc.RicochetCore_MonitorConversationsServer) error { func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, stream rpc.RicochetCore_MonitorConversationsServer) error {
monitor := ricochet.ConversationEventMonitor().Subscribe(100) // XXX Technically there is a race between starting to monitor
defer ricochet.ConversationEventMonitor().Unsubscribe(monitor) // and the list and state of messages used to populate, that could
// result in duplicate messages or other weird behavior.
// Same problem exists for other places this pattern is used.
monitor := s.core.Identity.ConversationStream.Subscribe(100)
defer s.core.Identity.ConversationStream.Unsubscribe(monitor)
// XXX should populate {
// Populate with existing conversations
contacts := s.core.Identity.ContactList().Contacts()
for _, contact := range contacts {
messages := contact.Conversation().Messages()
for _, message := range messages {
event := rpc.ConversationEvent{
Type: rpc.ConversationEvent_POPULATE,
Msg: message,
}
if err := stream.Send(&event); err != nil {
return err
}
}
}
// End population with an empty populate event
event := rpc.ConversationEvent{
Type: rpc.ConversationEvent_POPULATE,
}
if err := stream.Send(&event); err != nil {
return err
}
}
for { for {
event, ok := (<-monitor).(rpc.ConversationEvent) event, ok := (<-monitor).(rpc.ConversationEvent)

View File

@ -120,7 +120,7 @@ func (c *Contact) Conversation() *Conversation {
ContactId: int32(c.id), ContactId: int32(c.id),
Address: "ricochet:" + c.data.Hostname[0:16], Address: "ricochet:" + c.data.Hostname[0:16],
} }
c.conversation = NewConversation(c.core, c, entity) c.conversation = NewConversation(c, entity, c.core.Identity.ConversationStream)
} }
return c.conversation return c.conversation
} }
@ -319,6 +319,12 @@ func (c *Contact) setConnection(conn *protocol.OpenConnection) error {
} }
c.events.Publish(event) c.events.Publish(event)
// Send any queued messages
sent := c.Conversation().SendQueuedMessages()
if sent > 0 {
log.Printf("Sent %d queued messages to contact", sent)
}
return nil return nil
} }

View File

@ -1,41 +1,59 @@
package core package core
import ( import (
"errors"
protocol "github.com/s-rah/go-ricochet"
"github.com/special/notricochet/core/utils" "github.com/special/notricochet/core/utils"
"github.com/special/notricochet/rpc" "github.com/special/notricochet/rpc"
"log" "log"
"math/rand"
"sync" "sync"
"time" "time"
) )
// XXX threading model.. this one isn't great // XXX should have limits on backlog size/duration
// XXX Should probably be under core or identity // XXX populate will bring clients up to date, but they have no way
var conversationStream *utils.Publisher = utils.CreatePublisher() // of knowing which messages have been seen before. Likewise, if multiple
// clients are attached, there is no way to sync unread state between
// them. Implies that it makes sense for clients to report to backend
// when a message is "seen".
type Conversation struct { type Conversation struct {
Core *Ricochet
Contact *Contact Contact *Contact
mutex sync.Mutex mutex sync.Mutex
localEntity *ricochet.Entity localEntity *ricochet.Entity
remoteEntity *ricochet.Entity remoteEntity *ricochet.Entity
messages []*ricochet.Message messages []*ricochet.Message
lastSentMessageId uint32
events *utils.Publisher
} }
func NewConversation(core *Ricochet, contact *Contact, remoteEntity *ricochet.Entity) *Conversation { func NewConversation(contact *Contact, remoteEntity *ricochet.Entity, eventStream *utils.Publisher) *Conversation {
return &Conversation{ return &Conversation{
Core: core,
Contact: contact, Contact: contact,
localEntity: &ricochet.Entity{IsSelf: true}, localEntity: &ricochet.Entity{IsSelf: true},
remoteEntity: remoteEntity, remoteEntity: remoteEntity,
messages: make([]*ricochet.Message, 0), messages: make([]*ricochet.Message, 0),
events: eventStream,
} }
} }
func ConversationEventMonitor() utils.Subscribable { func (c *Conversation) EventMonitor() utils.Subscribable {
return conversationStream return c.events
}
func (c *Conversation) Messages() []*ricochet.Message {
c.mutex.Lock()
defer c.mutex.Unlock()
re := make([]*ricochet.Message, 0, len(c.messages))
for _, message := range c.messages {
re = append(re, message)
}
return re
} }
func (c *Conversation) Receive(id uint64, timestamp int64, text string) { func (c *Conversation) Receive(id uint64, timestamp int64, text string) {
@ -47,26 +65,24 @@ func (c *Conversation) Receive(id uint64, timestamp int64, text string) {
Status: ricochet.Message_RECEIVED, Status: ricochet.Message_RECEIVED,
Text: text, Text: text,
} }
// XXX container
// XXX limit backlog/etc
c.mutex.Lock()
c.messages = append(c.messages, message)
log.Printf("Conversation received message: %v", message)
c.mutex.Unlock()
// XXX Technically these aren't guaranteed to be in order (because c.mutex.Lock()
// the lock has been released) or to all arrive (because of publisher's defer c.mutex.Unlock()
// dropping behavior)...
c.messages = append(c.messages, message)
event := ricochet.ConversationEvent{ event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_RECEIVE, Type: ricochet.ConversationEvent_RECEIVE,
Msg: message, Msg: message,
} }
conversationStream.Publish(event) c.events.Publish(event)
} }
func (c *Conversation) UpdateSentStatus(id uint64, success bool) { func (c *Conversation) UpdateSentStatus(id uint64, success bool) {
c.mutex.Lock() c.mutex.Lock()
for _, message := range c.messages { defer c.mutex.Unlock()
for i := len(c.messages) - 1; i >= 0; i-- {
message := c.messages[i]
if message.Status != ricochet.Message_SENDING || message.Identifier != id { if message.Status != ricochet.Message_SENDING || message.Identifier != id {
continue continue
} }
@ -77,58 +93,104 @@ func (c *Conversation) UpdateSentStatus(id uint64, success bool) {
message.Status = ricochet.Message_ERROR message.Status = ricochet.Message_ERROR
} }
c.mutex.Unlock()
event := ricochet.ConversationEvent{ event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_UPDATE, Type: ricochet.ConversationEvent_UPDATE,
Msg: message, Msg: message,
} }
conversationStream.Publish(event) c.events.Publish(event)
return return
} }
c.mutex.Unlock()
log.Printf("Ignoring ack for unknown message id %d", id)
} }
func (c *Conversation) Send(text string) (*ricochet.Message, error) { func (c *Conversation) Send(text string) (*ricochet.Message, error) {
// XXX protocol if len(text) == 0 {
// XXX check that text is ok, get identifier, etc return nil, errors.New("Message text is empty")
// XXX decide whether sending or queued based on state } else if len(text) > 2000 {
return nil, errors.New("Message is too long")
}
c.mutex.Lock()
defer c.mutex.Unlock()
if c.lastSentMessageId == 0 {
// Rand is seeded by Ricochet.Init
c.lastSentMessageId = rand.Uint32()
}
if c.lastSentMessageId++; c.lastSentMessageId == 0 {
c.lastSentMessageId++
}
message := &ricochet.Message{ message := &ricochet.Message{
Sender: c.localEntity, Sender: c.localEntity,
Recipient: c.remoteEntity, Recipient: c.remoteEntity,
Timestamp: time.Now().Unix(), Timestamp: time.Now().Unix(),
Identifier: 0, // XXX Identifier: uint64(c.lastSentMessageId),
Status: ricochet.Message_QUEUED, Status: ricochet.Message_QUEUED,
Text: text, Text: text,
} }
// XXX witness thread disaster // XXX threading mess, and probably deadlockable. Need better API for conn.
conn := c.Contact.Connection() conn := c.Contact.Connection()
if conn != nil { if conn != nil {
// XXX hardcoded channel IDs, also channel IDs shouldn't be exposed sendMessageToConnection(conn, message)
channelId := int32(7)
if !conn.Client {
channelId++
}
// XXX no error handling
if conn.GetChannelType(channelId) != "im.ricochet.chat" {
conn.OpenChatChannel(channelId)
}
// XXX no message IDs, no acks
conn.SendMessage(channelId, text)
message.Status = ricochet.Message_SENDING message.Status = ricochet.Message_SENDING
} }
c.mutex.Lock()
c.messages = append(c.messages, message) c.messages = append(c.messages, message)
log.Printf("Conversation sent message: %v", message)
c.mutex.Unlock()
event := ricochet.ConversationEvent{ event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_SEND, Type: ricochet.ConversationEvent_SEND,
Msg: message, Msg: message,
} }
conversationStream.Publish(event) c.events.Publish(event)
return message, nil return message, nil
} }
// Send all messages in the QUEUED state to the contact, if
// a connection is available. Should be called after a new
// connection is established.
func (c *Conversation) SendQueuedMessages() int {
c.mutex.Lock()
defer c.mutex.Unlock()
conn := c.Contact.Connection()
if conn == nil {
return 0
}
sent := 0
for _, message := range c.messages {
if message.Status != ricochet.Message_QUEUED {
continue
}
sendMessageToConnection(conn, message)
message.Status = ricochet.Message_SENDING
sent++
event := ricochet.ConversationEvent{
Type: ricochet.ConversationEvent_UPDATE,
Msg: message,
}
c.events.Publish(event)
}
return sent
}
func sendMessageToConnection(conn *protocol.OpenConnection, message *ricochet.Message) {
// XXX hardcoded channel IDs, also channel IDs shouldn't be exposed
channelId := int32(7)
if !conn.Client {
channelId++
}
// XXX no error handling
if conn.GetChannelType(channelId) != "im.ricochet.chat" {
conn.OpenChatChannel(channelId)
}
// XXX no message IDs
conn.SendMessage(channelId, message.Text)
}

View File

@ -21,11 +21,14 @@ type Identity struct {
address string address string
privateKey *rsa.PrivateKey privateKey *rsa.PrivateKey
contactList *ContactList contactList *ContactList
ConversationStream *utils.Publisher
} }
func CreateIdentity(core *Ricochet) (*Identity, error) { func CreateIdentity(core *Ricochet) (*Identity, error) {
me := &Identity{ me := &Identity{
core: core, core: core,
ConversationStream: utils.CreatePublisher(),
} }
if err := me.loadIdentity(); err != nil { if err := me.loadIdentity(); err != nil {

View File

@ -1,5 +1,13 @@
package core package core
import (
cryptorand "crypto/rand"
"log"
"math"
"math/big"
"math/rand"
)
type Ricochet struct { type Ricochet struct {
Config *Config Config *Config
Network *Network Network *Network
@ -7,6 +15,8 @@ type Ricochet struct {
} }
func (core *Ricochet) Init(conf *Config) error { func (core *Ricochet) Init(conf *Config) error {
initRand()
var err error var err error
core.Config = conf core.Config = conf
core.Network = CreateNetwork() core.Network = CreateNetwork()
@ -17,3 +27,12 @@ func (core *Ricochet) Init(conf *Config) error {
return nil return nil
} }
func initRand() {
n, err := cryptorand.Int(cryptorand.Reader, big.NewInt(math.MaxInt64))
if err != nil {
log.Panicf("rng failed: %v", err)
}
rand.Seed(n.Int64())
}