Skip to content
This repository has been archived by the owner on Jun 20, 2023. It is now read-only.

Bulk Provide/Reproviding System #34

Merged
merged 29 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
c707e99
fix: in queue sync datastore to disk
aschmahmann Apr 9, 2021
c193919
feat: add a batching provider/reprovider system
aschmahmann Apr 9, 2021
829bc09
simplify batch provider to remove the managed provider list, remove t…
aschmahmann Apr 22, 2021
0fdf1d6
batch provider waits for provider system to be ready before attemptin…
aschmahmann Apr 22, 2021
93ff866
fix: handle periods with no providing, and handle the case of a missi…
aschmahmann Apr 23, 2021
fb01b26
moved import
aschmahmann May 12, 2021
bf3cf56
batched: moved context creation to prevent leakage
aschmahmann May 12, 2021
0023040
cleanup code a bit and add more logging
aschmahmann May 12, 2021
8bbdb3b
remove unnecessary goroutine
aschmahmann May 12, 2021
9e61fa1
rename getTime to parseTime
aschmahmann May 12, 2021
c3fd90b
removed newly added syncs from the provider queue and updated its doc…
aschmahmann May 12, 2021
c52c0b0
batched: wait for goroutines to close before returning from Close
aschmahmann May 12, 2021
34b4e40
batched: reuse timers in the Run function
aschmahmann May 12, 2021
4407f46
renamed variable
aschmahmann May 12, 2021
f498246
some go timer fixes
aschmahmann May 12, 2021
cffff34
have a max collection threshold even when only processing regular pro…
aschmahmann May 12, 2021
6d08b5d
max collection timer only starts once we have our first provide
aschmahmann May 12, 2021
3a94d3a
remove cid from provide queue if its invalid
aschmahmann May 12, 2021
866d979
switch reproviding channel to be unbuffered
aschmahmann May 12, 2021
f84c7d9
batched: do not panic if no key provider is set, instead default to r…
aschmahmann May 12, 2021
8c07658
batched: renamed dynamicCh and dynamicCidLoop to reprovideCh and repr…
aschmahmann May 12, 2021
71c6564
batched: add test
aschmahmann May 12, 2021
b017f26
fix max collection check
aschmahmann May 12, 2021
d366b61
fixed waitgroup usage bug
aschmahmann May 12, 2021
843e75c
batched: internally configurable initial reprovide
aschmahmann May 13, 2021
04452c4
batched: add basic test
aschmahmann May 13, 2021
edb6021
batched: rework timer usage
aschmahmann May 13, 2021
1c19caa
use closure
aschmahmann May 13, 2021
c86f3de
fix debug log usage
aschmahmann May 14, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
332 changes: 332 additions & 0 deletions batched/system.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
package batched

import (
"context"
"errors"
"fmt"
"strconv"
"time"

"github.com/ipfs/go-cid"
"github.com/ipfs/go-datastore"
provider "github.com/ipfs/go-ipfs-provider"
"github.com/ipfs/go-ipfs-provider/queue"
"github.com/ipfs/go-ipfs-provider/simple"
logging "github.com/ipfs/go-log"
"github.com/ipfs/go-verifcid"
"github.com/multiformats/go-multihash"
)

var log = logging.Logger("provider.batched")

type BatchProvidingSystem struct {
ctx context.Context
close context.CancelFunc

reprovideInterval time.Duration
rsys provideMany
keyProvider simple.KeyChanFunc

q *queue.Queue
ds datastore.Batching

dynamicCh chan cid.Cid

totalProvides, lastReprovideBatchSize int
avgProvideDuration, lastReprovideDuration time.Duration
}

var _ provider.System = (*BatchProvidingSystem)(nil)

type provideMany interface {
ProvideMany(ctx context.Context, keys []multihash.Multihash) error
Ready() bool
}

// Option defines the functional option type that can be used to configure
// BatchProvidingSystem instances
type Option func(system *BatchProvidingSystem) error

var lastReprovideKey = datastore.NewKey("/provider/reprovide/lastreprovide")

