需要安装etcd服务,docker安装见

1、服务注册文件

package etcd

import (
    "context"
    "github.com/astaxie/beego/logs"
    "log"
    "time"
    "go.etcd.io/etcd/clientv3"
)

//ServiceRegister 创建租约注册服务
type ServiceRegister struct {
    cli     *clientv3.Client //etcd client
    leaseID clientv3.LeaseID //租约ID
    //租约keepalieve相应chan
    keepAliveChan <-chan *clientv3.LeaseKeepAliveResponse
    key           string //key
    val           string //value
}

//NewServiceRegister 新建注册服务
func NewServiceRegister(endpoints []string, key, val string, lease int64) (*ServiceRegister, error) {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    ser := &ServiceRegister{
        cli: cli,
        key: key,
        val: val,
    }

    //申请租约设置时间keepalive
    if err := ser.putKeyWithLease(lease); err != nil {
        return nil, err
    }

    return ser, nil
}

//设置租约
func (s *ServiceRegister) putKeyWithLease(lease int64) error {
    //设置租约时间
    resp, err := s.cli.Grant(context.Background(), lease)
    if err != nil {
        return err
    }
    //注册服务并绑定租约
    _, err = s.cli.Put(context.Background(), s.key, s.val, clientv3.WithLease(resp.ID))
    if err != nil {
        return err
    }
    //设置续租 定期发送需求请求
    leaseRespChan, err := s.cli.KeepAlive(context.Background(), resp.ID)

    if err != nil {
        return err
    }
    s.leaseID = resp.ID
    log.Println(s.leaseID)
    s.keepAliveChan = leaseRespChan
    logs.Info("Put key:%s  val:%s  success!", s.key, s.val)
    return nil
}

//ListenLeaseRespChan 监听 续租情况
func (s *ServiceRegister) ListenLeaseRespChan() {
    for leaseKeepResp := range s.keepAliveChan {
        logs.Info("续约成功", leaseKeepResp)
    }
    logs.Info("关闭续租")
}

// Close 注销服务
func (s *ServiceRegister) Close() error {
    //撤销租约
    if _, err := s.cli.Revoke(context.Background(), s.leaseID); err != nil {
        return err
    }
    logs.Info("撤销租约")
    return s.cli.Close()
}

2、服务发现文件

package etcd

import (
    "context"
    "log"
    "sync"
    "time"

    "github.com/coreos/etcd/mvcc/mvccpb"
    "go.etcd.io/etcd/clientv3"
)

//ServiceDiscovery 服务发现
type ServiceDiscovery struct {
    cli        *clientv3.Client  //etcd client
    serverList map[string]string //服务列表
    lock       sync.Mutex
}

//NewServiceDiscovery  新建发现服务
func NewServiceDiscovery(endpoints []string) *ServiceDiscovery {
    cli, err := clientv3.New(clientv3.Config{
        Endpoints:   endpoints,
        DialTimeout: 5 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }

    return &ServiceDiscovery{
        cli:        cli,
        serverList: make(map[string]string),
    }
}

//WatchService 初始化服务列表和监视
func (s *ServiceDiscovery) WatchService(prefix string) error {
    //根据前缀获取现有的key
    resp, err := s.cli.Get(context.Background(), prefix, clientv3.WithPrefix())
    if err != nil {
        return err
    }

    for _, ev := range resp.Kvs {
        s.SetServiceList(string(ev.Key), string(ev.Value))
    }

    //监视前缀,修改变更的server
    go s.watcher(prefix)
    return nil
}

//watcher 监听前缀
func (s *ServiceDiscovery) watcher(prefix string) {
    rch := s.cli.Watch(context.Background(), prefix, clientv3.WithPrefix())
    log.Printf("watching prefix:%s now...", prefix)
    for wresp := range rch {
        for _, ev := range wresp.Events {
            switch ev.Type {
            case mvccpb.PUT: //修改或者新增
                s.SetServiceList(string(ev.Kv.Key), string(ev.Kv.Value))
            case mvccpb.DELETE: //删除
                s.DelServiceList(string(ev.Kv.Key))
            }
        }
    }
}

//SetServiceList 新增服务地址
func (s *ServiceDiscovery) SetServiceList(key, val string) {
    s.lock.Lock()
    defer s.lock.Unlock()
    s.serverList[key] = string(val)
    log.Println("put key :", key, "val:", val)
}

//DelServiceList 删除服务地址
func (s *ServiceDiscovery) DelServiceList(key string) {
    s.lock.Lock()
    defer s.lock.Unlock()
    delete(s.serverList, key)
    log.Println("del key:", key)
}

//GetServices 获取服务地址
func (s *ServiceDiscovery) GetServices() []string {
    s.lock.Lock()
    defer s.lock.Unlock()
    addrs := make([]string, 0)

    for _, v := range s.serverList {
        addrs = append(addrs, v)
    }
    return addrs
}

//Close 关闭服务
func (s *ServiceDiscovery) Close() error {
    return s.cli.Close()
}

3、使用服务注册文件(核心代码)

//实现gRPC Server
s := grpc.NewServer()

pb.RegisterActivityServer(s, ActivityServer) //内部调用了s.RegisterServer()

var etc_ip = beego.AppConfig.String("etcd.ips")

var endpoints = []string{etc_ip}
ser, err := etcd.NewServiceRegister(endpoints, "/api/node1", Address, 5)
if err != nil {
    fmt.Println(err,"获取grpc地址失败")

}


go ser.ListenLeaseRespChan()


if err := s.Serve(listen); err != nil {
    fmt.Println(err)
}


select {

}

4、客户端调用代码

var etc_ip = beego.AppConfig.String("etcd.ips")

    var endpoints = []string{etc_ip}

    fmt.Println(endpoints)

    ser := etcd.NewServiceDiscovery(endpoints)

    ser.WatchService("/api/node")

    var address = "127.0.0.1:5200"

    fmt.Print("服务地址:",ser.GetServices(),"-----",len(ser.GetServices()))

    if len(ser.GetServices())!=0{
        address = ser.GetServices()[0]
    }else{
        logs.Error("没有grpc服务地址")
    }

    conn, err := grpc.Dial(address, grpc.WithInsecure())
    if err != nil {
        logs.Error("did not connect: %v", err)
    }



    return conn
最后修改:2020 年 08 月 13 日 10 : 57 PM
如果觉得我的文章对你有用,请随意赞赏