午夜视频在线网站,日韩视频精品在线,中文字幕精品一区二区三区在线,在线播放精品,1024你懂我懂的旧版人,欧美日韩一级黄色片,一区二区三区在线观看视频

分享

gRPC服務(wù)發(fā)現(xiàn)&負載均衡

 WindySky 2017-08-10

構(gòu)建高可用、高性能的通信服務(wù),通常采用服務(wù)注冊與發(fā)現(xiàn)、負載均衡和容錯處理等機制實現(xiàn)。根據(jù)負載均衡實現(xiàn)所在的位置不同,通??煞譃橐韵氯N解決方案:

1、集中式LB(Proxy Model)

在服務(wù)消費者和服務(wù)提供者之間有一個獨立的LB,通常是專門的硬件設(shè)備如 F5,或者基于軟件如 LVS,HAproxy等實現(xiàn)。LB上有所有服務(wù)的地址映射表,通常由運維配置注冊,當(dāng)服務(wù)消費方調(diào)用某個目標服務(wù)時,它向LB發(fā)起請求,由LB以某種策略,比如輪詢(Round-Robin)做負載均衡后將請求轉(zhuǎn)發(fā)到目標服務(wù)。LB一般具備健康檢查能力,能自動摘除不健康的服務(wù)實例。 該方案主要問題:

  1. 單點問題,所有服務(wù)調(diào)用流量都經(jīng)過LB,當(dāng)服務(wù)數(shù)量和調(diào)用量大的時候,LB容易成為瓶頸,且一旦LB發(fā)生故障影響整個系統(tǒng);

  2. 服務(wù)消費方、提供方之間增加了一級,有一定性能開銷。

2、進程內(nèi)LB(Balancing-aware Client)

針對第一個方案的不足,此方案將LB的功能集成到服務(wù)消費方進程里,也被稱為軟負載或者客戶端負載方案。服務(wù)提供方啟動時,首先將服務(wù)地址注冊到服務(wù)注冊表,同時定期報心跳到服務(wù)注冊表以表明服務(wù)的存活狀態(tài),相當(dāng)于健康檢查,服務(wù)消費方要訪問某個服務(wù)時,它通過內(nèi)置的LB組件向服務(wù)注冊表查詢,同時緩存并定期刷新目標服務(wù)地址列表,然后以某種負載均衡策略選擇一個目標服務(wù)地址,最后向目標服務(wù)發(fā)起請求。LB和服務(wù)發(fā)現(xiàn)能力被分散到每一個服務(wù)消費者的進程內(nèi)部,同時服務(wù)消費方和服務(wù)提供方之間是直接調(diào)用,沒有額外開銷,性能比較好。該方案主要問題:

  1. 開發(fā)成本,該方案將服務(wù)調(diào)用方集成到客戶端的進程里頭,如果有多種不同的語言棧,就要配合開發(fā)多種不同的客戶端,有一定的研發(fā)和維護成本;

  2. 另外生產(chǎn)環(huán)境中,后續(xù)如果要對客戶庫進行升級,勢必要求服務(wù)調(diào)用方修改代碼并重新發(fā)布,升級較復(fù)雜。

3、獨立 LB 進程(External Load Balancing Service)

該方案是針對第二種方案的不足而提出的一種折中方案,原理和第二種方案基本類似。
不同之處是將LB和服務(wù)發(fā)現(xiàn)功能從進程內(nèi)移出來,變成主機上的一個獨立進程。主機上的一個或者多個服務(wù)要訪問目標服務(wù)時,他們都通過同一主機上的獨立LB進程做服務(wù)發(fā)現(xiàn)和負載均衡。該方案也是一種分布式方案沒有單點問題,一個LB進程掛了只影響該主機上的服務(wù)調(diào)用方,服務(wù)調(diào)用方和LB之間是進程內(nèi)調(diào)用性能好,同時該方案還簡化了服務(wù)調(diào)用方,不需要為不同語言開發(fā)客戶庫,LB的升級不需要服務(wù)調(diào)用方改代碼。 
該方案主要問題:部署較復(fù)雜,環(huán)節(jié)多,出錯調(diào)試排查問題不方便。

