244 lines
7.0 KiB
Go
244 lines
7.0 KiB
Go
|
/*
|
||
|
*
|
||
|
* Copyright 2014, Google Inc.
|
||
|
* All rights reserved.
|
||
|
*
|
||
|
* Redistribution and use in source and binary forms, with or without
|
||
|
* modification, are permitted provided that the following conditions are
|
||
|
* met:
|
||
|
*
|
||
|
* * Redistributions of source code must retain the above copyright
|
||
|
* notice, this list of conditions and the following disclaimer.
|
||
|
* * Redistributions in binary form must reproduce the above
|
||
|
* copyright notice, this list of conditions and the following disclaimer
|
||
|
* in the documentation and/or other materials provided with the
|
||
|
* distribution.
|
||
|
* * Neither the name of Google Inc. nor the names of its
|
||
|
* contributors may be used to endorse or promote products derived from
|
||
|
* this software without specific prior written permission.
|
||
|
*
|
||
|
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||
|
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||
|
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||
|
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||
|
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||
|
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||
|
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||
|
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||
|
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||
|
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||
|
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||
|
*
|
||
|
*/
|
||
|
|
||
|
package grpc
|
||
|
|
||
|
import (
|
||
|
"container/list"
|
||
|
"fmt"
|
||
|
"sync"
|
||
|
|
||
|
"golang.org/x/net/context"
|
||
|
"google.golang.org/grpc/grpclog"
|
||
|
"google.golang.org/grpc/naming"
|
||
|
"google.golang.org/grpc/transport"
|
||
|
)
|
||
|
|
||
|
// Picker picks a Conn for RPC requests.
|
||
|
// This is EXPERIMENTAL and please do not implement your own Picker for now.
|
||
|
type Picker interface {
|
||
|
// Init does initial processing for the Picker, e.g., initiate some connections.
|
||
|
Init(cc *ClientConn) error
|
||
|
// Pick blocks until either a transport.ClientTransport is ready for the upcoming RPC
|
||
|
// or some error happens.
|
||
|
Pick(ctx context.Context) (transport.ClientTransport, error)
|
||
|
// PickAddr picks a peer address for connecting. This will be called repeated for
|
||
|
// connecting/reconnecting.
|
||
|
PickAddr() (string, error)
|
||
|
// State returns the connectivity state of the underlying connections.
|
||
|
State() (ConnectivityState, error)
|
||
|
// WaitForStateChange blocks until the state changes to something other than
|
||
|
// the sourceState. It returns the new state or error.
|
||
|
WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error)
|
||
|
// Close closes all the Conn's owned by this Picker.
|
||
|
Close() error
|
||
|
}
|
||
|
|
||
|
// unicastPicker is the default Picker which is used when there is no custom Picker
|
||
|
// specified by users. It always picks the same Conn.
|
||
|
type unicastPicker struct {
|
||
|
target string
|
||
|
conn *Conn
|
||
|
}
|
||
|
|
||
|
func (p *unicastPicker) Init(cc *ClientConn) error {
|
||
|
c, err := NewConn(cc)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
p.conn = c
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *unicastPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
|
||
|
return p.conn.Wait(ctx)
|
||
|
}
|
||
|
|
||
|
func (p *unicastPicker) PickAddr() (string, error) {
|
||
|
return p.target, nil
|
||
|
}
|
||
|
|
||
|
func (p *unicastPicker) State() (ConnectivityState, error) {
|
||
|
return p.conn.State(), nil
|
||
|
}
|
||
|
|
||
|
func (p *unicastPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||
|
return p.conn.WaitForStateChange(ctx, sourceState)
|
||
|
}
|
||
|
|
||
|
func (p *unicastPicker) Close() error {
|
||
|
if p.conn != nil {
|
||
|
return p.conn.Close()
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// unicastNamingPicker picks an address from a name resolver to set up the connection.
|
||
|
type unicastNamingPicker struct {
|
||
|
cc *ClientConn
|
||
|
resolver naming.Resolver
|
||
|
watcher naming.Watcher
|
||
|
mu sync.Mutex
|
||
|
// The list of the addresses are obtained from watcher.
|
||
|
addrs *list.List
|
||
|
// It tracks the current picked addr by PickAddr(). The next PickAddr may
|
||
|
// push it forward on addrs.
|
||
|
pickedAddr *list.Element
|
||
|
conn *Conn
|
||
|
}
|
||
|
|
||
|
// NewUnicastNamingPicker creates a Picker to pick addresses from a name resolver
|
||
|
// to connect.
|
||
|
func NewUnicastNamingPicker(r naming.Resolver) Picker {
|
||
|
return &unicastNamingPicker{
|
||
|
resolver: r,
|
||
|
addrs: list.New(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
type addrInfo struct {
|
||
|
addr string
|
||
|
// Set to true if this addrInfo needs to be deleted in the next PickAddrr() call.
|
||
|
deleting bool
|
||
|
}
|
||
|
|
||
|
// processUpdates calls Watcher.Next() once and processes the obtained updates.
|
||
|
func (p *unicastNamingPicker) processUpdates() error {
|
||
|
updates, err := p.watcher.Next()
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
for _, update := range updates {
|
||
|
switch update.Op {
|
||
|
case naming.Add:
|
||
|
p.mu.Lock()
|
||
|
p.addrs.PushBack(&addrInfo{
|
||
|
addr: update.Addr,
|
||
|
})
|
||
|
p.mu.Unlock()
|
||
|
// Initial connection setup
|
||
|
if p.conn == nil {
|
||
|
conn, err := NewConn(p.cc)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
p.conn = conn
|
||
|
}
|
||
|
case naming.Delete:
|
||
|
p.mu.Lock()
|
||
|
for e := p.addrs.Front(); e != nil; e = e.Next() {
|
||
|
if update.Addr == e.Value.(*addrInfo).addr {
|
||
|
if e == p.pickedAddr {
|
||
|
// Do not remove the element now if it is the current picked
|
||
|
// one. We leave the deletion to the next PickAddr() call.
|
||
|
e.Value.(*addrInfo).deleting = true
|
||
|
// Notify Conn to close it. All the live RPCs on this connection
|
||
|
// will be aborted.
|
||
|
p.conn.NotifyReset()
|
||
|
} else {
|
||
|
p.addrs.Remove(e)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
p.mu.Unlock()
|
||
|
default:
|
||
|
grpclog.Println("Unknown update.Op ", update.Op)
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// monitor runs in a standalone goroutine to keep watching name resolution updates until the watcher
|
||
|
// is closed.
|
||
|
func (p *unicastNamingPicker) monitor() {
|
||
|
for {
|
||
|
if err := p.processUpdates(); err != nil {
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (p *unicastNamingPicker) Init(cc *ClientConn) error {
|
||
|
w, err := p.resolver.Resolve(cc.target)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
p.watcher = w
|
||
|
p.cc = cc
|
||
|
// Get the initial name resolution.
|
||
|
if err := p.processUpdates(); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
go p.monitor()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (p *unicastNamingPicker) Pick(ctx context.Context) (transport.ClientTransport, error) {
|
||
|
return p.conn.Wait(ctx)
|
||
|
}
|
||
|
|
||
|
func (p *unicastNamingPicker) PickAddr() (string, error) {
|
||
|
p.mu.Lock()
|
||
|
defer p.mu.Unlock()
|
||
|
if p.pickedAddr == nil {
|
||
|
p.pickedAddr = p.addrs.Front()
|
||
|
} else {
|
||
|
pa := p.pickedAddr
|
||
|
p.pickedAddr = pa.Next()
|
||
|
if pa.Value.(*addrInfo).deleting {
|
||
|
p.addrs.Remove(pa)
|
||
|
}
|
||
|
if p.pickedAddr == nil {
|
||
|
p.pickedAddr = p.addrs.Front()
|
||
|
}
|
||
|
}
|
||
|
if p.pickedAddr == nil {
|
||
|
return "", fmt.Errorf("there is no address available to pick")
|
||
|
}
|
||
|
return p.pickedAddr.Value.(*addrInfo).addr, nil
|
||
|
}
|
||
|
|
||
|
func (p *unicastNamingPicker) State() (ConnectivityState, error) {
|
||
|
return 0, fmt.Errorf("State() is not supported for unicastNamingPicker")
|
||
|
}
|
||
|
|
||
|
func (p *unicastNamingPicker) WaitForStateChange(ctx context.Context, sourceState ConnectivityState) (ConnectivityState, error) {
|
||
|
return 0, fmt.Errorf("WaitForStateChange is not supported for unicastNamingPciker")
|
||
|
}
|
||
|
|
||
|
func (p *unicastNamingPicker) Close() error {
|
||
|
p.watcher.Close()
|
||
|
p.conn.Close()
|
||
|
return nil
|
||
|
}
|