ricochet-go/core/onionconnector.go

148 lines
3.8 KiB
Go

package core
import (
	"errors"
	"log"
	"math/rand"
	"net"
	"sync"
	"time"

	"github.com/ricochet-im/ricochet-go/rpc"
	"golang.org/x/net/context"
)
// OnionConnector dials .onion addresses through a Network and keeps a
// counter of backoff steps so that repeated attempts can be spaced out.
type OnionConnector struct {
// Network supplies the status event monitor and the proxy dialer used by
// Connect. Connect returns an error if it is nil.
Network *Network
// NeverGiveUp makes Connect wait via Backoff and try again after a failed
// dial, instead of returning the dial error to the caller.
NeverGiveUp bool
// AttemptCount is the backoff counter: Backoff increments it and uses the
// new value to select a delay, and ResetBackoff sets it to zero.
AttemptCount int
}
// Connect attempts to connect to 'address', which must be a .onion address
// and port, using the Network from this OnionConnector instance.
//
// If NeverGiveUp is set, failed connections will be retried automatically,
// with appropriate backoff periods, and an error is only returned in fatal
// situations. The backoff is defined by AttemptCount on the OnionConnector
// instance. The backoff counter is __not__ reset after Connect returns.
//
// If the Network is not ready, this function will wait until the network
// becomes ready. The backoff attempt counter is automatically reset when
// the network status changes.
//
// Context can be used to set a timeout, deadline, or cancel function for
// the overall connection attempts.
//
// On success the established connection is returned. Otherwise the error is
// one of: a configuration error (nil Network, invalid address), c.Err() once
// c is done, the dial error when NeverGiveUp is false, or an error from
// WaitForProxyDialer or Backoff that was not caused by cancellation.
func (oc *OnionConnector) Connect(address string, c context.Context) (net.Conn, error) {
	if oc.Network == nil {
		return nil, errors.New("No network configured for connector")
	}

	host, _, err := net.SplitHostPort(address)
	if err != nil || !IsOnionValid(host) {
		return nil, errors.New("Invalid address")
	}

	options := &net.Dialer{
		Cancel:  c.Done(),
		Timeout: 60 * time.Second, // Maximum time for a single connect attempt
	}

	// State shared between the retry loop and the monitor goroutine. All of
	// it is guarded by mu: cancelWait cancels the wait context of the current
	// loop iteration, and resetPending records that the monitor asked for a
	// backoff reset. The reset itself is performed on this goroutine so that
	// AttemptCount is never touched concurrently by Connect.
	var (
		mu           sync.Mutex
		cancelWait   = func() {}
		resetPending bool
	)

	// applyPendingReset performs a backoff reset requested by the monitor
	// goroutine, if there is one outstanding.
	applyPendingReset := func() {
		mu.Lock()
		pending := resetPending
		resetPending = false
		mu.Unlock()
		if pending {
			oc.ResetBackoff()
		}
	}

	// At return, release the current wait context and honour any reset that
	// was requested while the final attempt was in flight.
	defer func() {
		mu.Lock()
		cancelWait()
		mu.Unlock()
		applyPendingReset()
	}()

	// Monitor for network connection status changes. On any change, request a
	// backoff reset and cancel the current wait context, so that the loop
	// below abandons its current wait and tries again.
	networkMonitor := oc.Network.EventMonitor().Subscribe(20)
	defer oc.Network.EventMonitor().Unsubscribe(networkMonitor)
	go func() {
		var prevConnStatus ricochet.TorConnectionStatus_Status
		for {
			select {
			case <-c.Done():
				return
			case v, ok := <-networkMonitor:
				if !ok {
					// monitor channel is closed when Connect() returns
					return
				}
				event := v.(ricochet.NetworkStatus)
				var connStatus ricochet.TorConnectionStatus_Status
				if event.Connection != nil {
					connStatus = event.Connection.Status
				}
				if connStatus != prevConnStatus {
					prevConnStatus = connStatus
					mu.Lock()
					resetPending = true
					cancelWait()
					mu.Unlock()
				}
			}
		}
	}()

	for {
		// Each iteration gets its own wait context. The previous one is
		// cancelled before being replaced so that it does not stay attached
		// to c for the remainder of c's lifetime.
		waitCtx, cancel := context.WithCancel(c)
		mu.Lock()
		cancelWait()
		cancelWait = cancel
		mu.Unlock()
		applyPendingReset()

		proxy, err := oc.Network.WaitForProxyDialer(options, waitCtx)
		if err != nil {
			switch {
			case c.Err() != nil:
				return nil, c.Err()
			case waitCtx.Err() != nil:
				// Interrupted by a network status change; start over.
				continue
			default:
				return nil, err
			}
		}

		conn, err := proxy.Dial("tcp", address)
		if err == nil {
			// Success!
			return conn, nil
		}
		if c.Err() != nil {
			return nil, c.Err()
		}
		if !oc.NeverGiveUp {
			return nil, err
		}

		// A status change during the dial resets the counter before this
		// failure is counted and logged.
		applyPendingReset()
		log.Printf("Connection attempt %d to %s failed: %s", oc.AttemptCount, address, err)
		if err := oc.Backoff(waitCtx); err != nil {
			if c.Err() != nil {
				return nil, c.Err()
			}
			if waitCtx.Err() == nil {
				return nil, err
			}
			// The wait was interrupted by a network status change; retry now.
		}
	}
}
// backoffDelay holds the base wait, in seconds, that Backoff applies for a
// given attempt count. Counts beyond the table reuse its final entry.
var backoffDelay = [...]int{0, 30, 60, 120, 300, 600, 900}

// Backoff increments AttemptCount and then blocks for the delay that
// backoffDelay assigns to the new count, jittered by +/-20%. The wait ends
// early if c is done. The return value is c.Err() as observed after the
// wait: nil if c is still live, the context's error otherwise.
func (oc *OnionConnector) Backoff(c context.Context) error {
	oc.AttemptCount++

	// Attempt counts past the end of the table use the last entry.
	step := oc.AttemptCount
	if last := len(backoffDelay) - 1; step > last {
		step = last
	}
	seconds := backoffDelay[step]

	// Jitter by +/-20%
	factor := rand.Float32()*0.4 - 0.2
	seconds += int(float32(seconds) * factor)

	timer := time.NewTimer(time.Duration(seconds) * time.Second)
	defer timer.Stop()
	select {
	case <-timer.C:
	case <-c.Done():
	}
	return c.Err()
}
// ResetBackoff sets AttemptCount back to zero, so that the next call to
// Backoff counts as the first attempt again.
func (oc *OnionConnector) ResetBackoff() {
oc.AttemptCount = 0
}