gRPC服務(wù)發(fā)現(xiàn)及負載均衡實現(xiàn)

gRPC開源組件官方并未直接提供服務(wù)注冊與發(fā)現(xiàn)的功能實現(xiàn),但其設(shè)計文檔已提供實現(xiàn)的思路,并在不同語言的gRPC代碼API中已提供了命名解析和負載均衡接口供擴展。

其基本實現(xiàn)原理:

  1. 服務(wù)啟動后gRPC客戶端向命名服務(wù)器發(fā)出名稱解析請求,名稱將解析為一個或多個IP地址,每個IP地址標示它是服務(wù)器地址還是負載均衡器地址,以及標示要使用那個客戶端負載均衡策略或服務(wù)配置。

  2. 客戶端實例化負載均衡策略,如果解析返回的地址是負載均衡器地址,則客戶端將使用grpclb策略,否則客戶端使用服務(wù)配置請求的負載均衡策略。

  3. 負載均衡策略為每個服務(wù)器地址創(chuàng)建一個子通道(channel)。

  4. 當(dāng)有rpc請求時,負載均衡策略決定那個子通道即grpc服務(wù)器將接收請求,當(dāng)可用服務(wù)器為空時客戶端的請求將被阻塞。

根據(jù)gRPC官方提供的設(shè)計思路,基于進程內(nèi)LB方案(即第2個案,阿里開源的服務(wù)框架 Dubbo 也是采用類似機制),結(jié)合分布式一致的組件(如Zookeeper、Consul、Etcd),可找到gRPC服務(wù)發(fā)現(xiàn)和負載均衡的可行解決方案。接下來以GO語言為例,簡單介紹下基于Etcd3的關(guān)鍵代碼實現(xiàn):

1)命名解析實現(xiàn):resolver.go

package etcdv3

import (
    "errors"
    "fmt"
    "strings"

    etcd3 "github.com/coreos/etcd/clientv3"
    "google.golang.org/grpc/naming"
)

// resolver is the implementaion of grpc.naming.Resolver
type resolver struct {
    serviceName string // service name to resolve
}

// NewResolver return resolver with service name
func NewResolver(serviceName string) *resolver {
    return &resolver{serviceName: serviceName}
}

// Resolve to resolve the service from etcd, target is the dial address of etcd
// target example: "http://127.0.0.1:2379,http://127.0.0.1:12379,http://127.0.0.1:22379"
func (re *resolver) Resolve(target string) (naming.Watcher, error) {
    if re.serviceName == "" {
        return nil, errors.New("grpclb: no service name provided")
    }

    // generate etcd client
    client, err := etcd3.New(etcd3.Config{
        Endpoints: strings.Split(target, ","),
    })
    if err != nil {
        return nil, fmt.Errorf("grpclb: creat etcd3 client failed: %s", err.Error())
    }

    // Return watcher
    return &watcher{re: re, client: *client}, nil
}

2)服務(wù)發(fā)現(xiàn)實現(xiàn):watcher.go

package etcdv3

import (
    "fmt"
    etcd3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    "google.golang.org/grpc/naming"
    "github.com/coreos/etcd/mvcc/mvccpb"
)

// watcher is the implementaion of grpc.naming.Watcher
type watcher struct {
    re            *resolver // re: Etcd Resolver
    client        etcd3.Client
    isInitialized bool
}

// Close do nothing
func (w *watcher) Close() {
}

