Skip to content

Commit 64d87c4

Browse files
WangXiangUSTCIANTHEREAL
authored andcommitted
pump client: add compatible with kafka version && refine retry && add strategy config (#223)
* pump client: compatible with kafka version tidb-binlog && add unit test (#139) * pump client: write commit binlog will never return error (#148) * pkg watcher: move watcher from tidb-enterprise-tools (#146) * pump client: increase retry time, and refine some code (#158) * pump client: add initial log function (#165) * pump client: support change select pump's strategy (#221)
1 parent 419f308 commit 64d87c4

File tree

14 files changed

+1287
-292
lines changed

14 files changed

+1287
-292
lines changed

checker/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@ import (
2020
"os"
2121

2222
_ "github.com/go-sql-driver/mysql"
23-
"github.com/ngaut/log"
2423
"github.com/pingcap/tidb-tools/pkg/check"
2524
"github.com/pingcap/tidb-tools/pkg/dbutil"
2625
"github.com/pingcap/tidb-tools/pkg/utils"
26+
log "github.com/sirupsen/logrus"
2727
)
2828

2929
var (

dump_region/main.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ package main
1515

1616
import (
1717
"bytes"
18+
"context"
1819
"encoding/json"
1920
"flag"
2021
"fmt"
@@ -25,7 +26,6 @@ import (
2526
"github.com/pingcap/tidb-tools/pkg/utils"
2627
"github.com/pingcap/tidb/tablecodec"
2728
"github.com/pingcap/tidb/util/codec"
28-
"golang.org/x/net/context"
2929
)
3030

3131
var (

pkg/etcd/etcd.go

+12-9
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,14 @@
1414
package etcd
1515

1616
import (
17+
"context"
1718
"crypto/tls"
1819
"path"
1920
"strings"
2021
"time"
2122

2223
"github.com/coreos/etcd/clientv3"
2324
"github.com/pingcap/errors"
24-
"golang.org/x/net/context"
2525
)
2626

2727
// Node organizes the ectd query result as a Trie tree
@@ -89,18 +89,18 @@ func (e *Client) Create(ctx context.Context, key string, val string, opts []clie
8989
}
9090

9191
// Get returns a key/value matchs the given key
92-
func (e *Client) Get(ctx context.Context, key string) ([]byte, error) {
92+
func (e *Client) Get(ctx context.Context, key string) (value []byte, revision int64, err error) {
9393
key = keyWithPrefix(e.rootPath, key)
9494
resp, err := e.client.KV.Get(ctx, key)
9595
if err != nil {
96-
return nil, errors.Trace(err)
96+
return nil, -1, errors.Trace(err)
9797
}
9898

9999
if len(resp.Kvs) == 0 {
100-
return nil, errors.NotFoundf("key %s in etcd", key)
100+
return nil, -1, errors.NotFoundf("key %s in etcd", key)
101101
}
102102

103-
return resp.Kvs[0].Value, nil
103+
return resp.Kvs[0].Value, resp.Header.Revision, nil
104104
}
105105

106106
// Update updates a key/value.
@@ -156,15 +156,15 @@ func (e *Client) UpdateOrCreate(ctx context.Context, key string, val string, ttl
156156
}
157157

158158
// List returns the trie struct that constructed by the key/value with same prefix
159-
func (e *Client) List(ctx context.Context, key string) (*Node, error) {
159+
func (e *Client) List(ctx context.Context, key string) (node *Node, revision int64, err error) {
160160
key = keyWithPrefix(e.rootPath, key)
161161
if !strings.HasSuffix(key, "/") {
162162
key += "/"
163163
}
164164

165165
resp, err := e.client.KV.Get(ctx, key, clientv3.WithPrefix())
166166
if err != nil {
167-
return nil, errors.Trace(err)
167+
return nil, -1, errors.Trace(err)
168168
}
169169

170170
root := new(Node)
@@ -180,7 +180,7 @@ func (e *Client) List(ctx context.Context, key string) (*Node, error) {
180180
tailNode.Value = kv.Value
181181
}
182182

183-
return root, nil
183+
return root, resp.Header.Revision, nil
184184
}
185185

186186
// Delete deletes the key/values with matching prefix or key
@@ -200,7 +200,10 @@ func (e *Client) Delete(ctx context.Context, key string, withPrefix bool) error
200200
}
201201

202202
// Watch watchs the events of key with prefix.
203-
func (e *Client) Watch(ctx context.Context, prefix string) clientv3.WatchChan {
203+
func (e *Client) Watch(ctx context.Context, prefix string, revision int64) clientv3.WatchChan {
204+
if revision > 0 {
205+
return e.client.Watch(ctx, prefix, clientv3.WithPrefix(), clientv3.WithRev(revision))
206+
}
204207
return e.client.Watch(ctx, prefix, clientv3.WithPrefix())
205208
}
206209

pkg/etcd/etcd_test.go

+20-8
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,15 @@
1414
package etcd
1515

1616
import (
17+
"context"
1718
"testing"
1819
"time"
1920

2021
"github.com/coreos/etcd/clientv3"
2122
"github.com/coreos/etcd/integration"
23+
"github.com/pingcap/check"
2224
. "github.com/pingcap/check"
2325
"github.com/pingcap/errors"
24-
"golang.org/x/net/context"
2526
)
2627

2728
var (
@@ -71,7 +72,7 @@ func (t *testEtcdSuite) TestCreateWithTTL(c *C) {
7172
c.Assert(err, IsNil)
7273

7374
time.Sleep(2 * time.Second)
74-
_, err = etcdCli.Get(ctx, key)
75+
_, _, err = etcdCli.Get(ctx, key)
7576
c.Assert(errors.IsNotFound(err), IsTrue)
7677
}
7778

@@ -99,19 +100,25 @@ func (t *testEtcdSuite) TestUpdate(c *C) {
99100
err = etcdCli.Create(ctx, key, obj1, opts)
100101
c.Assert(err, IsNil)
101102

103+
res, revision1, err := etcdCli.Get(ctx, key)
104+
c.Assert(err, IsNil)
105+
c.Assert(string(res), Equals, obj1)
106+
102107
time.Sleep(time.Second)
103108

104109
err = etcdCli.Update(ctx, key, obj2, 3)
105110
c.Assert(err, IsNil)
106111

107112
time.Sleep(2 * time.Second)
108113

109-
res, err := etcdCli.Get(ctx, key)
114+
// the new revision should greater than the old
115+
res, revision2, err := etcdCli.Get(ctx, key)
110116
c.Assert(err, IsNil)
111117
c.Assert(string(res), Equals, obj2)
118+
c.Assert(revision2, check.Greater, revision1)
112119

113120
time.Sleep(2 * time.Second)
114-
res, err = etcdCli.Get(ctx, key)
121+
res, _, err = etcdCli.Get(ctx, key)
115122
c.Assert(errors.IsNotFound(err), IsTrue)
116123
}
117124

@@ -142,12 +149,17 @@ func (t *testEtcdSuite) TestList(c *C) {
142149
err = etcdCli.Create(ctx, k11, k11, nil)
143150
c.Assert(err, IsNil)
144151

145-
root, err := etcdCli.List(ctx, key)
152+
root, revision1, err := etcdCli.List(ctx, key)
146153
c.Assert(err, IsNil)
147154
c.Assert(string(root.Childs["level1"].Value), Equals, k1)
148155
c.Assert(string(root.Childs["level1"].Childs["level1"].Value), Equals, k11)
149156
c.Assert(string(root.Childs["level2"].Value), Equals, k2)
150157
c.Assert(string(root.Childs["level3"].Value), Equals, k3)
158+
159+
// the revision of list should equal to the latest update's revision
160+
_, revision2, err := etcdCli.Get(ctx, k11)
161+
c.Assert(err, IsNil)
162+
c.Assert(revision1, Equals, revision2)
151163
}
152164

153165
func (t *testEtcdSuite) TestDelete(c *C) {
@@ -158,21 +170,21 @@ func (t *testEtcdSuite) TestDelete(c *C) {
158170
c.Assert(err, IsNil)
159171
}
160172

161-
root, err := etcdCli.List(ctx, key)
173+
root, _, err := etcdCli.List(ctx, key)
162174
c.Assert(err, IsNil)
163175
c.Assert(root.Childs, HasLen, 2)
164176

165177
err = etcdCli.Delete(ctx, keys[1], false)
166178
c.Assert(err, IsNil)
167179

168-
root, err = etcdCli.List(ctx, key)
180+
root, _, err = etcdCli.List(ctx, key)
169181
c.Assert(err, IsNil)
170182
c.Assert(root.Childs, HasLen, 1)
171183

172184
err = etcdCli.Delete(ctx, key, true)
173185
c.Assert(err, IsNil)
174186

175-
root, err = etcdCli.List(ctx, key)
187+
root, _, err = etcdCli.List(ctx, key)
176188
c.Assert(err, IsNil)
177189
c.Assert(root.Childs, HasLen, 0)
178190
}

pkg/watcher/event.go

+75
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
package watcher
2+
3+
import (
4+
"bytes"
5+
"os"
6+
)
7+
8+
// Op represents file operation type
9+
type Op uint32
10+
11+
// Operations type current supported
12+
const (
13+
Create Op = 1 << iota
14+
Remove
15+
Modify
16+
Rename
17+
Chmod
18+
Move
19+
)
20+
21+
func (op Op) String() string {
22+
var buffer bytes.Buffer
23+
24+
// now, only one Op will used in polling, but it can combine multi Ops if needed
25+
if op&Create == Create {
26+
buffer.WriteString("|CREATE")
27+
}
28+
if op&Remove == Remove {
29+
buffer.WriteString("|REMOVE")
30+
}
31+
if op&Modify == Modify {
32+
buffer.WriteString("|MODIFY")
33+
}
34+
if op&Rename == Rename {
35+
buffer.WriteString("|RENAME")
36+
}
37+
if op&Chmod == Chmod {
38+
buffer.WriteString("|CHMOD")
39+
}
40+
if op&Move == Move {
41+
buffer.WriteString("|MOVE")
42+
}
43+
if buffer.Len() == 0 {
44+
return ""
45+
}
46+
return buffer.String()[1:] // Strip leading pipe
47+
}
48+
49+
// Event represents a single file operation event
50+
type Event struct {
51+
Path string
52+
Op Op
53+
FileInfo os.FileInfo
54+
}
55+
56+
// IsDirEvent returns whether is a event for a directory
57+
func (e *Event) IsDirEvent() bool {
58+
if e == nil {
59+
return false
60+
}
61+
return e.FileInfo.IsDir()
62+
}
63+
64+
// HasOps checks whether has any specified operation types
65+
func (e *Event) HasOps(ops ...Op) bool {
66+
if e == nil {
67+
return false
68+
}
69+
for _, op := range ops {
70+
if e.Op&op != 0 {
71+
return true
72+
}
73+
}
74+
return false
75+
}

0 commit comments

Comments
 (0)