From 99d44e12781e104ee57ea5c0e039064504ffa648 Mon Sep 17 00:00:00 2001 From: John Brooks Date: Sat, 15 Oct 2016 21:57:07 -0600 Subject: [PATCH] cli: Fix threading issues around event monitors --- cli/cli.go | 2 + cli/client.go | 269 +++++++++++++++++++++++++++----------------------- 2 files changed, 145 insertions(+), 126 deletions(-) diff --git a/cli/cli.go b/cli/cli.go index 2776f6d..3fb0940 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -42,6 +42,8 @@ func main() { os.Exit(1) } + go c.Run() + input.SetPrompt("> ") for { line := input.Line() diff --git a/cli/client.go b/cli/client.go index 345a71e..7dab2b0 100644 --- a/cli/client.go +++ b/cli/client.go @@ -16,15 +16,18 @@ type Client struct { ServerStatus ricochet.ServerStatusReply Identity ricochet.Identity - // XXX threadsafety NetworkStatus ricochet.NetworkStatus Contacts *ContactList CurrentContact *Contact + + monitorsChannel chan interface{} + populatedContacts bool } // XXX need to handle backend connection loss/reconnection.. func (c *Client) Initialize() error { c.Contacts = NewContactList() + c.monitorsChannel = make(chan interface{}, 10) // Query server status and version status, err := c.Backend.GetServerStatus(context.Background(), &ricochet.ServerStatusRequest{ @@ -48,12 +51,30 @@ func (c *Client) Initialize() error { // Spawn routines to query and monitor state changes go c.monitorNetwork() go c.monitorContacts() - go c.monitorConversations() + // Conversation monitor isn't started until contacts are populated // XXX block until populated/initialized? return nil } +func (c *Client) Run() { + for { + select { + case v := <-c.monitorsChannel: + switch event := v.(type) { + case *ricochet.NetworkStatus: + c.onNetworkStatus(event) + case *ricochet.ContactEvent: + c.onContactEvent(event) + case *ricochet.ConversationEvent: + c.onConversationEvent(event) + default: + log.Panicf("Unknown event type on monitor channel: %v", event) + } + } + } +} + func (c *Client) SetCurrentContact(contact *Contact) { c.CurrentContact = contact if c.CurrentContact != nil { @@ -86,8 +107,7 @@ func (c *Client) monitorNetwork() { break } - log.Printf("Network status changed: %v", status) - c.NetworkStatus = *status + c.monitorsChannel <- status } } @@ -99,84 +119,15 @@ func (c *Client) monitorContacts() { return } - // Populate initial contacts list for { event, err := stream.Recv() if err != nil { - log.Printf("Contact populate error: %v", err) + log.Printf("Contact monitor error: %v", err) // XXX handle break } - if event.Type != ricochet.ContactEvent_POPULATE { - log.Printf("Ignoring unexpected contact event during populate: %v", event) - continue - } - - // Populate is terminated by a nil subject - if event.Subject == nil { - break - } - - if contact := event.GetContact(); contact != nil { - c.Contacts.Populate(contact) - } else if request := event.GetRequest(); request != nil { - // XXX handle requests - log.Printf("XXX contact requests not supported") - } else { - log.Printf("XXX invalid event") - } - } - - log.Printf("Loaded %d contacts", len(c.Contacts.Contacts)) - - for { - event, err := stream.Recv() - if err != nil { - log.Printf("Contact status monitor error: %v", err) - // XXX handle - break - } - - data := event.GetContact() - - switch event.Type { - case ricochet.ContactEvent_ADD: - if data == nil { - log.Printf("Ignoring contact add event with null data") - continue - } - - c.Contacts.Added(data) - - case ricochet.ContactEvent_UPDATE: - if data == nil { - log.Printf("Ignoring contact update event with null data") - continue - } - - contact := c.Contacts.ByIdAndAddress(data.Id, data.Address) - if contact == nil { - log.Printf("Ignoring contact update event for unknown contact: %v", data) - } else { - contact.Updated(data) - } - - case ricochet.ContactEvent_DELETE: - if data == nil { - log.Printf("Ignoring contact delete event with null data") - continue - } - - contact, _ := c.Contacts.Deleted(data) - - if c.CurrentContact == contact { - c.SetCurrentContact(nil) - } - - default: - log.Printf("Ignoring unknown contact event: %v", event) - } + c.monitorsChannel <- event } } @@ -188,8 +139,6 @@ func (c *Client) monitorConversations() { return } - log.Printf("Monitoring conversations") - for { event, err := stream.Recv() if err != nil { @@ -198,54 +147,122 @@ func (c *Client) monitorConversations() { break } - // XXX Should also handle POPULATE - if event.Type != ricochet.ConversationEvent_RECEIVE && - event.Type != ricochet.ConversationEvent_SEND { - continue - } - - message := event.Msg - if message == nil || message.Recipient == nil || message.Sender == nil { - log.Printf("Ignoring invalid conversation event: %v", event) - continue - } - - var remoteEntity *ricochet.Entity - if !message.Sender.IsSelf { - remoteEntity = message.Sender - } else { - remoteEntity = message.Recipient - } - - remoteContact := c.Contacts.ByIdAndAddress(remoteEntity.ContactId, remoteEntity.Address) - if remoteContact == nil { - log.Printf("Ignoring conversation event with unknown contact: %v", event) - continue - } - - if remoteContact == c.CurrentContact { - // XXX so unsafe - if message.Sender.IsSelf { - fmt.Fprintf(c.Input.Stdout(), "\r%s > %s\n", remoteContact.Data.Nickname, message.Text) - } else { - fmt.Fprintf(c.Input.Stdout(), "\r%s < %s\n", remoteContact.Data.Nickname, message.Text) - } - } else if !message.Sender.IsSelf { - fmt.Fprintf(c.Input.Stdout(), "\r---- %s < %s\n", remoteContact.Data.Nickname, message.Text) - } - - if !message.Sender.IsSelf { - backend := c.Backend - message := message - go func() { - _, err := backend.MarkConversationRead(context.Background(), &ricochet.MarkConversationReadRequest{ - Entity: message.Sender, - LastRecvIdentifier: message.Identifier, - }) - if err != nil { - log.Printf("Mark conversation read failed: %v", err) - } - }() - } + c.monitorsChannel <- event + } +} + +func (c *Client) onNetworkStatus(status *ricochet.NetworkStatus) { + log.Printf("Network status changed: %v", status) + c.NetworkStatus = *status +} + +func (c *Client) onContactEvent(event *ricochet.ContactEvent) { + if !c.populatedContacts && event.Type != ricochet.ContactEvent_POPULATE { + log.Printf("Ignoring unexpected contact event during populate: %v", event) + return + } + + data := event.GetContact() + + switch event.Type { + case ricochet.ContactEvent_POPULATE: + // Populate is terminated by a nil subject + if event.Subject == nil { + c.populatedContacts = true + log.Printf("Loaded %d contacts", len(c.Contacts.Contacts)) + go c.monitorConversations() + } else if data != nil { + c.Contacts.Populate(data) + } else { + log.Printf("Invalid contact populate event: %v", event) + } + + case ricochet.ContactEvent_ADD: + if data == nil { + log.Printf("Ignoring contact add event with null data") + return + } + + c.Contacts.Added(data) + + case ricochet.ContactEvent_UPDATE: + if data == nil { + log.Printf("Ignoring contact update event with null data") + return + } + + contact := c.Contacts.ByIdAndAddress(data.Id, data.Address) + if contact == nil { + log.Printf("Ignoring contact update event for unknown contact: %v", data) + } else { + contact.Updated(data) + } + + case ricochet.ContactEvent_DELETE: + if data == nil { + log.Printf("Ignoring contact delete event with null data") + return + } + + contact, _ := c.Contacts.Deleted(data) + + if c.CurrentContact == contact { + c.SetCurrentContact(nil) + } + + default: + log.Printf("Ignoring unknown contact event: %v", event) + } +} + +func (c *Client) onConversationEvent(event *ricochet.ConversationEvent) { + if event.Type != ricochet.ConversationEvent_RECEIVE && + event.Type != ricochet.ConversationEvent_SEND && + event.Type != ricochet.ConversationEvent_POPULATE { + return + } + + message := event.Msg + if message == nil || message.Recipient == nil || message.Sender == nil { + log.Printf("Ignoring invalid conversation event: %v", event) + return + } + + var remoteEntity *ricochet.Entity + if !message.Sender.IsSelf { + remoteEntity = message.Sender + } else { + remoteEntity = message.Recipient + } + + remoteContact := c.Contacts.ByIdAndAddress(remoteEntity.ContactId, remoteEntity.Address) + if remoteContact == nil { + log.Printf("Ignoring conversation event with unknown contact: %v", event) + return + } + + if remoteContact == c.CurrentContact { + // XXX so unsafe + if message.Sender.IsSelf { + fmt.Fprintf(c.Input.Stdout(), "\r%s > %s\n", remoteContact.Data.Nickname, message.Text) + } else { + fmt.Fprintf(c.Input.Stdout(), "\r%s < %s\n", remoteContact.Data.Nickname, message.Text) + } + } else if !message.Sender.IsSelf { + fmt.Fprintf(c.Input.Stdout(), "\r---- %s < %s\n", remoteContact.Data.Nickname, message.Text) + } + + if !message.Sender.IsSelf { + backend := c.Backend + message := message + go func() { + _, err := backend.MarkConversationRead(context.Background(), &ricochet.MarkConversationReadRequest{ + Entity: message.Sender, + LastRecvIdentifier: message.Identifier, + }) + if err != nil { + log.Printf("Mark conversation read failed: %v", err) + } + }() } }