// Next to return the updates
func (w *watcher) Next() ([]*naming.Update, error) {
    // prefix is the etcd prefix/value to watch
    prefix := fmt.Sprintf("/%s/%s/", Prefix, w.re.serviceName)

    // check if is initialized
    if !w.isInitialized {
        // query addresses from etcd
        resp, err := w.client.Get(context.Background(), prefix, etcd3.WithPrefix())
        w.isInitialized = true
        if err == nil {
            addrs := extractAddrs(resp)
            //if not empty, return the updates or watcher new dir
            if l := len(addrs); l != 0 {
                updates := make([]*naming.Update, l)
                for i := range addrs {
                    updates[i] = &naming.Update{Op: naming.Add, Addr: addrs[i]}
                }
                return updates, nil
            }
        }
    }

    // generate etcd Watcher
    rch := w.client.Watch(context.Background(), prefix, etcd3.WithPrefix())
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT:
                return []*naming.Update{{Op: naming.Add, Addr: string(ev.Kv.Value)}}, nil
            case mvccpb.DELETE:
                return []*naming.Update{{Op: naming.Delete, Addr: string(ev.Kv.Value)}}, nil
            }
        }
    }
    return nil, nil
}

func extractAddrs(resp *etcd3.GetResponse) []string {
    addrs := []string{}

    if resp == nil || resp.Kvs == nil {
        return addrs
    }

    for i := range resp.Kvs {
        if v := resp.Kvs[i].Value; v != nil {
            addrs = append(addrs, string(v))
        }
    }

    return addrs
}

3)服務(wù)注冊實現(xiàn):register.go

package etcdv3

