刘沙河 刘沙河
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希

花开半夏,半夏花开
首页
  • Go语言基础

    • 数据类型
    • 反射
    • Go指针
  • Go语言进阶

    • go泛型
    • go条件编译
    • cgo教程
    • Go协程调度原理及GPM模型
    • Go内存管理
    • Go垃圾回收机制
    • Go语言内存对齐
  • Go语言实现原理

    • channel 实现原理
    • slice 实现原理
    • map 实现原理
    • sync.Mutex 实现原理
    • 乐观锁CAS 实现原理
    • singlefight 实现原理
  • gin框架

    • gin中间件原理
    • gin路由原理
  • gorm

    • GORM介绍和使用
    • GORM_CURD操作指南
  • go测试

    • benchmark基准测试
    • pprof 性能分析
  • python进阶

    • Numpy&Pandas
    • celery分布式任务队列
  • Django

    • Django 常见命令
    • middleware中间件
    • Django缓存系统
    • Django信号系统
    • Django REST Framework
  • Flask

    • Flask基础知识总结
    • Flask-SQLAlchemy
  • 爬虫

    • aiohttp
    • scrapy框架
  • Mysql

    • Mysql存储引擎和索引
    • MySQL主从复制
    • Mysql读写分离
    • 数据库分库分表
    • Mysql锁
    • Mysql事务和MVCC原理
    • 分库分表带来的读扩散问题
  • Redis

    • redis基础和数据类型
    • redis主从架构
    • redis哨兵架构
    • redis集群模式
    • 如何保证缓存和数据库双写一致
    • redis底层数据结构
    • redis分布式锁
  • Elasticsearch

    • es基本概念
    • es基础语法
    • es倒排索引
  • etcd

    • Go操作etcd
    • Raft原理
    • etcd分布式锁
  • kafka

    • 消息队列MQ总结
    • kafka 概述及原理
    • kafka 消费问题记录
    • 零拷贝技术
    • kafka分区规范
  • RabbitMQ

    • rabbitMQ基础
    • Go操作rabbitmq
  • RocketMQ

    • 可靠消息队列 rocketMQ
  • Http&Https

    • http&https
    • TCP和UDP
    • Ping 原理
  • RPC

    • RPC初识
    • grpc初识和实现
  • gRPC

    • grpc 初识
    • grpc 上下文 metadata
    • grpc 健康检查
    • grpc keepalive
    • grpc 命名解析
    • grpc 中间件&拦截器
    • grpc 负载均衡
    • grpc 身份认证
    • grpc 超时重试
    • grpc 链路追踪
    • grpc-gw将gRPC转RESTfu api
    • grpc-gw自定义选项
  • protobuf

    • protobuf 进阶
    • protobuf 编码原理
  • Docker

    • Docker基础
    • Docker常用命令
    • Dockerfile
    • Docker-Compose
    • Docker多阶段构建
    • Docker Config 教程
    • Docker Swarm 教程
    • Docker Stack 教程
    • Docker Buildx 教程
  • k8s

    • k8s 基础概念
    • k8s 集群架构
    • k8s 工作负载
    • Pod 网络
    • Service 网络
    • 外部接入网络
    • 一张图搞懂k8s各种pod
    • k8s 存储抽象
    • mac快速启动k8s
    • 自制申威架构k8s-reloader
  • go-kit

    • go-kit初识
    • go-kit启动http服务
    • go-kit集成gin启动服务
    • go-kit集成grpc和protobuf
    • go-kit中间件
    • go-kit服务注册发现与负载均衡
    • go-kit限流和熔断
    • go-kit链路追踪
    • go-kit集成Prometheus
  • 设计模式

    • 初识设计模式
    • 创建型模式
    • 结构型模式
    • 行为模式
  • 数据结构

    • 时间轮
    • 堆、双向链表、环形队列
    • 队列:优先队列
    • 队列:延迟队列
  • 算法

    • 递归算法
    • 枚举算法
    • 动态规划
    • 回溯算法
    • 分治算法
    • 贪心算法
    • LRU和LFU
    • 一致性哈希
  • go-kit

    • 【go-kit教程】go-kit初识
    • 【go-kit教程】go-kit启动http服务
    • 【go-kit教程】go-kit集成gin启动服务
    • 【go-kit教程】go-kit集成grpc和protobuf
    • 【go-kit教程】go-kit中间件
    • 【go-kit教程】go-kit服务注册发现与负载均衡
      • 服务注册
        • 1. go-kit集成etcd示例
        • 2. go-kit集成consul示例
      • 服务发现与负载均衡
      • 完整代码
    • 【go-kit教程】go-kit限流和熔断
    • 【go-kit教程】go-kit链路追踪
    • 【go-kit教程】go-kit集成Prometheus
  • 微服务
  • go-kit
