From 2ee0c0794231b65c3c98bdf48b2579e739a9a7be Mon Sep 17 00:00:00 2001 From: John Brooks Date: Thu, 3 Nov 2016 22:37:00 -0600 Subject: [PATCH] core: Move RpcServer into core --- backend/backend.go | 4 +- backend/rpc.go => core/rpcserver.go | 105 ++++++++++++++-------------- 2 files changed, 54 insertions(+), 55 deletions(-) rename backend/rpc.go => core/rpcserver.go (53%) diff --git a/backend/backend.go b/backend/backend.go index 180550f..17dd552 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -28,8 +28,8 @@ func main() { log.Fatalf("init error: %v", err) } - server := &RpcServer{ - core: core, + server := &ricochet.RpcServer{ + Core: core, } grpcServer := grpc.NewServer() rpc.RegisterRicochetCoreServer(grpcServer, server) diff --git a/backend/rpc.go b/core/rpcserver.go similarity index 53% rename from backend/rpc.go rename to core/rpcserver.go index 61d13f9..d891700 100644 --- a/backend/rpc.go +++ b/core/rpcserver.go @@ -1,9 +1,8 @@ -package main +package core import ( "errors" - ricochet "github.com/ricochet-im/ricochet-go/core" - rpc "github.com/ricochet-im/ricochet-go/rpc" + "github.com/ricochet-im/ricochet-go/rpc" "golang.org/x/net/context" "log" ) @@ -11,34 +10,34 @@ import ( var NotImplementedError error = errors.New("Not implemented") type RpcServer struct { - core *ricochet.Ricochet + Core *Ricochet } -func (s *RpcServer) GetServerStatus(ctx context.Context, req *rpc.ServerStatusRequest) (*rpc.ServerStatusReply, error) { +func (s *RpcServer) GetServerStatus(ctx context.Context, req *ricochet.ServerStatusRequest) (*ricochet.ServerStatusReply, error) { if req.RpcVersion != 1 { return nil, errors.New("Unsupported RPC protocol version") } - return &rpc.ServerStatusReply{ + return &ricochet.ServerStatusReply{ RpcVersion: 1, ServerVersion: "0.0.0", }, nil } -func (s *RpcServer) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.RicochetCore_MonitorNetworkServer) error { - events := s.core.Network.EventMonitor().Subscribe(20) - defer s.core.Network.EventMonitor().Unsubscribe(events) +func (s *RpcServer) MonitorNetwork(req *ricochet.MonitorNetworkRequest, stream ricochet.RicochetCore_MonitorNetworkServer) error { + events := s.Core.Network.EventMonitor().Subscribe(20) + defer s.Core.Network.EventMonitor().Unsubscribe(events) // Send initial status event { - event := s.core.Network.GetStatus() + event := s.Core.Network.GetStatus() if err := stream.Send(&event); err != nil { return err } } for { - event, ok := (<-events).(rpc.NetworkStatus) + event, ok := (<-events).(ricochet.NetworkStatus) if !ok { break } @@ -52,41 +51,41 @@ func (s *RpcServer) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.Ri return nil } -func (s *RpcServer) StartNetwork(ctx context.Context, req *rpc.StartNetworkRequest) (*rpc.NetworkStatus, error) { +func (s *RpcServer) StartNetwork(ctx context.Context, req *ricochet.StartNetworkRequest) (*ricochet.NetworkStatus, error) { // err represents the result of the first connection attempt, but as long // as 'ok' is true, the network has started and this call was successful. - ok, err := s.core.Network.Start("tcp://127.0.0.1:9051", "") + ok, err := s.Core.Network.Start("tcp://127.0.0.1:9051", "") if !ok { return nil, err } - status := s.core.Network.GetStatus() + status := s.Core.Network.GetStatus() return &status, nil } -func (s *RpcServer) StopNetwork(ctx context.Context, req *rpc.StopNetworkRequest) (*rpc.NetworkStatus, error) { - s.core.Network.Stop() - status := s.core.Network.GetStatus() +func (s *RpcServer) StopNetwork(ctx context.Context, req *ricochet.StopNetworkRequest) (*ricochet.NetworkStatus, error) { + s.Core.Network.Stop() + status := s.Core.Network.GetStatus() return &status, nil } -func (s *RpcServer) GetIdentity(ctx context.Context, req *rpc.IdentityRequest) (*rpc.Identity, error) { - reply := rpc.Identity{ - Address: s.core.Identity.Address(), +func (s *RpcServer) GetIdentity(ctx context.Context, req *ricochet.IdentityRequest) (*ricochet.Identity, error) { + reply := ricochet.Identity{ + Address: s.Core.Identity.Address(), } return &reply, nil } -func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.RicochetCore_MonitorContactsServer) error { - monitor := s.core.Identity.ContactList().EventMonitor().Subscribe(20) - defer s.core.Identity.ContactList().EventMonitor().Unsubscribe(monitor) +func (s *RpcServer) MonitorContacts(req *ricochet.MonitorContactsRequest, stream ricochet.RicochetCore_MonitorContactsServer) error { + monitor := s.Core.Identity.ContactList().EventMonitor().Subscribe(20) + defer s.Core.Identity.ContactList().EventMonitor().Unsubscribe(monitor) // Populate - contacts := s.core.Identity.ContactList().Contacts() + contacts := s.Core.Identity.ContactList().Contacts() for _, contact := range contacts { - event := &rpc.ContactEvent{ - Type: rpc.ContactEvent_POPULATE, - Subject: &rpc.ContactEvent_Contact{ + event := &ricochet.ContactEvent{ + Type: ricochet.ContactEvent_POPULATE, + Subject: &ricochet.ContactEvent_Contact{ Contact: contact.Data(), }, } @@ -96,8 +95,8 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc. } // Terminate populate list with a null subject { - event := &rpc.ContactEvent{ - Type: rpc.ContactEvent_POPULATE, + event := &ricochet.ContactEvent{ + Type: ricochet.ContactEvent_POPULATE, } if err := stream.Send(event); err != nil { return err @@ -105,7 +104,7 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc. } for { - event, ok := (<-monitor).(rpc.ContactEvent) + event, ok := (<-monitor).(ricochet.ContactEvent) if !ok { break } @@ -119,9 +118,9 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc. return nil } -func (s *RpcServer) AddContactRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.Contact, error) { - contactList := s.core.Identity.ContactList() - if req.Direction != rpc.ContactRequest_OUTBOUND { +func (s *RpcServer) AddContactRequest(ctx context.Context, req *ricochet.ContactRequest) (*ricochet.Contact, error) { + contactList := s.Core.Identity.ContactList() + if req.Direction != ricochet.ContactRequest_OUTBOUND { return nil, errors.New("Request must be outbound") } @@ -133,12 +132,12 @@ func (s *RpcServer) AddContactRequest(ctx context.Context, req *rpc.ContactReque return contact.Data(), nil } -func (s *RpcServer) UpdateContact(ctx context.Context, req *rpc.Contact) (*rpc.Contact, error) { +func (s *RpcServer) UpdateContact(ctx context.Context, req *ricochet.Contact) (*ricochet.Contact, error) { return nil, NotImplementedError } -func (s *RpcServer) DeleteContact(ctx context.Context, req *rpc.DeleteContactRequest) (*rpc.DeleteContactReply, error) { - contactList := s.core.Identity.ContactList() +func (s *RpcServer) DeleteContact(ctx context.Context, req *ricochet.DeleteContactRequest) (*ricochet.DeleteContactReply, error) { + contactList := s.Core.Identity.ContactList() contact := contactList.ContactByAddress(req.Address) if contact == nil || (req.Id != 0 && contact.Id() != int(req.Id)) { return nil, errors.New("Contact not found") @@ -148,33 +147,33 @@ func (s *RpcServer) DeleteContact(ctx context.Context, req *rpc.DeleteContactReq return nil, err } - return &rpc.DeleteContactReply{}, nil + return &ricochet.DeleteContactReply{}, nil } -func (s *RpcServer) AcceptInboundRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.Contact, error) { +func (s *RpcServer) AcceptInboundRequest(ctx context.Context, req *ricochet.ContactRequest) (*ricochet.Contact, error) { return nil, NotImplementedError } -func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.RejectInboundRequestReply, error) { +func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *ricochet.ContactRequest) (*ricochet.RejectInboundRequestReply, error) { return nil, NotImplementedError } -func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, stream rpc.RicochetCore_MonitorConversationsServer) error { +func (s *RpcServer) MonitorConversations(req *ricochet.MonitorConversationsRequest, stream ricochet.RicochetCore_MonitorConversationsServer) error { // XXX Technically there is a race between starting to monitor // and the list and state of messages used to populate, that could // result in duplicate messages or other weird behavior. // Same problem exists for other places this pattern is used. - monitor := s.core.Identity.ConversationStream.Subscribe(100) - defer s.core.Identity.ConversationStream.Unsubscribe(monitor) + monitor := s.Core.Identity.ConversationStream.Subscribe(100) + defer s.Core.Identity.ConversationStream.Unsubscribe(monitor) { // Populate with existing conversations - contacts := s.core.Identity.ContactList().Contacts() + contacts := s.Core.Identity.ContactList().Contacts() for _, contact := range contacts { messages := contact.Conversation().Messages() for _, message := range messages { - event := rpc.ConversationEvent{ - Type: rpc.ConversationEvent_POPULATE, + event := ricochet.ConversationEvent{ + Type: ricochet.ConversationEvent_POPULATE, Msg: message, } if err := stream.Send(&event); err != nil { @@ -184,8 +183,8 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s } // End population with an empty populate event - event := rpc.ConversationEvent{ - Type: rpc.ConversationEvent_POPULATE, + event := ricochet.ConversationEvent{ + Type: ricochet.ConversationEvent_POPULATE, } if err := stream.Send(&event); err != nil { return err @@ -193,7 +192,7 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s } for { - event, ok := (<-monitor).(rpc.ConversationEvent) + event, ok := (<-monitor).(ricochet.ConversationEvent) if !ok { break } @@ -206,14 +205,14 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s return nil } -func (s *RpcServer) SendMessage(ctx context.Context, req *rpc.Message) (*rpc.Message, error) { +func (s *RpcServer) SendMessage(ctx context.Context, req *ricochet.Message) (*ricochet.Message, error) { if req.Sender == nil || !req.Sender.IsSelf { return nil, errors.New("Invalid message sender") } else if req.Recipient == nil || req.Recipient.IsSelf { return nil, errors.New("Invalid message recipient") } - contact := s.core.Identity.ContactList().ContactByAddress(req.Recipient.Address) + contact := s.Core.Identity.ContactList().ContactByAddress(req.Recipient.Address) if contact == nil || (req.Recipient.ContactId != 0 && int32(contact.Id()) != req.Recipient.ContactId) { return nil, errors.New("Unknown recipient") } @@ -230,16 +229,16 @@ func (s *RpcServer) SendMessage(ctx context.Context, req *rpc.Message) (*rpc.Mes return message, nil } -func (s *RpcServer) MarkConversationRead(ctx context.Context, req *rpc.MarkConversationReadRequest) (*rpc.Reply, error) { +func (s *RpcServer) MarkConversationRead(ctx context.Context, req *ricochet.MarkConversationReadRequest) (*ricochet.Reply, error) { if req.Entity == nil || req.Entity.IsSelf { return nil, errors.New("Invalid entity") } - contact := s.core.Identity.ContactList().ContactByAddress(req.Entity.Address) + contact := s.Core.Identity.ContactList().ContactByAddress(req.Entity.Address) if contact == nil || (req.Entity.ContactId != 0 && int32(contact.Id()) != req.Entity.ContactId) { return nil, errors.New("Unknown entity") } contact.Conversation().MarkReadBeforeMessage(req.LastRecvIdentifier) - return &rpc.Reply{}, nil + return &ricochet.Reply{}, nil }