561 lines
13 KiB
Go
561 lines
13 KiB
Go
/*
|
|
*
|
|
* Copyright 2016, Google Inc.
|
|
* All rights reserved.
|
|
*
|
|
* Redistribution and use in source and binary forms, with or without
|
|
* modification, are permitted provided that the following conditions are
|
|
* met:
|
|
*
|
|
* * Redistributions of source code must retain the above copyright
|
|
* notice, this list of conditions and the following disclaimer.
|
|
* * Redistributions in binary form must reproduce the above
|
|
* copyright notice, this list of conditions and the following disclaimer
|
|
* in the documentation and/or other materials provided with the
|
|
* distribution.
|
|
* * Neither the name of Google Inc. nor the names of its
|
|
* contributors may be used to endorse or promote products derived from
|
|
* this software without specific prior written permission.
|
|
*
|
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
|
*
|
|
*/
|
|
|
|
// Package grpclb implements the load balancing protocol defined at
|
|
// https://github.com/grpc/grpc/blob/master/doc/load-balancing.md.
|
|
// The implementation is currently EXPERIMENTAL.
|
|
package grpclb
|
|
|
|
import (
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
"golang.org/x/net/context"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/codes"
|
|
lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1"
|
|
"google.golang.org/grpc/grpclog"
|
|
"google.golang.org/grpc/metadata"
|
|
"google.golang.org/grpc/naming"
|
|
)
|
|
|
|
// AddressType indicates the address type returned by name resolution.
|
|
type AddressType uint8
|
|
|
|
const (
|
|
// Backend indicates the server is a backend server.
|
|
Backend AddressType = iota
|
|
// GRPCLB indicates the server is a grpclb load balancer.
|
|
GRPCLB
|
|
)
|
|
|
|
// Metadata contains the information the name resolution for grpclb should provide. The
|
|
// name resolver used by grpclb balancer is required to provide this type of metadata in
|
|
// its address updates.
|
|
type Metadata struct {
|
|
// AddrType is the type of server (grpc load balancer or backend).
|
|
AddrType AddressType
|
|
// ServerName is the name of the grpc load balancer. Used for authentication.
|
|
ServerName string
|
|
}
|
|
|
|
// Balancer creates a grpclb load balancer.
|
|
func Balancer(r naming.Resolver) grpc.Balancer {
|
|
return &balancer{
|
|
r: r,
|
|
}
|
|
}
|
|
|
|
type remoteBalancerInfo struct {
|
|
addr string
|
|
// the server name used for authentication with the remote LB server.
|
|
name string
|
|
}
|
|
|
|
// addrInfo consists of the information of a backend server.
|
|
type addrInfo struct {
|
|
addr grpc.Address
|
|
connected bool
|
|
// dropRequest indicates whether a particular RPC which chooses this address
|
|
// should be dropped.
|
|
dropRequest bool
|
|
}
|
|
|
|
type balancer struct {
|
|
r naming.Resolver
|
|
mu sync.Mutex
|
|
seq int // a sequence number to make sure addrCh does not get stale addresses.
|
|
w naming.Watcher
|
|
addrCh chan []grpc.Address
|
|
rbs []remoteBalancerInfo
|
|
addrs []*addrInfo
|
|
next int
|
|
waitCh chan struct{}
|
|
done bool
|
|
expTimer *time.Timer
|
|
}
|
|
|
|
func (b *balancer) watchAddrUpdates(w naming.Watcher, ch chan remoteBalancerInfo) error {
|
|
updates, err := w.Next()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if b.done {
|
|
return grpc.ErrClientConnClosing
|
|
}
|
|
var bAddr remoteBalancerInfo
|
|
if len(b.rbs) > 0 {
|
|
bAddr = b.rbs[0]
|
|
}
|
|
for _, update := range updates {
|
|
switch update.Op {
|
|
case naming.Add:
|
|
var exist bool
|
|
for _, v := range b.rbs {
|
|
// TODO: Is the same addr with different server name a different balancer?
|
|
if update.Addr == v.addr {
|
|
exist = true
|
|
break
|
|
}
|
|
}
|
|
if exist {
|
|
continue
|
|
}
|
|
md, ok := update.Metadata.(*Metadata)
|
|
if !ok {
|
|
// TODO: Revisit the handling here and may introduce some fallback mechanism.
|
|
grpclog.Printf("The name resolution contains unexpected metadata %v", update.Metadata)
|
|
continue
|
|
}
|
|
switch md.AddrType {
|
|
case Backend:
|
|
// TODO: Revisit the handling here and may introduce some fallback mechanism.
|
|
grpclog.Printf("The name resolution does not give grpclb addresses")
|
|
continue
|
|
case GRPCLB:
|
|
b.rbs = append(b.rbs, remoteBalancerInfo{
|
|
addr: update.Addr,
|
|
name: md.ServerName,
|
|
})
|
|
default:
|
|
grpclog.Printf("Received unknow address type %d", md.AddrType)
|
|
continue
|
|
}
|
|
case naming.Delete:
|
|
for i, v := range b.rbs {
|
|
if update.Addr == v.addr {
|
|
copy(b.rbs[i:], b.rbs[i+1:])
|
|
b.rbs = b.rbs[:len(b.rbs)-1]
|
|
break
|
|
}
|
|
}
|
|
default:
|
|
grpclog.Println("Unknown update.Op ", update.Op)
|
|
}
|
|
}
|
|
// TODO: Fall back to the basic round-robin load balancing if the resulting address is
|
|
// not a load balancer.
|
|
if len(b.rbs) > 0 {
|
|
// For simplicity, always use the first one now. May revisit this decision later.
|
|
if b.rbs[0] != bAddr {
|
|
select {
|
|
case <-ch:
|
|
default:
|
|
}
|
|
ch <- b.rbs[0]
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (b *balancer) serverListExpire(seq int) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
// TODO: gRPC interanls do not clear the connections when the server list is stale.
|
|
// This means RPCs will keep using the existing server list until b receives new
|
|
// server list even though the list is expired. Revisit this behavior later.
|
|
if b.done || seq < b.seq {
|
|
return
|
|
}
|
|
b.next = 0
|
|
b.addrs = nil
|
|
// Ask grpc internals to close all the corresponding connections.
|
|
b.addrCh <- nil
|
|
}
|
|
|
|
func convertDuration(d *lbpb.Duration) time.Duration {
|
|
if d == nil {
|
|
return 0
|
|
}
|
|
return time.Duration(d.Seconds)*time.Second + time.Duration(d.Nanos)*time.Nanosecond
|
|
}
|
|
|
|
func (b *balancer) processServerList(l *lbpb.ServerList, seq int) {
|
|
if l == nil {
|
|
return
|
|
}
|
|
servers := l.GetServers()
|
|
expiration := convertDuration(l.GetExpirationInterval())
|
|
var (
|
|
sl []*addrInfo
|
|
addrs []grpc.Address
|
|
)
|
|
for _, s := range servers {
|
|
md := metadata.Pairs("lb-token", s.LoadBalanceToken)
|
|
addr := grpc.Address{
|
|
Addr: fmt.Sprintf("%s:%d", s.IpAddress, s.Port),
|
|
Metadata: &md,
|
|
}
|
|
sl = append(sl, &addrInfo{
|
|
addr: addr,
|
|
dropRequest: s.DropRequest,
|
|
})
|
|
addrs = append(addrs, addr)
|
|
}
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if b.done || seq < b.seq {
|
|
return
|
|
}
|
|
if len(sl) > 0 {
|
|
// reset b.next to 0 when replacing the server list.
|
|
b.next = 0
|
|
b.addrs = sl
|
|
b.addrCh <- addrs
|
|
if b.expTimer != nil {
|
|
b.expTimer.Stop()
|
|
b.expTimer = nil
|
|
}
|
|
if expiration > 0 {
|
|
b.expTimer = time.AfterFunc(expiration, func() {
|
|
b.serverListExpire(seq)
|
|
})
|
|
}
|
|
}
|
|
return
|
|
}
|
|
|
|
func (b *balancer) callRemoteBalancer(lbc lbpb.LoadBalancerClient, seq int) (retry bool) {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
stream, err := lbc.BalanceLoad(ctx, grpc.FailFast(false))
|
|
if err != nil {
|
|
grpclog.Printf("Failed to perform RPC to the remote balancer %v", err)
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
if b.done {
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
b.mu.Unlock()
|
|
initReq := &lbpb.LoadBalanceRequest{
|
|
LoadBalanceRequestType: &lbpb.LoadBalanceRequest_InitialRequest{
|
|
InitialRequest: new(lbpb.InitialLoadBalanceRequest),
|
|
},
|
|
}
|
|
if err := stream.Send(initReq); err != nil {
|
|
// TODO: backoff on retry?
|
|
return true
|
|
}
|
|
reply, err := stream.Recv()
|
|
if err != nil {
|
|
// TODO: backoff on retry?
|
|
return true
|
|
}
|
|
initResp := reply.GetInitialResponse()
|
|
if initResp == nil {
|
|
grpclog.Println("Failed to receive the initial response from the remote balancer.")
|
|
return
|
|
}
|
|
// TODO: Support delegation.
|
|
if initResp.LoadBalancerDelegate != "" {
|
|
// delegation
|
|
grpclog.Println("TODO: Delegation is not supported yet.")
|
|
return
|
|
}
|
|
// Retrieve the server list.
|
|
for {
|
|
reply, err := stream.Recv()
|
|
if err != nil {
|
|
break
|
|
}
|
|
b.mu.Lock()
|
|
if b.done || seq < b.seq {
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
b.seq++ // tick when receiving a new list of servers.
|
|
seq = b.seq
|
|
b.mu.Unlock()
|
|
if serverList := reply.GetServerList(); serverList != nil {
|
|
b.processServerList(serverList, seq)
|
|
}
|
|
}
|
|
return true
|
|
}
|
|
|
|
func (b *balancer) Start(target string, config grpc.BalancerConfig) error {
|
|
// TODO: Fall back to the basic direct connection if there is no name resolver.
|
|
if b.r == nil {
|
|
return errors.New("there is no name resolver installed")
|
|
}
|
|
b.mu.Lock()
|
|
if b.done {
|
|
b.mu.Unlock()
|
|
return grpc.ErrClientConnClosing
|
|
}
|
|
b.addrCh = make(chan []grpc.Address)
|
|
w, err := b.r.Resolve(target)
|
|
if err != nil {
|
|
b.mu.Unlock()
|
|
return err
|
|
}
|
|
b.w = w
|
|
b.mu.Unlock()
|
|
balancerAddrCh := make(chan remoteBalancerInfo, 1)
|
|
// Spawn a goroutine to monitor the name resolution of remote load balancer.
|
|
go func() {
|
|
for {
|
|
if err := b.watchAddrUpdates(w, balancerAddrCh); err != nil {
|
|
grpclog.Printf("grpc: the naming watcher stops working due to %v.\n", err)
|
|
close(balancerAddrCh)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
// Spawn a goroutine to talk to the remote load balancer.
|
|
go func() {
|
|
var cc *grpc.ClientConn
|
|
for {
|
|
rb, ok := <-balancerAddrCh
|
|
if cc != nil {
|
|
cc.Close()
|
|
}
|
|
if !ok {
|
|
// b is closing.
|
|
return
|
|
}
|
|
// Talk to the remote load balancer to get the server list.
|
|
var err error
|
|
creds := config.DialCreds
|
|
if creds == nil {
|
|
cc, err = grpc.Dial(rb.addr, grpc.WithInsecure())
|
|
} else {
|
|
if rb.name != "" {
|
|
if err := creds.OverrideServerName(rb.name); err != nil {
|
|
grpclog.Printf("Failed to override the server name in the credentials: %v", err)
|
|
continue
|
|
}
|
|
}
|
|
cc, err = grpc.Dial(rb.addr, grpc.WithTransportCredentials(creds))
|
|
}
|
|
if err != nil {
|
|
grpclog.Printf("Failed to setup a connection to the remote balancer %v: %v", rb.addr, err)
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
b.seq++ // tick when getting a new balancer address
|
|
seq := b.seq
|
|
b.next = 0
|
|
b.mu.Unlock()
|
|
go func(cc *grpc.ClientConn) {
|
|
lbc := lbpb.NewLoadBalancerClient(cc)
|
|
for {
|
|
if retry := b.callRemoteBalancer(lbc, seq); !retry {
|
|
cc.Close()
|
|
return
|
|
}
|
|
}
|
|
}(cc)
|
|
}
|
|
}()
|
|
return nil
|
|
}
|
|
|
|
func (b *balancer) down(addr grpc.Address, err error) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
for _, a := range b.addrs {
|
|
if addr == a.addr {
|
|
a.connected = false
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *balancer) Up(addr grpc.Address) func(error) {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
if b.done {
|
|
return nil
|
|
}
|
|
var cnt int
|
|
for _, a := range b.addrs {
|
|
if a.addr == addr {
|
|
if a.connected {
|
|
return nil
|
|
}
|
|
a.connected = true
|
|
}
|
|
if a.connected && !a.dropRequest {
|
|
cnt++
|
|
}
|
|
}
|
|
// addr is the only one which is connected. Notify the Get() callers who are blocking.
|
|
if cnt == 1 && b.waitCh != nil {
|
|
close(b.waitCh)
|
|
b.waitCh = nil
|
|
}
|
|
return func(err error) {
|
|
b.down(addr, err)
|
|
}
|
|
}
|
|
|
|
func (b *balancer) Get(ctx context.Context, opts grpc.BalancerGetOptions) (addr grpc.Address, put func(), err error) {
|
|
var ch chan struct{}
|
|
b.mu.Lock()
|
|
if b.done {
|
|
b.mu.Unlock()
|
|
err = grpc.ErrClientConnClosing
|
|
return
|
|
}
|
|
|
|
if len(b.addrs) > 0 {
|
|
if b.next >= len(b.addrs) {
|
|
b.next = 0
|
|
}
|
|
next := b.next
|
|
for {
|
|
a := b.addrs[next]
|
|
next = (next + 1) % len(b.addrs)
|
|
if a.connected {
|
|
if !a.dropRequest {
|
|
addr = a.addr
|
|
b.next = next
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
if !opts.BlockingWait {
|
|
b.next = next
|
|
b.mu.Unlock()
|
|
err = grpc.Errorf(codes.Unavailable, "%s drops requests", a.addr.Addr)
|
|
return
|
|
}
|
|
}
|
|
if next == b.next {
|
|
// Has iterated all the possible address but none is connected.
|
|
break
|
|
}
|
|
}
|
|
}
|
|
if !opts.BlockingWait {
|
|
if len(b.addrs) == 0 {
|
|
b.mu.Unlock()
|
|
err = grpc.Errorf(codes.Unavailable, "there is no address available")
|
|
return
|
|
}
|
|
// Returns the next addr on b.addrs for a failfast RPC.
|
|
addr = b.addrs[b.next].addr
|
|
b.next++
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
// Wait on b.waitCh for non-failfast RPCs.
|
|
if b.waitCh == nil {
|
|
ch = make(chan struct{})
|
|
b.waitCh = ch
|
|
} else {
|
|
ch = b.waitCh
|
|
}
|
|
b.mu.Unlock()
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
err = ctx.Err()
|
|
return
|
|
case <-ch:
|
|
b.mu.Lock()
|
|
if b.done {
|
|
b.mu.Unlock()
|
|
err = grpc.ErrClientConnClosing
|
|
return
|
|
}
|
|
|
|
if len(b.addrs) > 0 {
|
|
if b.next >= len(b.addrs) {
|
|
b.next = 0
|
|
}
|
|
next := b.next
|
|
for {
|
|
a := b.addrs[next]
|
|
next = (next + 1) % len(b.addrs)
|
|
if a.connected {
|
|
if !a.dropRequest {
|
|
addr = a.addr
|
|
b.next = next
|
|
b.mu.Unlock()
|
|
return
|
|
}
|
|
if !opts.BlockingWait {
|
|
b.next = next
|
|
b.mu.Unlock()
|
|
err = grpc.Errorf(codes.Unavailable, "drop requests for the addreess %s", a.addr.Addr)
|
|
return
|
|
}
|
|
}
|
|
if next == b.next {
|
|
// Has iterated all the possible address but none is connected.
|
|
break
|
|
}
|
|
}
|
|
}
|
|
// The newly added addr got removed by Down() again.
|
|
if b.waitCh == nil {
|
|
ch = make(chan struct{})
|
|
b.waitCh = ch
|
|
} else {
|
|
ch = b.waitCh
|
|
}
|
|
b.mu.Unlock()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *balancer) Notify() <-chan []grpc.Address {
|
|
return b.addrCh
|
|
}
|
|
|
|
func (b *balancer) Close() error {
|
|
b.mu.Lock()
|
|
defer b.mu.Unlock()
|
|
b.done = true
|
|
if b.expTimer != nil {
|
|
b.expTimer.Stop()
|
|
}
|
|
if b.waitCh != nil {
|
|
close(b.waitCh)
|
|
}
|
|
if b.addrCh != nil {
|
|
close(b.addrCh)
|
|
}
|
|
if b.w != nil {
|
|
b.w.Close()
|
|
}
|
|
return nil
|
|
}
|