-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathlister.go
146 lines (136 loc) · 3.86 KB
/
lister.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package configv3
/* lister.go implements the lister struct, which lists all file names under a specified path. */
import (
"container/list"
"context"
"fmt"
"path/filepath"
"regexp"
"strings"
"sync"
clientv3 "go.etcd.io/etcd/client/v3"
"github.com/17media/logrus"
)
// lister implements the config library's List() function.
type lister struct {
// client is the config client; List uses it to fetch the content of each
// path and newLister uses it to register the change listener.
client *clientImpl
// path is the path to be listed, stored with leading and trailing "/" trimmed.
path string
// tree is the set of file paths found under path. newLister fills it once and
// then keeps it updated from the add/delete events sent by the client listener.
tree map[string]struct{}
// lock protects tree.
lock sync.RWMutex
}
// newLister creates a lister for path, fills its tree with an initial listing
// and starts a goroutine that applies later add ("A") and delete ("D")
// notifications from the client listener to that tree.
//
// It returns an error when client is nil, when path is empty, when the
// listener regexp cannot be compiled, or when the initial tree cannot be
// built. Underlying errors are wrapped with %w so callers can inspect them
// with errors.Is / errors.As.
//
// NOTE: etcd usage is not as expected now.
func newLister(client *clientImpl, path string) (*lister, error) {
	ctx := context.TODO()
	if client == nil {
		return nil, fmt.Errorf("empty config client")
	}
	if path == "" {
		return nil, fmt.Errorf("need to specify a related path")
	}

	// Create the lister; the stored path has surrounding slashes removed.
	logrus.Infof("new storer filepath with root %v, path %v", client.root, path)
	ls := &lister{
		client: client,
		path:   strings.Trim(path, "/"),
	}

	// Match every dir/file whose path starts with the (escaped) given path.
	escPath := regexp.QuoteMeta(path)
	regex, err := regexp.Compile("^" + escPath)
	if err != nil {
		return nil, fmt.Errorf("regexp compile error: %w", err)
	}

	// Register the listener before building the tree; the events it delivers
	// are applied to the tree by the goroutine started below.
	logrus.Infof("listening on %v, path %v, escaped path %v", regex, path, escPath)
	ch := ls.client.AddListener(regex)

	// lsFunc lists the children of dir as paths relative to dir.
	// NOTE(review): the Get is issued for client.root rather than dir and
	// without a prefix option - confirm this is the intended etcd query.
	lsFunc := func(dir string) ([]string, error) {
		resp, err := client.etcdConn.Get(ctx, client.root, clientv3.WithSort(clientv3.SortByKey, clientv3.SortAscend))
		if err != nil {
			return nil, err
		}
		children := make([]string, 0, len(resp.Kvs))
		for _, kv := range resp.Kvs {
			key := string(kv.Key)
			if key == filepath.Base(infoPrefix) {
				continue
			}
			// In etcd, children are absolute paths, so convert them to paths
			// relative to dir; keys that cannot be made relative are skipped.
			if rel, relErr := filepath.Rel(dir, key); relErr == nil {
				children = append(children, rel)
			}
		}
		return children, nil
	}

	// Build the initial tree.
	ls.tree, err = buildTree(client.root, path, lsFunc)
	if err != nil {
		return nil, fmt.Errorf("can't build tree. root %v path %v err %w", client.root, path, err)
	}

	// Apply change notifications until the listener channel is closed.
	go func() {
		for mod := range *ch {
			logrus.Infof("Op: %v, path: %v", mod.Op, mod.Path)
			switch strings.ToUpper(mod.Op) {
			case "A":
				logrus.Infof("add file path: %v", mod.Path)
				ls.lock.Lock()
				ls.tree[mod.Path] = struct{}{}
				ls.lock.Unlock()
			case "D":
				logrus.Infof("delete file path: %v", mod.Path)
				ls.lock.Lock()
				delete(ls.tree, mod.Path)
				ls.lock.Unlock()
			}
		}
		logrus.Warning("leaving config service listener thread")
	}()

	return ls, nil
}
// List returns the content of every path currently in the tree, keyed by
// path. The read lock is held for the whole call. A path whose content cannot
// be fetched is left out of the result; the failure is counted and logged.
func (l *lister) List() map[string][]byte {
	snapshot := map[string][]byte{}

	l.lock.RLock()
	defer l.lock.RUnlock()

	for name := range l.tree {
		content, err := l.client.Get(name)
		if err == nil {
			snapshot[name] = content
			continue
		}
		// Fetch failed: record it and move on to the next path.
		l.client.ctr.BumpSum(cListGetFail, 1)
		logrus.Warningf("Skip path %v because can't get the content. Err %v", name, err)
	}
	return snapshot
}
// buildTree walks the config tree breadth-first, starting at treeRoot, and
// returns the set of paths that have no children (treated as files).
//
// lsFunc is called with confRoot joined to the path being visited and must
// return that path's children relative to it. The first error returned by
// lsFunc is logged and aborts the walk.
var buildTree = func(
	confRoot string,
	treeRoot string,
	lsFunc func(path string) ([]string, error),
) (map[string]struct{}, error) {
	leaves := map[string]struct{}{}

	pending := list.New()
	pending.PushBack(treeRoot)

	for pending.Len() > 0 {
		// Take the oldest entry; a leading "/" is dropped before use.
		node := strings.TrimPrefix(pending.Remove(pending.Front()).(string), "/")

		entries, err := lsFunc(filepath.Join(confRoot, node))
		if err != nil {
			logrus.Errorf("get children from %v error: %v", node, err)
			return nil, err
		}

		// A directory: queue every child for a later visit.
		if len(entries) > 0 {
			for _, name := range entries {
				pending.PushBack(filepath.Join(node, name))
			}
			continue
		}

		// No children: record the path as a file.
		leaves[node] = struct{}{}
	}
	return leaves, nil
}