Go操作etcd
# 安装
go get go.etcd.io/etcd/client/v3
1
# 基本操作
# 1. get/put 操作
/**
* @date: 2022/7/23
* @desc:
*/
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/v3"
"log"
"time"
)
func main() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalln(err)
}
// PUT
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
key := "/ns/service"
value := "127.0.0.1:8000"
_, err = client.Put(ctx, key, value)
if err != nil {
log.Fatalf("etcd put error,%v\n", err)
return
}
// GET
getResponse, err := client.Get(ctx, key)
if err != nil {
log.Printf("etcd GET error,%v\n", err)
return
}
for _, kv := range getResponse.Kvs {
fmt.Printf("%s=%s\n", kv.Key, kv.Value)
}
// /ns/service=127.0.0.1:8000
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
# 2. watch 监听操作
/**
* @date: 2022/7/23
* @desc:
*/
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/v3"
"log"
"strconv"
"time"
)
func main() {
client, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
})
if err != nil {
log.Fatalln(err)
}
key := "/ns/service"
// 监听变化
go watcher(client, key)
value := "127.0.0.1:800"
ctx := context.Background()
// 每隔2秒重新PUT一次
for i := 0; i < 100; i++ {
time.Sleep(2 * time.Second)
_, err := client.Put(ctx, key, value+strconv.Itoa(i))
if err != nil {
log.Printf("put error %v", err)
}
}
}
func watcher(client *clientv3.Client, key string) {
// 监听这个chan
watchChan := client.Watch(context.Background(), key)
for watchResponse := range watchChan {
for _, event := range watchResponse.Events {
fmt.Printf("Type:%s,Key:%s,Value:%s\n", event.Type, event.Kv.Key, event.Kv.Value)
/**
Type:PUT,Key:/ns/service,Value:127.0.0.1:8000
Type:PUT,Key:/ns/service,Value:127.0.0.1:8001
Type:PUT,Key:/ns/service,Value:127.0.0.1:8002
Type:PUT,Key:/ns/service,Value:127.0.0.1:8003
...
*/
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 3. 租约、续租
package main
import (
"context"
"fmt"
"go.etcd.io/etcd/client/v3"
"os"
"time"
)
const (
NewLeaseErr = 101
LeasTtlErr = 102
KeepAliveErr = 103
PutErr = 104
GetErr = 105
RevokeErr = 106
)
func main() {
var conf = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}
client, err := clientv3.New(conf)
defer client.Close()
if err != nil {
fmt.Printf("创建client失败:\n", err.Error())
os.Exit(NewLeaseErr)
}
//创建租约
lease := clientv3.NewLease(client)
//设置租约时间
leaseResp, err := lease.Grant(context.TODO(), 10)
if err != nil {
fmt.Printf("设置租约时间失败:%s\n", err.Error())
os.Exit(LeasTtlErr)
}
//设置续租
leaseID := leaseResp.ID
ctx, cancelFunc := context.WithCancel(context.TODO())
leaseRespChan, err := lease.KeepAlive(ctx, leaseID)
if err != nil {
fmt.Printf("续租失败:%s\n", err.Error())
os.Exit(KeepAliveErr)
}
//监听租约
go func() {
for {
select {
case leaseKeepResp := <-leaseRespChan:
if leaseKeepResp == nil {
fmt.Printf("已经关闭续租功能\n")
return
} else {
fmt.Printf("续租成功\n")
goto END
}
}
END:
time.Sleep(500*time.Millisecond)
}
}()
//监听某个key的变化
//ctx1, _ := context.WithTimeout(context.TODO(),20)
go func() {
wc := client.Watch(context.TODO(), "/job/v3/1", clientv3.WithPrevKV())
for v := range wc {
for _, e := range v.Events {
fmt.Printf("type:%v kv:%v prevKey:%v \n ", e.Type, string(e.Kv.Key), e.PrevKv)
}
}
}()
kv := clientv3.NewKV(client)
//通过租约put
putResp, err := kv.Put(context.TODO(), "/job/v3/1", "koock",clientv3.WithLease(leaseID))
if err != nil {
fmt.Printf("put 失败:%s", err.Error())
os.Exit(PutErr)
}
fmt.Printf("%v\n",putResp.Header)
cancelFunc()
time.Sleep(2*time.Second)
_, err = lease.Revoke(context.TODO(), leaseID)
if err != nil {
fmt.Printf("撤销租约失败:%s\n",err.Error())
os.Exit(RevokeErr)
}
fmt.Printf("撤销租约成功")
getResp,err := kv.Get(context.TODO(),"/job/v3/1")
if err != nil{
fmt.Printf("get 失败:%s",err.Error())
os.Exit(GetErr)
}
fmt.Printf("%v",getResp.Kvs)
time.Sleep(20 * time.Second)
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
上次更新: 2023/04/16, 18:35:33