import (
    "fmt"
    "log"
    "strings"
    "time"

    etcd3 "github.com/coreos/etcd/clientv3"
    "golang.org/x/net/context"
    "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)

// Prefix should start and end with no slash
var Prefix = "etcd3_naming"
var client etcd3.Client
var serviceKey string

var stopSignal = make(chan bool, 1)

// Register
func Register(name string, host string, port int, target string, interval time.Duration, ttl int) error {
    serviceValue := fmt.Sprintf("%s:%d", host, port)
    serviceKey = fmt.Sprintf("/%s/%s/%s", Prefix, name, serviceValue)

    // get endpoints for register dial address
    var err error
    client, err := etcd3.New(etcd3.Config{
        Endpoints: strings.Split(target, ","),
    })
    if err != nil {
        return fmt.Errorf("grpclb: create etcd3 client failed: %v", err)
    }

    go func() {
        // invoke self-register with ticker
        ticker := time.NewTicker(interval)
        for {
            // minimum lease TTL is ttl-second
            resp, _ := client.Grant(context.TODO(), int64(ttl))
            // should get first, if not exist, set it
            _, err := client.Get(context.Background(), serviceKey)
            if err != nil {
                if err == rpctypes.ErrKeyNotFound {
                    if _, err := client.Put(context.TODO(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                        log.Printf("grpclb: set service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                    }
                } else {
                    log.Printf("grpclb: service '%s' connect to etcd3 failed: %s", name, err.Error())
                }
            } else {
                // refresh set to true for not notifying the watcher
                if _, err := client.Put(context.Background(), serviceKey, serviceValue, etcd3.WithLease(resp.ID)); err != nil {
                    log.Printf("grpclb: refresh service '%s' with ttl to etcd3 failed: %s", name, err.Error())
                }
            }
            select {
            case <-stopSignal:
                return
            case <-ticker.C:
            }
        }
    }()

    return nil
}

// UnRegister delete registered service from etcd
func UnRegister() error {
    stopSignal <- true
    stopSignal = make(chan bool, 1) // just a hack to avoid multi UnRegister deadlock
    var err error;
    if _, err := client.Delete(context.Background(), serviceKey); err != nil {
        log.Printf("grpclb: deregister '%s' failed: %s", serviceKey, err.Error())
    } else {
        log.Printf("grpclb: deregister '%s' ok.", serviceKey)
    }
    return err
}

4)接口描述文件:helloworld.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.midea.jr.test.grpc";
option java_outer_classname = "HelloWorldProto";
option objc_class_prefix = "HLW";

package helloworld;

// The greeting service definition.
service Greeter {
    //   Sends a greeting
    rpc SayHello (HelloRequest) returns (HelloReply) {
    }
}

// The request message containing the user's name.
message HelloRequest {
    string name = 1;
}

// The response message containing the greetings
message HelloReply {
    string message = 1;
}

5)實現(xiàn)服務(wù)端接口:helloworldserver.go

package main

import (
    "flag"
    "fmt"
    "log"
    "net"
    "os"
    "os/signal"
    "syscall"
    "time"

    "golang.org/x/net/context"
    "google.golang.org/grpc"

    grpclb "com.midea/jr/grpclb/naming/etcd/v3"
    "com.midea/jr/grpclb/example/pb"
)

var (
    serv = flag.String("service", "hello_service", "service name")
    port = flag.Int("port", 50001, "listening port")
    reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
    flag.Parse()

    lis, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", *port))
    if err != nil {
        panic(err)
    }

    err = grpclb.Register(*serv, "127.0.0.1", *port, *reg, time.Second*10, 15)
    if err != nil {
        panic(err)
    }

    ch := make(chan os.Signal, 1)
    signal.Notify(ch, syscall.SIGTERM, syscall.SIGINT, syscall.SIGKILL, syscall.SIGHUP, syscall.SIGQUIT)
    go func() {
        s := <-ch
        log.Printf("receive signal '%v'", s)
        grpclb.UnRegister()
        os.Exit(1)
    }()

    log.Printf("starting hello service at %d", *port)
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    s.Serve(lis)
}

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    fmt.Printf("%v: Receive is %s\n", time.Now(), in.Name)
    return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

6)實現(xiàn)客戶端接口:helloworldclient.go

package main

import (
    "flag"
    "fmt"
    "time"

    grpclb "com.midea/jr/grpclb/naming/etcd/v3"
    "com.midea/jr/grpclb/example/pb"
    "golang.org/x/net/context"
    "google.golang.org/grpc"
    "strconv"
)

var (
    serv = flag.String("service", "hello_service", "service name")
    reg = flag.String("reg", "http://127.0.0.1:2379", "register etcd address")
)

func main() {
    flag.Parse()
    r := grpclb.NewResolver(*serv)
    b := grpc.RoundRobin(r)

    ctx, _ := context.WithTimeout(context.Background(), 10*time.Second)
    conn, err := grpc.DialContext(ctx, *reg, grpc.WithInsecure(), grpc.WithBalancer(b))
    if err != nil {
        panic(err)
    }

    ticker := time.NewTicker(1 * time.Second)
    for t := range ticker.C {
        client := pb.NewGreeterClient(conn)
        resp, err := client.SayHello(context.Background(), &pb.HelloRequest{Name: "world " + strconv.Itoa(t.Second())})
        if err == nil {
            fmt.Printf("%v: Reply is %s\n", t, resp.Message)
        }
    }
}

7)運行測試

  1. 運行3個服務(wù)端S1、S2、S3,1個客戶端C,觀察各服務(wù)端接收的請求數(shù)是否相等?

  2. 關(guān)閉1個服務(wù)端S1,觀察請求是否會轉(zhuǎn)移到另外2個服務(wù)端?

  3. 重新啟動S1服務(wù)端,觀察另外2個服務(wù)端請求是否會平均分配到S1?

  4. 關(guān)閉Etcd3服務(wù)器,觀察客戶端與服務(wù)端通信是否正常? 
    關(guān)閉通信仍然正常,但新服務(wù)端不會注冊進來,服務(wù)端掉線了也無法摘除掉。

  5. 重新啟動Etcd3服務(wù)器,服務(wù)端上下線可自動恢復(fù)正常。

  6. 關(guān)閉所有服務(wù)端,客戶端請求將被阻塞。

參考:

http://www.grpc.io/docs/
https://github.com/grpc/grpc/blob/master/doc/load-balancing.md

    本站是提供個人知識管理的網(wǎng)絡(luò)存儲空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點。請注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請點擊一鍵舉報。
    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多