func New(provider provideMany, q *queue.Queue, opts ...Option) (*BatchProvidingSystem, error) {
s := &BatchProvidingSystem{
reprovideInterval: time.Hour * 24,
rsys: provider,
keyProvider: nil,
q: q,
ds: datastore.NewMapDatastore(),
dynamicCh: make(chan cid.Cid, 1),
}

for _, o := range opts {
if err := o(s); err != nil {
return nil, err
}
}

// This is after the options processing so we do not have to worry about leaking a context if there is an
// initialization error processing the options
ctx, cancel := context.WithCancel(context.Background())
s.ctx = ctx
s.close = cancel

return s, nil
}

func Datastore(batching datastore.Batching) Option {
return func(system *BatchProvidingSystem) error {
system.ds = batching
return nil
}
}

func ReproviderInterval(duration time.Duration) Option {
return func(system *BatchProvidingSystem) error {
system.reprovideInterval = duration
return nil
}
}

func KeyProvider(fn simple.KeyChanFunc) Option {
return func(system *BatchProvidingSystem) error {
system.keyProvider = fn
return nil
}
}

func (s *BatchProvidingSystem) Run() {
const pauseDetectionThreshold = time.Millisecond * 500

provCh := s.q.Dequeue()

go func() {
m := make(map[cid.Cid]struct{})
for {
pauseDetectTimer := time.NewTimer(time.Hour)
maxDurationCollectionTimer := time.NewTimer(time.Minute * 10)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved

performedReprovide := false
loop:
for {
select {
case c := <-provCh:
m[c] = struct{}{}
pauseDetectTimer.Reset(pauseDetectionThreshold)
continue
default:
}

select {
case c := <-provCh:
m[c] = struct{}{}
pauseDetectTimer.Reset(pauseDetectionThreshold)
case c := <-s.dynamicCh:
m[c] = struct{}{}
pauseDetectTimer.Reset(pauseDetectionThreshold)
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
performedReprovide = true
case <-pauseDetectTimer.C:
break loop
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
case <-maxDurationCollectionTimer.C:
break loop
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
case <-s.ctx.Done():
return
}
}

if len(m) == 0 {
continue
}

keys := make([]multihash.Multihash, 0, len(m))
for c := range m {
// hash security
if err := verifcid.ValidateCid(c); err != nil {
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
log.Errorf("insecure hash in reprovider, %s (%s)", c, err)
continue
}

keys = append(keys, c.Hash())
delete(m, c)
}

for !s.rsys.Ready() {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
log.Debugf("reprovider system not ready")
select {
case <-time.After(time.Minute):
case <-s.ctx.Done():
return
}
}

log.Debugf("starting provide of %d keys", len(keys))
start := time.Now()
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
err := s.rsys.ProvideMany(s.ctx, keys)
if err != nil {
log.Debugf("providing failed %v", err)
continue
}
dur := time.Since(start)

totalProvideTime := int64(s.totalProvides) * int64(s.avgProvideDuration)
recentAvgProvideDuration := time.Duration(int64(dur) / int64(len(keys)))
s.avgProvideDuration = time.Duration((totalProvideTime + int64(dur)) / int64(s.totalProvides+len(keys)))
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
s.totalProvides += len(keys)

aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
log.Debugf("finished providing of %d keys. It took %v with an average of %v per provide", len(keys), dur, recentAvgProvideDuration)

if performedReprovide {
s.lastReprovideBatchSize = len(keys)
s.lastReprovideDuration = dur

if err := s.ds.Put(lastReprovideKey, storeTime(time.Now())); err != nil {
log.Errorf("could not store last reprovide time: %v", err)
}
if err := s.ds.Sync(lastReprovideKey); err != nil {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
log.Errorf("could not perform sync of last reprovide time: %v", err)
}
}
}
}()

go func() {
var initialReprovideCh, reprovideCh <-chan time.Time

// If reproviding is enabled (non-zero)
if s.reprovideInterval > 0 {
reprovideTicker := time.NewTicker(s.reprovideInterval)
defer reprovideTicker.Stop()
reprovideCh = reprovideTicker.C

// If the reprovide ticker is larger than a minute (likely),
// provide once after we've been up a minute.
//
// Don't provide _immediately_ as we might be just about to stop.
if s.reprovideInterval > time.Minute {
initialReprovideTimer := time.NewTimer(time.Minute)
defer initialReprovideTimer.Stop()

initialReprovideCh = initialReprovideTimer.C
}
}

for s.ctx.Err() == nil {
select {
case <-initialReprovideCh:
case <-reprovideCh:
case <-s.ctx.Done():
return
}

err := s.reprovide(s.ctx, false)

// only log if we've hit an actual error, otherwise just tell the client we're shutting down
if s.ctx.Err() == nil && err != nil {
log.Errorf("failed to reprovide: %s", err)
}
}
}()
}