bigox
2023-02-23
1.6k
8.2m
目录
服务注册
1. go-kit集成etcd示例
2. go-kit集成consul示例
服务发现与负载均衡
完整代码

【go-kit教程】go-kit服务注册发现与负载均衡

# 服务注册

  • Go-Kit框架本身不提供服务注册中心的实现,但Go-Kit提供了集成第三方服务注册中心的支持

    1. Consul:Consul是一种开源的服务发现和配置工具。Go-Kit提供了consul包,用于与Consul集成,支持服务注册、服务发现、健康检查等功能。
    2. etcd:etcd是一个分布式键值存储系统,也可以用于服务注册和服务发现。Go-Kit提供了etcd包,用于与etcd集成,支持服务注册、服务发现、健康检查等功能。
    3. ZooKeeper:ZooKeeper是一个分布式协调服务,也可以用于服务注册和服务发现。Go-Kit提供了zookeeper包,用于与ZooKeeper集成,支持服务注册、服务发现、健康检查等功能。
    4. Eureka:Eureka是Netflix开源的服务发现框架,Go-Kit提供了eureka包,用于与Eureka集成,支持服务注册、服务发现、健康检查等功能。
    5. Nacos:Nacos是阿里巴巴开源的服务发现和配置管理平台,Go-Kit提供了nacos包,用于与Nacos集成,支持服务注册、服务发现、健康检查等功能。
  • Go-Kit还提供了sd包,该包提供了一个标准的接口,可以与不同的服务注册中心进行集成。因此,如果你使用的服务注册中心不在上述列表中,你也可以使用sd包进行集成。

