From 0b19ae4b66f434b03caa0b7aefa45f9690219716 Mon Sep 17 00:00:00 2001 From: John Brooks Date: Sat, 16 Sep 2017 12:12:46 +0200 Subject: [PATCH] Implement conversation with the new protocol API Conversation now acts as a ChatChannelHandler, so a Conversation is able to directly recieve messages on the associated ChatChannel. --- core/contact-protocol.go | 28 ++--------- core/conversation.go | 100 +++++++++++++++++++++++++++------------ 2 files changed, 75 insertions(+), 53 deletions(-) diff --git a/core/contact-protocol.go b/core/contact-protocol.go index fb2a50c..c823f55 100644 --- a/core/contact-protocol.go +++ b/core/contact-protocol.go @@ -3,8 +3,6 @@ package core import ( "github.com/s-rah/go-ricochet/channels" "github.com/s-rah/go-ricochet/connection" - "log" - "time" ) type ContactProtocolHandler struct { @@ -18,32 +16,14 @@ func NewContactProtocolHandler(contact *Contact, conn *connection.Connection) *C conn: conn, contact: contact, } - handler.Init(nil, conn.RemoteHostname) + handler.Init() handler.RegisterChannelHandler("im.ricochet.chat", func() channels.Handler { - chat := new(channels.ChatChannel) - chat.Handler = handler + chat := &channels.ChatChannel{ + Handler: contact.Conversation(), + } return chat }) - // XXX Somebody needs to be calling Process, nobody is yet, need that rework in contact return handler } - -// Implement ChatChannelHandler for im.ricochet.chat -func (handler *ContactProtocolHandler) ChatMessage(messageID uint32, when time.Time, message string) bool { - // XXX sanity checks, message contents, etc - log.Printf("chat message: %d %d %v %s", messageID, when, message) - - conversation := handler.contact.Conversation() - conversation.Receive(uint64(messageID), when.Unix(), message) - return true -} - -func (handler *ContactProtocolHandler) ChatMessageAck(messageID uint32) { - // XXX no success field - log.Printf("chat ack: %d", messageID) - - conversation := handler.contact.Conversation() - conversation.UpdateSentStatus(uint64(messageID), true) -} diff --git a/core/conversation.go b/core/conversation.go index 49f92d2..7c28042 100644 --- a/core/conversation.go +++ b/core/conversation.go @@ -4,7 +4,7 @@ import ( "errors" "github.com/ricochet-im/ricochet-go/core/utils" "github.com/ricochet-im/ricochet-go/rpc" - connection "github.com/s-rah/go-ricochet/connection" + "github.com/s-rah/go-ricochet/channels" "log" "math/rand" "sync" @@ -129,10 +129,13 @@ func (c *Conversation) Send(text string) (*ricochet.Message, error) { Text: text, } - // XXX threading mess, and probably deadlockable. Need better API for conn. - conn := c.Contact.Connection() - if conn != nil { - sendMessageToConnection(conn, message) + if online, err := c.sendMessageToConnection(message); err != nil { + if online { + message.Status = ricochet.Message_ERROR + } else { + message.Status = ricochet.Message_QUEUED + } + } else { // XXX go-ricochet doesn't support message IDs & ack properly yet, so skip SENDING //message.Status = ricochet.Message_SENDING message.Status = ricochet.Message_DELIVERED @@ -166,11 +169,19 @@ func (c *Conversation) SendQueuedMessages() int { continue } - sendMessageToConnection(conn, message) - // XXX go-ricochet doesn't support message IDs & ack properly yet, so skip SENDING - //message.Status = ricochet.Message_SENDING - message.Status = ricochet.Message_DELIVERED - sent++ + if online, err := c.sendMessageToConnection(message); err != nil { + if online { + message.Status = ricochet.Message_ERROR + } else { + // Offline again? + break + } + } else { + // XXX go-ricochet doesn't support message IDs & ack properly yet, so skip SENDING + //message.Status = ricochet.Message_SENDING + message.Status = ricochet.Message_DELIVERED + sent++ + } event := ricochet.ConversationEvent{ Type: ricochet.ConversationEvent_UPDATE, @@ -182,25 +193,6 @@ func (c *Conversation) SendQueuedMessages() int { return sent } -func sendMessageToConnection(conn *connection.Connection, message *ricochet.Message) { - // XXX - panic("sendMessageToConnection needs implementing for new protocol API") - /* - // XXX hardcoded channel IDs, also channel IDs shouldn't be exposed - channelId := int32(7) - if conn.IsInbound { - channelId++ - } - // XXX no error handling - if conn.GetChannelType(channelId) != "im.ricochet.chat" { - conn.OpenChatChannel(channelId) - } - - // XXX no message IDs - conn.SendMessage(channelId, message.Text) - */ -} - // XXX This is inefficient -- it'll usually only be marking the last message // or few messages. Need a better way to know what's unread. func (c *Conversation) MarkReadBeforeMessage(msgId uint64) int { @@ -227,3 +219,53 @@ func (c *Conversation) MarkReadBeforeMessage(msgId uint64) int { return marked } + +// Implement ChatChannelHandler (im.ricochet.chat) +func (c *Conversation) ChatMessage(messageID uint32, when time.Time, message string) bool { + // XXX sanity checks, message contents, etc + log.Printf("chat message: %d %d %v %s", messageID, when, message) + + c.Receive(uint64(messageID), when.Unix(), message) + return true +} + +func (c *Conversation) ChatMessageAck(messageID uint32) { + // XXX no success field + log.Printf("chat ack: %d", messageID) + + c.UpdateSentStatus(uint64(messageID), true) +} + +func (c *Conversation) sendMessageToConnection(message *ricochet.Message) (connected bool, err error) { + conn := c.Contact.Connection() + if conn == nil { + err = errors.New("not connected") + return + } + connected = true + + err = conn.Do(func() error { + channel := conn.Channel("im.ricochet.chat", channels.Outbound) + if channel == nil { + if ch, err := conn.RequestOpenChannel("im.ricochet.chat", &channels.ChatChannel{Handler: c}); err != nil { + return err + } else { + channel = ch + } + } + chat, ok := channel.Handler.(*channels.ChatChannel) + if !ok { + channel.CloseChannel() + return errors.New("invalid chat channel") + } + + // XXX message id and all of that + chat.SendMessage(message.Text) + return nil + }) + + if err != nil { + log.Printf("chat send failed: %s", err) + } + return +}