func storeTime(t time.Time) []byte {
val := []byte(fmt.Sprintf("%d", t.UnixNano()))
return val
}

func parseTime(b []byte) (time.Time, error) {
tns, err := strconv.ParseInt(string(b), 10, 64)
if err != nil {
return time.Time{}, err
}
return time.Unix(0, tns), nil
}

func (s *BatchProvidingSystem) Close() error {
s.close()
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return s.q.Close()
}

func (s *BatchProvidingSystem) Provide(cid cid.Cid) error {
return s.q.Enqueue(cid)
}

func (s *BatchProvidingSystem) Reprovide(ctx context.Context) error {
return s.reprovide(ctx, true)
}

func (s *BatchProvidingSystem) reprovide(ctx context.Context, force bool) error {
if !s.shouldReprovide() && !force {
return nil
}

kch, err := s.keyProvider(ctx)
if err != nil {
return err
}

dynamicCidLoop:
for {
select {
case c, ok := <-kch:
if !ok {
break dynamicCidLoop
}

select {
case s.dynamicCh <- c:
case <-ctx.Done():
return ctx.Err()
}
case <-ctx.Done():
return ctx.Err()
}
}

return nil
}

func (s *BatchProvidingSystem) getLastReprovideTime() (time.Time, error) {
val, err := s.ds.Get(lastReprovideKey)
if errors.Is(err, datastore.ErrNotFound) {
return time.Time{}, nil
}
if err != nil {
return time.Time{}, fmt.Errorf("could not get last reprovide time")
}

t, err := parseTime(val)
if err != nil {
return time.Time{}, fmt.Errorf("could not decode last reprovide time, got %q", string(val))
}

return t, nil
}

func (s *BatchProvidingSystem) shouldReprovide() bool {
t, err := s.getLastReprovideTime()
if err != nil {
log.Debugf(err.Error())
aschmahmann marked this conversation as resolved.
Show resolved Hide resolved
return false
}

if time.Since(t) < time.Duration(float64(s.reprovideInterval)*0.5) {
return false
}
return true
}

type BatchedProviderStats struct {
TotalProvides, LastReprovideBatchSize int
AvgProvideDuration, LastReprovideDuration time.Duration
}

// Stat returns various stats about this provider system
func (s *BatchProvidingSystem) Stat(ctx context.Context) (BatchedProviderStats, error) {
// TODO: Does it matter that there is no locking around the total+average values?
return BatchedProviderStats{
TotalProvides: s.totalProvides,
LastReprovideBatchSize: s.lastReprovideBatchSize,
AvgProvideDuration: s.avgProvideDuration,
LastReprovideDuration: s.lastReprovideDuration,
}, nil
}
7 changes: 4 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@ retract [v1.0.0, v1.0.1]
require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/ipfs/go-blockservice v0.1.2
github.com/ipfs/go-cid v0.0.3
github.com/ipfs/go-cid v0.0.7
github.com/ipfs/go-cidutil v0.0.2
github.com/ipfs/go-datastore v0.1.0
github.com/ipfs/go-datastore v0.4.5
github.com/ipfs/go-ipfs-blockstore v0.1.0
github.com/ipfs/go-ipfs-blocksutil v0.0.1
github.com/ipfs/go-ipfs-ds-help v0.0.1
github.com/ipfs/go-ipfs-exchange-offline v0.0.1
github.com/ipfs/go-ipfs-routing v0.1.0
github.com/ipfs/go-ipld-cbor v0.0.3
Expand All @@ -21,5 +22,5 @@ require (
github.com/ipfs/go-verifcid v0.0.1
github.com/libp2p/go-libp2p-core v0.2.2
github.com/libp2p/go-libp2p-testing v0.1.0
github.com/multiformats/go-multihash v0.0.8
github.com/multiformats/go-multihash v0.0.13
)
Loading