# 1. go-kit集成etcd示例

  • register_service 代码

    package register_service
    
    import (
    	"context"
    	"fmt"
    	// "github.com/go-kit/kit/sd/etcd"
    	"time"
    
    	"github.com/go-kit/kit/log"
    	"github.com/go-kit/kit/sd"
    	"github.com/go-kit/kit/sd/etcdv3"
    	"os"
    )
    
    func RegisterETCD(etcdHost, etcdPort, svcHost, svcPort string, logger log.Logger) (registrar sd.Registrar) {
    
    	// etcd的地址
    	etcdServerAddr := []string{fmt.Sprintf("%s:%s", etcdHost, etcdPort)}
    
    	// 服务的名称和端口
    	serviceName := "my-service"
    
    	ttl := 5 * time.Second
    	options := etcdv3.ClientOptions{
    		DialTimeout:   ttl,
    		DialKeepAlive: ttl,
    	}
    
    	// 创建etcd客户端
    	etcdClient, err := etcdv3.NewClient(context.Background(), etcdServerAddr, options)
    	if err != nil {
    		fmt.Printf("Failed to create etcd client: %v", err)
    		os.Exit(1)
    	}
    
    	// 创建一个etcd实例
    	registrar = etcdv3.NewRegistrar(etcdClient, etcdv3.Service{
    		Key:   serviceName,
    		Value: fmt.Sprintf("%s:%s", svcHost, svcPort),
    		TTL:   etcdv3.NewTTLOption(10*time.Second, 5*time.Second),
    	}, logger)
    
    	return registrar
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
  • main.go

    /**
     * @date: 2023/2/22
     * @desc:
     */
    
    package main
    
    import (
    	"github.com/go-kit/kit/log/level"
    	"github.com/go-kit/log"
    	"google.golang.org/grpc"
    	"my-kit-register-demo/endpoints"
    	"my-kit-register-demo/middlewares"
    	"my-kit-register-demo/proto"
    	"my-kit-register-demo/register_service"
    	"my-kit-register-demo/services"
    	"my-kit-register-demo/transports"
    	"net"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    func main() {
    	logger := log.NewJSONLogger(os.Stdout)
    
    	listener, err := net.Listen("tcp", "0.0.0.0:8080")
    	if err != nil {
    		level.Error(logger).Log("listen error: ", err.Error())
    		os.Exit(1)
    	}
    
    	pingService := services.NewPingService()
    	pingService = middlewares.LoggingPingServiceMiddleware(logger)(pingService)
    	pingEndPoints := endpoints.NewPingEndpoint(pingService)
    	pingGRPCServer := transports.NewPingGRPCServer(pingEndPoints, logger)
    
    	helloEndPoint := endpoints.NewHelloEndpoint(services.NewHelloService())
    	helloGRPCServer := transports.NewHelloGRPCServer(helloEndPoint, logger)
    
    	grpcServer := grpc.NewServer()
    
    	proto.RegisterHelloServer(grpcServer, helloGRPCServer)
    	proto.RegisterPingServer(grpcServer, pingGRPCServer)
    	// r := register_service.RegisterConsul("172.24.25.123", "8500", "172.24.25.123", "8080", logger)
    	r := register_service.RegisterETCD("172.24.25.123", "2379", "172.24.25.123", "8080", logger)
    
    	go func() {
    		r.Register()
    		err = grpcServer.Serve(listener)
    		if err != nil {
    			level.Error(logger).Log("grpc error:", err)
    		}
    
    	}()
    
    	// 处理终止信号
    	signalChan := make(chan os.Signal, 1)
    	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
    	<-signalChan
    
    	// 取消注册服务
    	r.Deregister()
    
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66

# 2. go-kit集成consul示例

  • register_service 代码

    package register_service
    
    import (
    	"github.com/go-kit/kit/log"
    	"github.com/go-kit/kit/sd"
    	"github.com/go-kit/kit/sd/consul"
    	"github.com/hashicorp/consul/api"
    	"os"
    	"strconv"
    	"time"
    )
    
    func RegisterConsole(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registrar sd.Registrar) {
    
    	// 创建Consul客户端连接
    	var client consul.Client
    	{
    		consulCfg := api.DefaultConfig()
    		consulCfg.Address = consulHost + ":" + consulPort
    		consulClient, err := api.NewClient(consulCfg)
    		if err != nil {
    			logger.Log("create consul client error:", err)
    			os.Exit(1)
    		}
    
    		client = consul.NewClient(consulClient)
    	}
    
    	// 设置Consul对 http 服务健康检查的参数
    	// check := api.AgentServiceCheck{
    	// 	HTTP:     "http://" + svcHost + ":" + svcPort + "/health",  // 此接口需要自己实现
    	// 	Interval: "10s",
    	// 	Timeout:  "1s",
    	// 	Notes:    "Consul check service health status.",
    	// }
        
        // 设置Consul对 GRPC 服务健康检查的参数 
    	check := api.AgentServiceCheck{
    		GRPC:     svcHost + ":" + svcPort  ,  
    		Interval: "5s",
    		Timeout:  "1s",
    		Notes:    "Consul check service health status.",
    	}
    
    	port, _ := strconv.Atoi(svcPort)
    
    	// 设置微服务想Consul的注册信息
    	reg := api.AgentServiceRegistration{
    		ID:      "my-kit-demo" + time.Now().String(),
    		Name:    "my-kit-demo",
    		Address: svcHost,
    		Port:    port,
    		Tags:    []string{"my-kit", "register-demo"},
    		// Check:   &check,
    	}
    
    	// 执行注册
    	registrar = consul.NewRegistrar(client, &reg, logger)
    	return
    }
    
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
  • man.go

    /**
     * @date: 2023/2/22
     * @desc:
     */
    
    package main
    
    import (
    	"github.com/go-kit/kit/log/level"
    	"github.com/go-kit/log"
    	"google.golang.org/grpc"
    	"google.golang.org/grpc/health"
    	"google.golang.org/grpc/health/grpc_health_v1"
    	"my-kit-register-demo/endpoints"
    	"my-kit-register-demo/middlewares"
    	"my-kit-register-demo/proto"
    	"my-kit-register-demo/register_service"
    	"my-kit-register-demo/services"
    	"my-kit-register-demo/transports"
    	"net"
    	"os"
    	"os/signal"
    	"syscall"
    )
    
    func main() {
    	defer func() {
    	}()
    	logger := log.NewJSONLogger(os.Stdout)
    
    	listener, err := net.Listen("tcp", "0.0.0.0:8080")
    	if err != nil {
    		level.Error(logger).Log("listen error: ", err.Error())
    		os.Exit(1)
    	}
    
    	pingService := services.NewPingService()
    	pingService = middlewares.LoggingPingServiceMiddleware(logger)(pingService)
    	pingEndPoints := endpoints.NewPingEndpoint(pingService)
    	pingGRPCServer := transports.NewPingGRPCServer(pingEndPoints, logger)
    
    	helloEndPoint := endpoints.NewHelloEndpoint(services.NewHelloService())
    	helloGRPCServer := transports.NewHelloGRPCServer(helloEndPoint, logger)
    
    	grpcServer := grpc.NewServer()
    
    	// 注册健康检查服务
    	healthCheckServiceName := "my-grpc-service"
    	healthServer := health.NewServer()
    	healthServer.SetServingStatus(healthCheckServiceName, grpc_health_v1.HealthCheckResponse_SERVING)
    	grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)  // 注册健康检查服务
    
    
    	proto.RegisterHelloServer(grpcServer, helloGRPCServer)
    	proto.RegisterPingServer(grpcServer, pingGRPCServer)
    	r := register_service.RegisterConsole("172.24.25.123", "8500", "172.24.25.123", "8080", logger)
    
    	go func() {
    		r.Register()
    		err = grpcServer.Serve(listener)
    		if err != nil {
    			level.Error(logger).Log("grpc error:", err)
    		}
    
    	}()
    
    	// 处理终止信号
    	signalChan := make(chan os.Signal, 1)
    	signal.Notify(signalChan, syscall.SIGTERM, syscall.SIGINT)
    	<-signalChan
    
    	// 取消注册服务
    	r.Deregister()
    
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76

# 服务发现与负载均衡

  • go-kit 为不同的服务发现系统(eureka、zookeeper、consul、etcd等)提供适配器,Endpointer负责监听服务发现系统,并根据需要生成一组相同的endpoint

    type Endpointer interface {
    	Endpoints() ([]endpoint.Endpoint, error)
    }
    
    1
    2
    3
  • go-kit 提供了工厂函数——Factory, 它是一个将实例字符串(例如host:port)转换为特定端点的函数。提供多个端点的实例需要多个工厂函数。工厂函数还返回一个当实例消失并需要清理时调用的io.Closer。

    type Factory func(instance string) (endpoint.Endpoint, io.Closer, error)
    
    1
  • 定义如下工厂函数,它会根据传入的实例地址创建一个 gRPC 客户端的 endpoint。

    func factory(instance string) (endpoint.Endpoint, io.Closer, error) {
    	conn, err := grpc.Dial(instance, grpc.WithInsecure())
    	if err != nil {
    		return nil, nil, err
    	}
    	e := makeEndpoint(conn)
    	return e, conn, err
    }
    
    1
    2
    3
    4
    5
    6
    7
    8
  • demo:

    package register_service
    
    import (
    	"context"
    	"github.com/go-kit/kit/endpoint"
    	"github.com/go-kit/kit/log"
    	"github.com/go-kit/kit/sd"
    	"github.com/go-kit/kit/sd/consul"
    	"github.com/go-kit/kit/sd/lb"
    	"google.golang.org/grpc"
    	"io"
    	"time"
    )
    
    func MakeConsulDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
    	serviceName := "my-kit-demo--grpc-check"
    	tags := []string{"my-kit", "register-demo"}
    	duration := 500 * time.Millisecond
    
    	// 创建consul的连接实例
    	instance := consul.NewInstancer(client, logger, serviceName, tags, true)
    
    	// 使用consul连接实例(发现服务系统)、factory创建sd.Factory
    	endpointer := sd.NewEndpointer(instance, factory, logger)
    
    	// 创建RoundRibbon负载均衡器
    	balancer := lb.NewRoundRobin(endpointer)
    
    	// 为负载均衡器增加重试功能,同时该对象为endpoint.Endpoint
    	retry := lb.Retry(1, duration, balancer)
    
    	return retry
    }
    
    func factory(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
    	conn, err := grpc.Dial(instance, grpc.WithInsecure())
    	if err != nil {
    		return nil, nil, err
    	}
    	defer conn.Close()
    
    	return NewEndpoint(), nil, nil
    }
    
    
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44

# 完整代码

  • https://github.com/hellolib/go-kit-demo/tree/main/my-kit-register-demo
#Go
上次更新: 2023/04/16, 18:35:33
【go-kit教程】go-kit中间件
【go-kit教程】go-kit限流和熔断

← 【go-kit教程】go-kit中间件 【go-kit教程】go-kit限流和熔断→

最近更新
01
go与http代理
05-24
02
自制申威架构k8s-reloader
12-06
03
Docker Buildx 教程
12-01
更多文章>

Related Issues not found

Please contact @hellolib to initialize the comment

Theme by Vdoing | Copyright © 2020-2025 小刘扎扎 | MIT License
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式