core: Move RpcServer into core

This commit is contained in:
John Brooks 2016-11-03 22:37:00 -06:00
parent 2a0e5b7976
commit 2ee0c07942
2 changed files with 54 additions and 55 deletions

View File

@ -28,8 +28,8 @@ func main() {
log.Fatalf("init error: %v", err) log.Fatalf("init error: %v", err)
} }
server := &RpcServer{ server := &ricochet.RpcServer{
core: core, Core: core,
} }
grpcServer := grpc.NewServer() grpcServer := grpc.NewServer()
rpc.RegisterRicochetCoreServer(grpcServer, server) rpc.RegisterRicochetCoreServer(grpcServer, server)

View File

@ -1,9 +1,8 @@
package main package core
import ( import (
"errors" "errors"
ricochet "github.com/ricochet-im/ricochet-go/core" "github.com/ricochet-im/ricochet-go/rpc"
rpc "github.com/ricochet-im/ricochet-go/rpc"
"golang.org/x/net/context" "golang.org/x/net/context"
"log" "log"
) )
@ -11,34 +10,34 @@ import (
var NotImplementedError error = errors.New("Not implemented") var NotImplementedError error = errors.New("Not implemented")
type RpcServer struct { type RpcServer struct {
core *ricochet.Ricochet Core *Ricochet
} }
func (s *RpcServer) GetServerStatus(ctx context.Context, req *rpc.ServerStatusRequest) (*rpc.ServerStatusReply, error) { func (s *RpcServer) GetServerStatus(ctx context.Context, req *ricochet.ServerStatusRequest) (*ricochet.ServerStatusReply, error) {
if req.RpcVersion != 1 { if req.RpcVersion != 1 {
return nil, errors.New("Unsupported RPC protocol version") return nil, errors.New("Unsupported RPC protocol version")
} }
return &rpc.ServerStatusReply{ return &ricochet.ServerStatusReply{
RpcVersion: 1, RpcVersion: 1,
ServerVersion: "0.0.0", ServerVersion: "0.0.0",
}, nil }, nil
} }
func (s *RpcServer) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.RicochetCore_MonitorNetworkServer) error { func (s *RpcServer) MonitorNetwork(req *ricochet.MonitorNetworkRequest, stream ricochet.RicochetCore_MonitorNetworkServer) error {
events := s.core.Network.EventMonitor().Subscribe(20) events := s.Core.Network.EventMonitor().Subscribe(20)
defer s.core.Network.EventMonitor().Unsubscribe(events) defer s.Core.Network.EventMonitor().Unsubscribe(events)
// Send initial status event // Send initial status event
{ {
event := s.core.Network.GetStatus() event := s.Core.Network.GetStatus()
if err := stream.Send(&event); err != nil { if err := stream.Send(&event); err != nil {
return err return err
} }
} }
for { for {
event, ok := (<-events).(rpc.NetworkStatus) event, ok := (<-events).(ricochet.NetworkStatus)
if !ok { if !ok {
break break
} }
@ -52,41 +51,41 @@ func (s *RpcServer) MonitorNetwork(req *rpc.MonitorNetworkRequest, stream rpc.Ri
return nil return nil
} }
func (s *RpcServer) StartNetwork(ctx context.Context, req *rpc.StartNetworkRequest) (*rpc.NetworkStatus, error) { func (s *RpcServer) StartNetwork(ctx context.Context, req *ricochet.StartNetworkRequest) (*ricochet.NetworkStatus, error) {
// err represents the result of the first connection attempt, but as long // err represents the result of the first connection attempt, but as long
// as 'ok' is true, the network has started and this call was successful. // as 'ok' is true, the network has started and this call was successful.
ok, err := s.core.Network.Start("tcp://127.0.0.1:9051", "") ok, err := s.Core.Network.Start("tcp://127.0.0.1:9051", "")
if !ok { if !ok {
return nil, err return nil, err
} }
status := s.core.Network.GetStatus() status := s.Core.Network.GetStatus()
return &status, nil return &status, nil
} }
func (s *RpcServer) StopNetwork(ctx context.Context, req *rpc.StopNetworkRequest) (*rpc.NetworkStatus, error) { func (s *RpcServer) StopNetwork(ctx context.Context, req *ricochet.StopNetworkRequest) (*ricochet.NetworkStatus, error) {
s.core.Network.Stop() s.Core.Network.Stop()
status := s.core.Network.GetStatus() status := s.Core.Network.GetStatus()
return &status, nil return &status, nil
} }
func (s *RpcServer) GetIdentity(ctx context.Context, req *rpc.IdentityRequest) (*rpc.Identity, error) { func (s *RpcServer) GetIdentity(ctx context.Context, req *ricochet.IdentityRequest) (*ricochet.Identity, error) {
reply := rpc.Identity{ reply := ricochet.Identity{
Address: s.core.Identity.Address(), Address: s.Core.Identity.Address(),
} }
return &reply, nil return &reply, nil
} }
func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.RicochetCore_MonitorContactsServer) error { func (s *RpcServer) MonitorContacts(req *ricochet.MonitorContactsRequest, stream ricochet.RicochetCore_MonitorContactsServer) error {
monitor := s.core.Identity.ContactList().EventMonitor().Subscribe(20) monitor := s.Core.Identity.ContactList().EventMonitor().Subscribe(20)
defer s.core.Identity.ContactList().EventMonitor().Unsubscribe(monitor) defer s.Core.Identity.ContactList().EventMonitor().Unsubscribe(monitor)
// Populate // Populate
contacts := s.core.Identity.ContactList().Contacts() contacts := s.Core.Identity.ContactList().Contacts()
for _, contact := range contacts { for _, contact := range contacts {
event := &rpc.ContactEvent{ event := &ricochet.ContactEvent{
Type: rpc.ContactEvent_POPULATE, Type: ricochet.ContactEvent_POPULATE,
Subject: &rpc.ContactEvent_Contact{ Subject: &ricochet.ContactEvent_Contact{
Contact: contact.Data(), Contact: contact.Data(),
}, },
} }
@ -96,8 +95,8 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.
} }
// Terminate populate list with a null subject // Terminate populate list with a null subject
{ {
event := &rpc.ContactEvent{ event := &ricochet.ContactEvent{
Type: rpc.ContactEvent_POPULATE, Type: ricochet.ContactEvent_POPULATE,
} }
if err := stream.Send(event); err != nil { if err := stream.Send(event); err != nil {
return err return err
@ -105,7 +104,7 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.
} }
for { for {
event, ok := (<-monitor).(rpc.ContactEvent) event, ok := (<-monitor).(ricochet.ContactEvent)
if !ok { if !ok {
break break
} }
@ -119,9 +118,9 @@ func (s *RpcServer) MonitorContacts(req *rpc.MonitorContactsRequest, stream rpc.
return nil return nil
} }
func (s *RpcServer) AddContactRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.Contact, error) { func (s *RpcServer) AddContactRequest(ctx context.Context, req *ricochet.ContactRequest) (*ricochet.Contact, error) {
contactList := s.core.Identity.ContactList() contactList := s.Core.Identity.ContactList()
if req.Direction != rpc.ContactRequest_OUTBOUND { if req.Direction != ricochet.ContactRequest_OUTBOUND {
return nil, errors.New("Request must be outbound") return nil, errors.New("Request must be outbound")
} }
@ -133,12 +132,12 @@ func (s *RpcServer) AddContactRequest(ctx context.Context, req *rpc.ContactReque
return contact.Data(), nil return contact.Data(), nil
} }
func (s *RpcServer) UpdateContact(ctx context.Context, req *rpc.Contact) (*rpc.Contact, error) { func (s *RpcServer) UpdateContact(ctx context.Context, req *ricochet.Contact) (*ricochet.Contact, error) {
return nil, NotImplementedError return nil, NotImplementedError
} }
func (s *RpcServer) DeleteContact(ctx context.Context, req *rpc.DeleteContactRequest) (*rpc.DeleteContactReply, error) { func (s *RpcServer) DeleteContact(ctx context.Context, req *ricochet.DeleteContactRequest) (*ricochet.DeleteContactReply, error) {
contactList := s.core.Identity.ContactList() contactList := s.Core.Identity.ContactList()
contact := contactList.ContactByAddress(req.Address) contact := contactList.ContactByAddress(req.Address)
if contact == nil || (req.Id != 0 && contact.Id() != int(req.Id)) { if contact == nil || (req.Id != 0 && contact.Id() != int(req.Id)) {
return nil, errors.New("Contact not found") return nil, errors.New("Contact not found")
@ -148,33 +147,33 @@ func (s *RpcServer) DeleteContact(ctx context.Context, req *rpc.DeleteContactReq
return nil, err return nil, err
} }
return &rpc.DeleteContactReply{}, nil return &ricochet.DeleteContactReply{}, nil
} }
func (s *RpcServer) AcceptInboundRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.Contact, error) { func (s *RpcServer) AcceptInboundRequest(ctx context.Context, req *ricochet.ContactRequest) (*ricochet.Contact, error) {
return nil, NotImplementedError return nil, NotImplementedError
} }
func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *rpc.ContactRequest) (*rpc.RejectInboundRequestReply, error) { func (s *RpcServer) RejectInboundRequest(ctx context.Context, req *ricochet.ContactRequest) (*ricochet.RejectInboundRequestReply, error) {
return nil, NotImplementedError return nil, NotImplementedError
} }
func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, stream rpc.RicochetCore_MonitorConversationsServer) error { func (s *RpcServer) MonitorConversations(req *ricochet.MonitorConversationsRequest, stream ricochet.RicochetCore_MonitorConversationsServer) error {
// XXX Technically there is a race between starting to monitor // XXX Technically there is a race between starting to monitor
// and the list and state of messages used to populate, that could // and the list and state of messages used to populate, that could
// result in duplicate messages or other weird behavior. // result in duplicate messages or other weird behavior.
// Same problem exists for other places this pattern is used. // Same problem exists for other places this pattern is used.
monitor := s.core.Identity.ConversationStream.Subscribe(100) monitor := s.Core.Identity.ConversationStream.Subscribe(100)
defer s.core.Identity.ConversationStream.Unsubscribe(monitor) defer s.Core.Identity.ConversationStream.Unsubscribe(monitor)
{ {
// Populate with existing conversations // Populate with existing conversations
contacts := s.core.Identity.ContactList().Contacts() contacts := s.Core.Identity.ContactList().Contacts()
for _, contact := range contacts { for _, contact := range contacts {
messages := contact.Conversation().Messages() messages := contact.Conversation().Messages()
for _, message := range messages { for _, message := range messages {
event := rpc.ConversationEvent{ event := ricochet.ConversationEvent{
Type: rpc.ConversationEvent_POPULATE, Type: ricochet.ConversationEvent_POPULATE,
Msg: message, Msg: message,
} }
if err := stream.Send(&event); err != nil { if err := stream.Send(&event); err != nil {
@ -184,8 +183,8 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s
} }
// End population with an empty populate event // End population with an empty populate event
event := rpc.ConversationEvent{ event := ricochet.ConversationEvent{
Type: rpc.ConversationEvent_POPULATE, Type: ricochet.ConversationEvent_POPULATE,
} }
if err := stream.Send(&event); err != nil { if err := stream.Send(&event); err != nil {
return err return err
@ -193,7 +192,7 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s
} }
for { for {
event, ok := (<-monitor).(rpc.ConversationEvent) event, ok := (<-monitor).(ricochet.ConversationEvent)
if !ok { if !ok {
break break
} }
@ -206,14 +205,14 @@ func (s *RpcServer) MonitorConversations(req *rpc.MonitorConversationsRequest, s
return nil return nil
} }
func (s *RpcServer) SendMessage(ctx context.Context, req *rpc.Message) (*rpc.Message, error) { func (s *RpcServer) SendMessage(ctx context.Context, req *ricochet.Message) (*ricochet.Message, error) {
if req.Sender == nil || !req.Sender.IsSelf { if req.Sender == nil || !req.Sender.IsSelf {
return nil, errors.New("Invalid message sender") return nil, errors.New("Invalid message sender")
} else if req.Recipient == nil || req.Recipient.IsSelf { } else if req.Recipient == nil || req.Recipient.IsSelf {
return nil, errors.New("Invalid message recipient") return nil, errors.New("Invalid message recipient")
} }
contact := s.core.Identity.ContactList().ContactByAddress(req.Recipient.Address) contact := s.Core.Identity.ContactList().ContactByAddress(req.Recipient.Address)
if contact == nil || (req.Recipient.ContactId != 0 && int32(contact.Id()) != req.Recipient.ContactId) { if contact == nil || (req.Recipient.ContactId != 0 && int32(contact.Id()) != req.Recipient.ContactId) {
return nil, errors.New("Unknown recipient") return nil, errors.New("Unknown recipient")
} }
@ -230,16 +229,16 @@ func (s *RpcServer) SendMessage(ctx context.Context, req *rpc.Message) (*rpc.Mes
return message, nil return message, nil
} }
func (s *RpcServer) MarkConversationRead(ctx context.Context, req *rpc.MarkConversationReadRequest) (*rpc.Reply, error) { func (s *RpcServer) MarkConversationRead(ctx context.Context, req *ricochet.MarkConversationReadRequest) (*ricochet.Reply, error) {
if req.Entity == nil || req.Entity.IsSelf { if req.Entity == nil || req.Entity.IsSelf {
return nil, errors.New("Invalid entity") return nil, errors.New("Invalid entity")
} }
contact := s.core.Identity.ContactList().ContactByAddress(req.Entity.Address) contact := s.Core.Identity.ContactList().ContactByAddress(req.Entity.Address)
if contact == nil || (req.Entity.ContactId != 0 && int32(contact.Id()) != req.Entity.ContactId) { if contact == nil || (req.Entity.ContactId != 0 && int32(contact.Id()) != req.Entity.ContactId) {
return nil, errors.New("Unknown entity") return nil, errors.New("Unknown entity")
} }
contact.Conversation().MarkReadBeforeMessage(req.LastRecvIdentifier) contact.Conversation().MarkReadBeforeMessage(req.LastRecvIdentifier)
return &rpc.Reply{}, nil return &ricochet.Reply{}, nil
} }