generated from rizalgowandy/library-template-go
-
-
Notifications
You must be signed in to change notification settings - Fork 5
/
distributed_lock.go
72 lines (63 loc) · 1.79 KB
/
distributed_lock.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
package interceptor
import (
"context"
"fmt"
"github.com/go-redsync/redsync/v4"
"github.com/rizalgowandy/cronx"
"github.com/rizalgowandy/gdk/pkg/env"
"github.com/rizalgowandy/gdk/pkg/errorx/v2"
"github.com/rizalgowandy/gdk/pkg/logx"
"github.com/rizalgowandy/gdk/pkg/netx"
"github.com/rizalgowandy/gdk/pkg/tags"
)
// DistributedLockItf is the lock provider consumed by the DistributedLock
// interceptor. It is declared here, at the consumer, so that any type able to
// hand out redsync mutexes can be plugged in.
type DistributedLockItf interface {
// Mutex returns a *redsync.Mutex for the given name. DistributedLock calls
// it with the cron job name (job.Name) on every job run.
Mutex(name string) *redsync.Mutex
}
// DistributedLock is a middleware that prevents a process from being executed
// at the same time across servers.
//
// On every job run the returned interceptor asks dl for a mutex named after
// job.Name, acquires it, runs handler, and releases the mutex afterwards.
//
// Parameters:
//   - serviceName, mode: only used to build the "app: <serviceName>-<mode>"
//     label of the Slack alert.
//   - dl: provider of the redsync mutex used as the lock.
//   - sc: Slack client that receives an alert when the lock cannot be released.
//
// The interceptor returns the lock acquisition error (handler is not called in
// that case); otherwise it returns whatever handler returns. A failure to
// release the lock is logged and alerted, never returned.
func DistributedLock(
	serviceName string,
	mode string,
	dl DistributedLockItf,
	sc SlackClientItf,
) cronx.Interceptor {
	// Resolved once when the interceptor is built and reused for every run.
	var (
		currentEnv   = env.GetCurrent()
		isProduction = env.IsProduction()
		ipAddress    = netx.GetIPv4()
	)

	return func(ctx context.Context, job *cronx.Job, handler cronx.Handler) error {
		// Create lock.
		mutex := dl.Mutex(job.Name)
		if err := mutex.LockContext(ctx); err != nil {
			logx.ERR(
				ctx,
				errorx.E(err, errorx.Op(job.Name), errorx.CodeInternal, errorx.Fields{
					tags.Address: ipAddress,
				}),
				"distributed lock: cannot gain lock for current process",
			)
			return err
		}

		// Release in a defer so the unlock still runs when handler panics;
		// as a plain statement after the call it would be skipped.
		//
		// NOTE(review): the unlock reuses ctx. If ctx is already canceled or
		// past its deadline when handler returns, the release may fail and the
		// alert below will fire — confirm whether a detached context is wanted.
		defer func() {
			released, err := mutex.UnlockContext(ctx)
			if err == nil && !released {
				// UnlockContext also reports a status. A false status without
				// an error presumably means the lock was no longer ours to
				// release (verify against the redsync version in use); surface
				// it instead of dropping it silently.
				err = fmt.Errorf("lock %q was not released", job.Name)
			}
			if err == nil {
				return
			}

			logx.ERR(
				ctx,
				errorx.E(err, errorx.Op(job.Name), errorx.CodeInternal, errorx.Fields{
					tags.Address: ipAddress,
				}),
				"distributed lock: cannot unlock process",
			)

			// Send slack alert when failed to unlock.
			msg := fmt.Sprintf(
				"[%s] *%v*"+
					" | `host: %s` | `app: %s-%s`"+
					"\n*Job:* `%s`"+
					"\n*Request ID:* `%s` _(search log file using this id)_",
				currentEnv, err,
				ipAddress, serviceName, mode,
				job.Name,
				logx.GetRequestID(ctx),
			)
			sc.Send(ctx, msg, isProduction)
		}()

		return handler(ctx, job)
	}
}