-
Notifications
You must be signed in to change notification settings - Fork 1
/
derive.go
109 lines (97 loc) · 2.7 KB
/
derive.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
// SPDX-License-Identifier: Apache-2.0
// Copyright Authors of Cilium
package statedb
import (
"context"
"github.com/cilium/hive/cell"
"github.com/cilium/hive/job"
)
// DeriveResult is returned by the transform function given to Derive and
// selects what is done to the output table with the transformed object.
type DeriveResult int

const (
	// DeriveInsert (0) inserts the object into the output table.
	DeriveInsert DeriveResult = iota
	// DeriveUpdate (1) writes the object only if it already exists in the output table.
	DeriveUpdate
	// DeriveDelete (2) deletes the object from the output table.
	DeriveDelete
	// DeriveSkip (3) leaves the output table untouched for this change.
	DeriveSkip
)
// DeriveParams lists the dependencies that the function returned by Derive
// takes as its single argument.
type DeriveParams[In, Out any] struct {
	// NOTE(review): cell.In is presumably the hive marker that flags this
	// struct as an injected parameter object; confirm against the hive docs.
	cell.In

	// Lifecycle receives the job group created by Derive.
	Lifecycle cell.Lifecycle
	// Jobs is the registry from which the job group is created.
	Jobs job.Registry
	// Health is handed to Jobs.NewGroup when the job group is created.
	Health cell.Health
	// DB provides the write transactions used by the derive job.
	DB *DB
	// InTable is the table whose changes are observed.
	InTable Table[In]
	// OutTable is the table that receives the transformed objects.
	OutTable RWTable[Out]
}
// Derive constructs and registers a job that transforms objects from the input
// table into the output table, i.e. it derives the output table from the input
// table. Useful when constructing a reconciler that has its desired state solely
// derived from a single table. For example the bandwidth manager's desired state
// is directly derived from the devices table.
//
// jobName is the name under which the one-shot job is registered. transform is
// called for every change observed in the input table, with deleted set to true
// when the input object was deleted. It returns the output object together with
// a DeriveResult that selects the action: DeriveInsert inserts the object,
// DeriveUpdate writes it only if it already exists in the output table,
// DeriveDelete deletes it and DeriveSkip ignores the change.
//
// The returned function takes its dependencies via DeriveParams. When called it
// creates a job group, adds the job to it and appends the group to the lifecycle.
//
// Example use:
//
//	cell.Invoke(
//		statedb.Derive[*tables.Device, *Foo](
//			"derive-foo",
//			func(d *tables.Device, deleted bool) (*Foo, statedb.DeriveResult) {
//				if deleted {
//					return &Foo{Index: d.Index}, statedb.DeriveDelete
//				}
//				return &Foo{Index: d.Index}, statedb.DeriveInsert
//			}),
//	)
func Derive[In, Out any](jobName string, transform func(obj In, deleted bool) (Out, DeriveResult)) func(DeriveParams[In, Out]) {
	return func(p DeriveParams[In, Out]) {
		worker := derive[In, Out]{
			DeriveParams: p,
			jobName:      jobName,
			transform:    transform,
		}

		group := p.Jobs.NewGroup(p.Health)
		group.Add(job.OneShot(jobName, worker.loop))
		p.Lifecycle.Append(group)
	}
}
// derive bundles the injected DeriveParams with the job name and the transform
// function. Its loop method is the body of the job that Derive registers.
type derive[In, Out any] struct {
	DeriveParams[In, Out]

	// jobName is the name the job was registered under.
	jobName string
	// transform maps a changed input object (and whether it was deleted) to
	// an output object plus the action to apply to the output table.
	transform func(obj In, deleted bool) (Out, DeriveResult)
}
// loop is the body of the derive job. It registers a change iterator on the
// input table and then, round after round, applies the transformed changes to
// the output table inside a single write transaction per round.
//
// It returns nil once ctx is cancelled. It returns the error if registering
// the change iterator fails or if a write to the output table fails; in the
// latter case the round's write transaction is aborted first. The cell.Health
// argument is not used.
func (d derive[In, Out]) loop(ctx context.Context, _ cell.Health) error {
	out := d.OutTable

	// Changes() needs a write transaction against the input table. Commit it
	// only when registration succeeded; on failure abort so that nothing is
	// committed on the error path.
	txn := d.DB.WriteTxn(d.InTable)
	iter, err := d.InTable.Changes(txn)
	if err != nil {
		txn.Abort()
		return err
	}
	txn.Commit()

	for {
		// The write transaction on the output table is also what the pending
		// input changes are read with, so one round is applied as a unit.
		wtxn := d.DB.WriteTxn(out)
		changes, watch := iter.Next(wtxn)
		for change := range changes {
			outObj, result := d.transform(change.Object, change.Deleted)
			switch result {
			case DeriveInsert:
				_, _, err = out.Insert(wtxn, outObj)
			case DeriveUpdate:
				// Only overwrite an object that is already present, looked up
				// by its primary key.
				_, _, found := out.Get(wtxn, out.PrimaryIndexer().QueryFromObject(outObj))
				if found {
					_, _, err = out.Insert(wtxn, outObj)
				}
			case DeriveDelete:
				_, _, err = out.Delete(wtxn, outObj)
			case DeriveSkip:
				// Nothing to do for this change.
			}
			if err != nil {
				// Discard everything written during this round.
				wtxn.Abort()
				return err
			}
		}
		wtxn.Commit()

		// Block until the watch channel from Next fires (presumably when new
		// changes are available; confirm against statedb) or the job is stopped.
		select {
		case <-watch:
		case <-ctx.Done():
			return nil
		}
	}
}