-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathpncounter.go
101 lines (78 loc) · 1.81 KB
/
pncounter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
package rapport
import (
"sync"
"github.com/luma/pith/rapport/marshalling"
)
type PNCounter struct {
replicaId string
value *marshalling.PNCounterValue
l sync.RWMutex
}
func CreatePNCounter(replicaId string) *PNCounter {
return &PNCounter{
replicaId: replicaId,
value: marshalling.CreatePNCounter(replicaId),
}
}
func (p *PNCounter) Incr() int64 {
p.l.Lock()
value := p.value.Incr(p.replicaId)
p.l.Unlock()
return value
}
func (p *PNCounter) IncrBy(amount int64) int64 {
p.l.Lock()
value := p.value.IncrBy(p.replicaId, amount)
p.l.Unlock()
return value
}
func (p *PNCounter) Decr() (int64, error) {
p.l.Lock()
value, err := p.value.Decr(p.replicaId)
p.l.Unlock()
return value, err
}
func (p *PNCounter) DecrBy(amount int64) (int64, error) {
return p.value.DecrBy(p.replicaId, amount)
}
func (p *PNCounter) Value() (total int64) {
p.l.RLock()
value := p.value.Value()
p.l.RUnlock()
return value
}
func (p *PNCounter) Merge(crdt CRDT) {
other := crdt.(*PNCounter)
p.l.Lock()
other.l.Lock()
defer func() {
p.l.Unlock()
other.l.Unlock()
}()
for id, incVal := range other.value.Inc {
if localInc, exists := p.value.Inc[id]; !exists || localInc < incVal {
p.value.Inc[id] = incVal
}
if localDec, exists := p.value.Dec[id]; !exists || localDec < other.value.Dec[id] {
p.value.Dec[id] = other.value.Dec[id]
}
}
}
// Marshal serialises the counter data to bytes
func (p *PNCounter) Marshal() ([]*Segment, error) {
v, err := p.value.Marshal()
if err != nil {
return nil, err
}
segment := &Segment{
Value: v,
}
return []*Segment{segment}, nil
}
// Marshal deserialises the counter data from bytes
func (p *PNCounter) Unmarshal(data []*Segment) error {
if p.value == nil {
p.value = marshalling.CreatePNCounter(p.replicaId)
}
return p.value.Unmarshal(data[0].Value)
}