forked from application-research/estuary
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathgc.go
231 lines (184 loc) · 5.36 KB
/
gc.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
package main
import (
"context"
"fmt"
"github.com/application-research/estuary/util"
"github.com/ipfs/go-cid"
"golang.org/x/xerrors"
"gorm.io/gorm"
)
// GarbageCollect enumerates every key in the blockstore and hands each one to
// maybeRemoveObject, which deletes the block when it is no longer tracked.
//
// Since all content is reference counted, collection is simple, and simpler
// still because it does not have to be 'perfect'. (We could probably even
// remove blocks at the moment their references are removed from the database.)
//
// It returns the error from starting the key enumeration, or the first error
// reported while checking/removing a key; otherwise nil.
func (cm *ContentManager) GarbageCollect(ctx context.Context) error {
	// Use a cancellable child context so that bailing out early on an error
	// signals the key enumeration to stop, rather than leaving its producer
	// blocked on a channel that nobody is reading anymore.
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	keych, err := cm.Blockstore.AllKeysChan(ctx)
	if err != nil {
		return err
	}

	for c := range keych {
		if _, err := cm.maybeRemoveObject(ctx, c); err != nil {
			return err
		}
	}

	return nil
}
// maybeRemoveObject deletes the block identified by c from the blockstore
// unless trackingObject reports that it is still tracked. cm.contentLk is held
// for the whole check-and-delete.
//
// It returns true only when the block was deleted; it returns false when the
// block was kept or when an error occurred.
func (cm *ContentManager) maybeRemoveObject(ctx context.Context, c cid.Cid) (bool, error) {
	cm.contentLk.Lock()
	defer cm.contentLk.Unlock()

	tracked, err := cm.trackingObject(c)
	switch {
	case err != nil:
		return false, err
	case tracked:
		// Still referenced (or in flight): leave the block alone.
		return false, nil
	}

	// These deletes could be batched and executed at the datastore layer for
	// better performance.
	if err := cm.Blockstore.DeleteBlock(ctx, c); err != nil {
		return false, err
	}
	return true, nil
}
// trackingObject reports whether the block identified by c must be kept.
//
// A cid is considered tracked when isInflight reports it as in flight (checked
// under cm.inflightCidsLk), or when at least one Object row in the database
// has a matching cid. A gorm.ErrRecordNotFound from the count query is treated
// as "not tracked" rather than as an error.
func (cm *ContentManager) trackingObject(c cid.Cid) (bool, error) {
	cm.inflightCidsLk.Lock()
	inflight := cm.isInflight(c)
	cm.inflightCidsLk.Unlock()

	if inflight {
		return true, nil
	}

	var matches int64
	err := cm.DB.Model(&Object{}).Where("cid = ?", c.Bytes()).Count(&matches).Error
	switch {
	case err == nil:
		return matches > 0, nil
	case xerrors.Is(err, gorm.ErrRecordNotFound):
		return false, nil
	default:
		return false, err
	}
}
// RemoveContent deletes the content row with ID c, the ObjRef rows that point
// at it, and any Object rows that are left without a remaining reference.
//
// When now is false the function returns once the database cleanup is done.
// When now is true it additionally runs the offloaded-objects query and
// deletes each returned cid's block from the blockstore.
//
// cm.contentLk is held for the entire call. The first error encountered is
// returned; database changes made before that error are not rolled back.
func (cm *ContentManager) RemoveContent(ctx context.Context, c uint, now bool) error {
	ctx, span := cm.tracer.Start(ctx, "RemoveContent")
	defer span.End()

	cm.contentLk.Lock()
	defer cm.contentLk.Unlock()

	if err := cm.DB.Delete(&Content{}, c).Error; err != nil {
		return fmt.Errorf("failed to delete content from db: %w", err)
	}

	// Remember which objects this content referenced before the references
	// themselves are deleted below.
	var objIds []struct {
		Object uint
	}
	if err := cm.DB.Model(&ObjRef{}).Find(&objIds, "content = ?", c).Error; err != nil {
		return fmt.Errorf("failed to gather referenced object IDs: %w", err)
	}

	if err := cm.DB.Where("content = ?", c).Delete(&ObjRef{}).Error; err != nil {
		return fmt.Errorf("failed to delete related object references: %w", err)
	}

	ids := make([]uint, len(objIds))
	for i, obj := range objIds {
		ids[i] = obj.Object
	}

	// Since im kinda bad at sql, this is going to be faster than the naive
	// query for now. Maybe can think of something more clever later
	const batchSize = 100
	for i := 0; i < len(ids); i += batchSize {
		end := i + batchSize
		if end > len(ids) {
			end = len(ids)
		}
		slice := ids[i:end]

		// Only delete objects from this batch that no ObjRef points at anymore.
		subq := cm.DB.Table("obj_refs").Select("1").Where("obj_refs.object = objects.id")
		if err := cm.DB.Where("id IN ? and not exists (?)", slice, subq).Delete(&Object{}).Error; err != nil {
			return err
		}
	}

	if !now {
		return nil
	}

	// TODO: copied from the offloading method, need to refactor this into something better
	// Note that this query is not restricted to the content being removed: it
	// selects every cid whose MIN(obj_refs.offloaded) is 1.
	q := cm.DB.Model(&ObjRef{}).
		Select("cid").
		Joins("left join objects on obj_refs.object = objects.id").
		Group("cid").
		Having("MIN(obj_refs.offloaded) = 1")

	rows, err := q.Rows()
	if err != nil {
		return err
	}
	// Always release the result set, including on the early returns below;
	// otherwise the underlying database connection stays checked out.
	defer rows.Close()

	for rows.Next() {
		var dbc util.DbCID
		if err := rows.Scan(&dbc); err != nil {
			return err
		}

		if err := cm.Blockstore.DeleteBlock(ctx, dbc.CID); err != nil {
			return err
		}
	}

	// rows.Next returning false can mean an iteration error, not just the end
	// of the result set.
	if err := rows.Err(); err != nil {
		return err
	}

	return nil
}
// unpinContent removes the content row identified by contid together with its
// ObjRef rows, then cleans up what that content referenced: Object rows with
// no remaining references are cleared via clearUnreferencedObjects, and each
// previously referenced object is passed to deleteIfNotPinned.
//
// It returns the first error encountered, including the lookup error when no
// content row with that ID exists.
func (cm *ContentManager) unpinContent(ctx context.Context, contid uint) error {
	var content Content
	err := cm.DB.First(&content, "id = ?", contid).Error
	if err != nil {
		return err
	}

	// Gather the referenced objects first; the references are deleted below.
	referenced, err := cm.objectsForPin(ctx, content.ID)
	if err != nil {
		return err
	}

	err = cm.DB.Delete(&Content{ID: content.ID}).Error
	if err != nil {
		return err
	}

	err = cm.DB.Where("content = ?", content.ID).Delete(&ObjRef{}).Error
	if err != nil {
		return err
	}

	err = cm.clearUnreferencedObjects(ctx, referenced)
	if err != nil {
		return err
	}

	// TODO: this is safe, but... slow?
	for i := range referenced {
		if _, err = cm.deleteIfNotPinned(ctx, referenced[i]); err != nil {
			return err
		}
	}

	return nil
}
// deleteIfNotPinned is the locking wrapper around deleteIfNotPinnedLock: it
// opens a "deleteIfNotPinned" tracing span, acquires cm.contentLk, and returns
// whatever deleteIfNotPinnedLock returns for o.
func (cm *ContentManager) deleteIfNotPinned(ctx context.Context, o *Object) (bool, error) {
	spanCtx, span := cm.tracer.Start(ctx, "deleteIfNotPinned")
	defer span.End()

	cm.contentLk.Lock()
	defer cm.contentLk.Unlock()

	deleted, err := cm.deleteIfNotPinnedLock(spanCtx, o)
	return deleted, err
}
// deleteIfNotPinnedLock deletes o's block from cm.Node.Blockstore when no
// Object row matches either o.ID or o.Cid. It does not take cm.contentLk
// itself; deleteIfNotPinned calls it with that lock held.
//
// It returns false when a matching row exists or the lookup fails. When no row
// matches it returns true together with the result of DeleteBlock, so the
// boolean is true even if the block deletion itself returned an error.
func (cm *ContentManager) deleteIfNotPinnedLock(ctx context.Context, o *Object) (bool, error) {
	ctx, span := cm.tracer.Start(ctx, "deleteIfNotPinnedLock")
	defer span.End()

	// A single matching row is enough to know the object is still in use.
	lookup := cm.DB.Limit(1).Model(Object{}).Where("id = ? OR cid = ?", o.ID, o.Cid)

	var matches []Object
	if err := lookup.Find(&matches).Error; err != nil {
		return false, err
	}

	if len(matches) > 0 {
		return false, nil
	}
	return true, cm.Node.Blockstore.DeleteBlock(ctx, o.Cid.CID)
}
// clearUnreferencedObjects deletes, from the given objs, every Object row that
// no ObjRef row points at anymore. The delete runs while holding
// cm.contentLk. It returns the database error, if any.
func (cm *ContentManager) clearUnreferencedObjects(ctx context.Context, objs []*Object) error {
	ids := make([]uint, len(objs))
	for i := range objs {
		ids[i] = objs[i].ID
	}

	cm.contentLk.Lock()
	defer cm.contentLk.Unlock()

	// Correlated subquery: number of references to the object being examined.
	refCount := cm.DB.Model(ObjRef{}).Where("object = objects.id").Select("count(1)")

	return cm.DB.Where("(?) = 0 and id in ?", refCount, ids).Delete(Object{}).Error
}
// objectsForPin returns the objects referenced by the content with ID cont,
// found by left-joining obj_refs against objects and scanning the rows into
// Object values. On a query error it returns nil and that error.
//
// NOTE(review): no explicit Select is given, so columns from both tables are
// returned; confirm that the scanned id is objects.id and not obj_refs.id.
func (cm *ContentManager) objectsForPin(ctx context.Context, cont uint) ([]*Object, error) {
	query := cm.DB.Model(ObjRef{}).
		Where("content = ?", cont).
		Joins("left join objects on obj_refs.object = objects.id")

	var found []*Object
	if res := query.Scan(&found); res.Error != nil {
		return nil, res.Error
	}

	return found, nil
}