package postgres

import (
	"context"
	"errors"
	"fmt"
	"runtime"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgtype"
	"github.com/jackc/pgx/v4/pgxpool"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/quay/zlog"
	"golang.org/x/sync/semaphore"
)

var (
	gcCounter = promauto.NewCounterVec(
		prometheus.CounterOpts{
			Namespace: "claircore",
			Subsystem: "vulnstore",
			Name:      "gc_total",
			Help:      "Total number of database queries issued in the GC method.",
		},
		[]string{"query", "success"},
	)
	gcDuration = promauto.NewHistogramVec(
		prometheus.HistogramOpts{
			Namespace: "claircore",
			Subsystem: "vulnstore",
			Name:      "gc_duration_seconds",
			Help:      "The duration of all queries issued in the GC method.",
		},
		[]string{"query"},
	)
)

const (
	// GCThrottle sets a limit for the number of deleted update operations
	// (and subsequent cascade deletes in the uo_vuln table) that can occur in a GC run.
	GCThrottle = 50
)

// GC is split into two phases. First, it identifies any update operations
// which are older than the provided keep value and deletes them.
//
// Next, it performs updater-based deletion of any vulns in the vuln table
// which are no longer referenced by update operations.
//
// The GC is throttled to avoid overloading the database with cascade deletes.
// If a full GC is required, run this method until the returned int64 value
// is 0.
func (s *MatcherStore) GC(ctx context.Context, keep int) (int64, error) {
	// obtain update operations which need deletin'
	ops, totalOps, err := eligibleUpdateOpts(ctx, s.pool, keep)
	if err != nil {
		return 0, err
	}
	// delete 'em, but not too many...
	if totalOps >= GCThrottle {
		ops = ops[:GCThrottle]
	}
	deletedOps, err := s.DeleteUpdateOperations(ctx, ops...)
	if err != nil {
		return totalOps - deletedOps, err
	}
	// get all updaters we know about.
	updaters, err := distinctUpdaters(ctx, s.pool)
	if err != nil {
		return totalOps - deletedOps, err
	}
	// issue concurrent updater-based deletions for all known updaters,
	// limiting concurrency to the number of usable CPUs.
	cpus := int64(runtime.GOMAXPROCS(0))
	sem := semaphore.NewWeighted(cpus)
	errC := make(chan error, len(updaters))
	for _, updater := range updaters {
		err = sem.Acquire(ctx, 1)
		if err != nil {
			break
		}
		go func(u string) {
			defer sem.Release(1)
			err := vulnCleanup(ctx, s.pool, u)
			if err != nil {
				errC <- err
			}
		}(updater)
	}
	// unconditionally wait for all in-flight goroutines to return.
	// the use of context.Background and the lack of error checking are
	// intentional: all in-flight goroutines are guaranteed to release
	// their semaphore weights.
	sem.Acquire(context.Background(), cpus)
	close(errC)
	if len(errC) > 0 {
		b := strings.Builder{}
		b.WriteString("encountered the following errors during gc:\n")
		for e := range errC {
			b.WriteString(e.Error() + "\n")
		}
		return totalOps - deletedOps, errors.New(b.String())
	}
	return totalOps - deletedOps, nil
}
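
// gcUntilDone is an editorial sketch, not part of the original file: it shows
// how a caller might drive a full GC. GC returns the number of eligible
// update operations still remaining after a throttled pass, so the doc
// comment's advice ("run this method until the returned int64 value is 0")
// translates into a simple loop.
func gcUntilDone(ctx context.Context, s *MatcherStore, keep int) error {
	for {
		remaining, err := s.GC(ctx, keep)
		if err != nil {
			return err
		}
		if remaining == 0 {
			return nil
		}
	}
}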

// distinctUpdaters returns all updaters which have registered an update
// operation.
func distinctUpdaters(ctx context.Context, pool *pgxpool.Pool) ([]string, error) {
	const (
		// returns one row per updater that has registered at least one
		// update operation.
		selectUpdaters = `
SELECT DISTINCT(updater) FROM update_operation;
`
	)
	rows, err := pool.Query(ctx, selectUpdaters)
	if err != nil {
		return nil, fmt.Errorf("error selecting distinct updaters: %w", err)
	}
	defer rows.Close()
	var updaters []string
	for rows.Next() {
		var updater string
		if err := rows.Scan(&updater); err != nil {
			return nil, fmt.Errorf("error scanning updater: %w", err)
		}
		updaters = append(updaters, updater)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return updaters, nil
}

// eligibleUpdateOpts returns a list of update operation refs which exceed the
// specified keep value.
func eligibleUpdateOpts(ctx context.Context, pool *pgxpool.Pool, keep int) ([]uuid.UUID, int64, error) {
	const (
		// this query returns rows of UUID arrays.
		// each returned array holds the UUIDs which exceed the provided keep value.
		updateOps = `
WITH ordered_ops AS (
	SELECT array_agg(ref ORDER BY date DESC) AS refs FROM update_operation GROUP BY updater
)
SELECT ordered_ops.refs[$1:]
FROM ordered_ops
WHERE array_length(ordered_ops.refs, 1) > $2;
`
	)
	// gather any update operations exceeding our keep value.
	// keep+1 is used because Postgres arrays are 1-indexed and slicing is
	// inclusive on both ends; we want every item after the first keep entries.
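	// for example, with keep=2 and refs = [a, b, c, d] (newest first),
	// refs[3:] selects {c, d}: everything past the two newest operations.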
	m := []uuid.UUID{}
	start := time.Now()
	rows, err := pool.Query(ctx, updateOps, keep+1, keep)
	if err != nil {
		gcCounter.WithLabelValues("updateOps", "false").Inc()
		return nil, 0, fmt.Errorf("error querying for update operations: %w", err)
	}
	gcCounter.WithLabelValues("updateOps", "true").Inc()
	gcDuration.WithLabelValues("updateOps").Observe(time.Since(start).Seconds())
	defer rows.Close()
	for rows.Next() {
		// pgx will not scan directly into a []uuid.UUID
		tmp := pgtype.UUIDArray{}
		if err := rows.Scan(&tmp); err != nil {
			return nil, 0, fmt.Errorf("error scanning update operations: %w", err)
		}
		for _, u := range tmp.Elements {
			m = append(m, u.Bytes) // this works since the [16]byte value is assignable to uuid.UUID
		}
	}
	if err := rows.Err(); err != nil {
		return nil, 0, err
	}
	return m, int64(len(m)), nil
}

func vulnCleanup(ctx context.Context, pool *pgxpool.Pool, updater string) error {
	const (
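		// deletes all rows in vuln, scoped to the given updater, whose
		// ids no longer appear in the uo_vuln association table, i.e.
		// vulns orphaned by deleted update operations.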
		deleteOrphanedVulns = `
DELETE FROM vuln v1 USING
	vuln v2
	LEFT JOIN uo_vuln uvl
		ON v2.id = uvl.vuln
	WHERE uvl.vuln IS NULL
	AND v2.updater = $1
AND v1.id = v2.id;
`
	)
	start := time.Now()
	ctx = zlog.ContextWithValues(ctx, "updater", updater)
	zlog.Debug(ctx).
		Msg("starting clean up")
	res, err := pool.Exec(ctx, deleteOrphanedVulns, updater)
	if err != nil {
		gcCounter.WithLabelValues("deleteVulns", "false").Inc()
		return fmt.Errorf("failed while exec'ing vuln delete: %w", err)
	}
	zlog.Debug(ctx).Int64("rows affected", res.RowsAffected()).Msg("vulns deleted")
	gcCounter.WithLabelValues("deleteVulns", "true").Inc()
	gcDuration.WithLabelValues("deleteVulns").Observe(time.Since(start).Seconds())
	return nil
}