Skip to content

Commit 573330f

Browse files
authored
Merge pull request ipfs-force-community#131 from ipfs-force-community/feat/dtynn/worker_management
Feat/dtynn/worker management
2 parents 8ed2b8f + e9d4c30 commit 573330f

File tree

27 files changed

+832
-131
lines changed

27 files changed

+832
-131
lines changed

venus-sector-manager/cmd/venus-sector-manager/internal/global.go

+16
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"fmt"
88
"io"
99
"os/signal"
10+
"reflect"
1011
"strconv"
1112
"syscall"
1213
"time"
@@ -230,3 +231,18 @@ func OuputJSON(w io.Writer, v interface{}) error {
230231
enc.SetIndent("", "\t")
231232
return enc.Encode(v)
232233
}
234+
235+
func FormatOrNull(arg interface{}, f func() string) string {
236+
if arg == nil {
237+
return "NULL"
238+
}
239+
240+
rv := reflect.ValueOf(arg)
241+
rt := rv.Type()
242+
if rt.Kind() == reflect.Ptr && rv.IsNil() {
243+
return "NULL"
244+
245+
}
246+
247+
return f()
248+
}

venus-sector-manager/cmd/venus-sector-manager/internal/util.go

+4
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ var UtilCmd = &cli.Command{
1414
utilSealerCmd,
1515
utilMarketCmd,
1616
utilStorageCmd,
17+
utilWorkerCmd,
18+
},
19+
Flags: []cli.Flag{
20+
SealerListenFlag,
1721
},
1822
Before: func(cctx *cli.Context) error {
1923
logging.SetupForSub(logSubSystem)

venus-sector-manager/cmd/venus-sector-manager/internal/util_sealer.go

+2-4
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,8 @@ import (
1212
)
1313

1414
var utilSealerCmd = &cli.Command{
15-
Name: "sealer",
16-
Flags: []cli.Flag{
17-
SealerListenFlag,
18-
},
15+
Name: "sealer",
16+
Flags: []cli.Flag{},
1917
Subcommands: []*cli.Command{
2018
utilSealerSectorsCmd,
2119
utilSealerProvingCmd,
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,267 @@
1+
package internal
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"net"
7+
"os"
8+
"strconv"
9+
"text/tabwriter"
10+
"time"
11+
12+
"github.com/urfave/cli/v2"
13+
14+
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/core"
15+
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/modules/util"
16+
"github.com/ipfs-force-community/venus-cluster/venus-sector-manager/pkg/workercli"
17+
)
18+
19+
var utilWorkerCmd = &cli.Command{
20+
Name: "worker",
21+
Flags: []cli.Flag{},
22+
Usage: "utils for worker management",
23+
Subcommands: []*cli.Command{
24+
utilWorkerListCmd,
25+
utilWorkerInfoCmd,
26+
utilWorkerPauseCmd,
27+
utilWorkerResumeCmd,
28+
},
29+
}
30+
31+
var utilWorkerListCmd = &cli.Command{
32+
Name: "list",
33+
Flags: []cli.Flag{
34+
&cli.DurationFlag{
35+
Name: "expiration",
36+
Value: 10 * time.Minute,
37+
Usage: "timeout for regarding a woker as missing",
38+
},
39+
},
40+
Action: func(cctx *cli.Context) error {
41+
a, actx, stopper, err := extractAPI(cctx)
42+
if err != nil {
43+
return fmt.Errorf("get api: %w", err)
44+
}
45+
defer stopper()
46+
47+
pinfos, err := a.Sealer.WorkerPingInfoList(actx)
48+
if err != nil {
49+
return RPCCallError("WorkerPingInfoList", err)
50+
}
51+
52+
expiration := cctx.Duration("expiration")
53+
54+
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
55+
defer tw.Flush()
56+
_, _ = fmt.Fprintln(tw, "Name\tDest\tThreads\tEmpty\tPaused\tErrors\tLastPing(with ! if expired)")
57+
for _, pinfo := range pinfos {
58+
lastPing := time.Since(time.Unix(pinfo.LastPing, 0))
59+
lastPingWarn := ""
60+
if lastPing > expiration {
61+
lastPingWarn = " (!)"
62+
}
63+
64+
_, _ = fmt.Fprintf(
65+
tw, "%s\t%s\t%d\t%d\t%d\t%d\t%s%s\n",
66+
pinfo.Info.Name,
67+
pinfo.Info.Dest,
68+
pinfo.Info.Summary.Threads,
69+
pinfo.Info.Summary.Empty,
70+
pinfo.Info.Summary.Paused,
71+
pinfo.Info.Summary.Errors,
72+
lastPing,
73+
lastPingWarn,
74+
)
75+
}
76+
77+
return nil
78+
},
79+
}
80+
81+
var utilWorkerInfoCmd = &cli.Command{
82+
Name: "info",
83+
Usage: "show details about the specific worker",
84+
ArgsUsage: "<worker instance name or address>",
85+
Action: func(cctx *cli.Context) error {
86+
args := cctx.Args()
87+
if args.Len() < 1 {
88+
return cli.ShowSubcommandHelp(cctx)
89+
}
90+
91+
name := args.First()
92+
93+
a, actx, stopper, err := extractAPI(cctx)
94+
if err != nil {
95+
return fmt.Errorf("get api: %w", err)
96+
}
97+
defer stopper()
98+
99+
dest, err := resolveWorkerDest(actx, a, name)
100+
if err != nil {
101+
return fmt.Errorf("resolve worker dest: %w", err)
102+
}
103+
104+
wcli, wstop, err := workercli.Connect(context.Background(), fmt.Sprintf("http://%s/", dest))
105+
if err != nil {
106+
return fmt.Errorf("connect to %s: %w", dest, err)
107+
}
108+
109+
defer wstop()
110+
111+
// use context.Background to avoid meta info
112+
details, err := wcli.WorkerList()
113+
if err != nil {
114+
return RPCCallError("WorkerList", err)
115+
}
116+
117+
if len(details) == 0 {
118+
return nil
119+
}
120+
121+
tw := tabwriter.NewWriter(os.Stdout, 2, 4, 2, ' ', 0)
122+
defer tw.Flush()
123+
_, _ = fmt.Fprintln(tw, "Index\tLoc\tSectorID\tPaused\tPausedElapsed\tState\tLastErr")
124+
125+
for _, detail := range details {
126+
_, _ = fmt.Fprintf(
127+
tw, "%d\t%s\t%s\t%v\t%s\t%s\t%s\n",
128+
detail.Index,
129+
detail.Location,
130+
FormatOrNull(detail.SectorID, func() string { return util.FormatSectorID(*detail.SectorID) }),
131+
detail.Paused,
132+
FormatOrNull(detail.PausedElapsed, func() string { return (time.Duration(*detail.PausedElapsed) * time.Second).String() }),
133+
detail.State,
134+
FormatOrNull(detail.LastError, func() string { return *detail.LastError }),
135+
)
136+
}
137+
138+
return nil
139+
},
140+
}
141+
142+
var utilWorkerPauseCmd = &cli.Command{
143+
Name: "pause",
144+
Usage: "pause the specified sealing thread inside target worker",
145+
ArgsUsage: "<worker instance name or address> <thread index>",
146+
Action: func(cctx *cli.Context) error {
147+
args := cctx.Args()
148+
if args.Len() < 2 {
149+
return cli.ShowSubcommandHelp(cctx)
150+
}
151+
152+
name := args.First()
153+
index, err := strconv.ParseUint(args.Get(1), 10, 64)
154+
if err != nil {
155+
return fmt.Errorf("parse thread index: %w", err)
156+
}
157+
158+
a, actx, stopper, err := extractAPI(cctx)
159+
if err != nil {
160+
return fmt.Errorf("get api: %w", err)
161+
}
162+
defer stopper()
163+
164+
dest, err := resolveWorkerDest(actx, a, name)
165+
if err != nil {
166+
return fmt.Errorf("resolve worker dest: %w", err)
167+
}
168+
169+
wcli, wstop, err := workercli.Connect(context.Background(), fmt.Sprintf("http://%s/", dest))
170+
if err != nil {
171+
return fmt.Errorf("connect to %s: %w", dest, err)
172+
}
173+
174+
defer wstop()
175+
176+
ok, err := wcli.WorkerPause(index)
177+
if err != nil {
178+
return RPCCallError("WorkerPause", err)
179+
}
180+
181+
Log.With("name", name, "dest", dest, "index", index).Infof("pause call done, ok = %v", ok)
182+
return nil
183+
},
184+
}
185+
186+
var utilWorkerResumeCmd = &cli.Command{
187+
Name: "resume",
188+
Usage: "resume the specified sealing thread inside target worker, with the given state if any",
189+
ArgsUsage: "<worker instance name or address> <thread index> [<next state>]",
190+
Action: func(cctx *cli.Context) error {
191+
args := cctx.Args()
192+
if args.Len() < 2 {
193+
return cli.ShowSubcommandHelp(cctx)
194+
}
195+
196+
name := args.First()
197+
index, err := strconv.ParseUint(args.Get(1), 10, 64)
198+
if err != nil {
199+
return fmt.Errorf("parse thread index: %w", err)
200+
}
201+
202+
var state *string
203+
if st := args.Get(2); st != "" {
204+
state = &st
205+
}
206+
207+
a, actx, stopper, err := extractAPI(cctx)
208+
if err != nil {
209+
return fmt.Errorf("get api: %w", err)
210+
}
211+
defer stopper()
212+
213+
dest, err := resolveWorkerDest(actx, a, name)
214+
if err != nil {
215+
return fmt.Errorf("resolve worker dest: %w", err)
216+
}
217+
218+
wcli, wstop, err := workercli.Connect(context.Background(), fmt.Sprintf("http://%s/", dest))
219+
if err != nil {
220+
return fmt.Errorf("connect to %s: %w", dest, err)
221+
}
222+
223+
defer wstop()
224+
225+
ok, err := wcli.WorkerResume(index, state)
226+
if err != nil {
227+
return RPCCallError("WorkerPause", err)
228+
}
229+
230+
Log.With("name", name, "dest", dest, "index", index, "state", state).Infof("resume call done, ok = %v", ok)
231+
return nil
232+
},
233+
}
234+
235+
func resolveWorkerDest(ctx context.Context, a *API, name string) (string, error) {
236+
var info *core.WorkerPingInfo
237+
var err error
238+
if a != nil {
239+
info, err = a.Sealer.WorkerGetPingInfo(ctx, name)
240+
if err != nil {
241+
return "", RPCCallError("WorkerGetPingInfo", err)
242+
}
243+
}
244+
245+
if info != nil {
246+
return info.Info.Dest, nil
247+
}
248+
249+
addr, err := net.ResolveTCPAddr("tcp", name)
250+
if err != nil {
251+
ip, err := net.ResolveIPAddr("", name)
252+
if err != nil {
253+
return "", fmt.Errorf("no instance found, and unable to parse %q as address", name)
254+
}
255+
256+
addr = &net.TCPAddr{
257+
IP: ip.IP,
258+
Zone: ip.Zone,
259+
}
260+
}
261+
262+
if addr.Port == 0 {
263+
addr.Port = core.DefaultWorkerListenPort
264+
}
265+
266+
return addr.String(), nil
267+
}

0 commit comments

Comments
 (0)