Skip to content

Commit

Permalink
worker: add testWorker
Browse files Browse the repository at this point in the history
  • Loading branch information
peterjan committed Feb 26, 2024
1 parent 1192264 commit 2b626d9
Show file tree
Hide file tree
Showing 14 changed files with 719 additions and 490 deletions.
2 changes: 1 addition & 1 deletion internal/node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func NewBus(cfg BusConfig, dir string, seed types.PrivateKey, l *zap.Logger) (ht

func NewWorker(cfg config.Worker, b worker.Bus, seed types.PrivateKey, l *zap.Logger) (http.Handler, ShutdownFn, error) {
workerKey := blake2b.Sum256(append([]byte("worker"), seed...))
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.UploadMaxOverdrive, cfg.AllowPrivateIPs, l)
w, err := worker.New(workerKey, cfg.ID, b, cfg.ContractLockTimeout, cfg.BusFlushInterval, cfg.DownloadOverdriveTimeout, cfg.UploadOverdriveTimeout, cfg.DownloadMaxOverdrive, cfg.UploadMaxOverdrive, cfg.DownloadMaxMemory, cfg.UploadMaxMemory, cfg.AllowPrivateIPs, l)
if err != nil {
return nil, nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion worker/download.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (w *worker) initDownloadManager(maxMemory, maxOverdrive uint64, overdriveTi
panic("download manager already initialized") // developer error
}

mm := newMemoryManager(logger, maxMemory)
mm := newMemoryManager(logger.Named("memorymanager"), maxMemory)
w.downloadManager = newDownloadManager(w.shutdownCtx, w, mm, w.bus, maxOverdrive, overdriveTimeout, logger)
}

Expand Down
12 changes: 8 additions & 4 deletions worker/downloader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,15 @@ import (
)

func TestDownloaderStopped(t *testing.T) {
w := newMockWorker()
h := w.addHost()
w.dl.refreshDownloaders(w.contracts())
w := newTestWorker(t)
hosts := w.addHosts(1)

dl := w.dl.downloaders[h.PublicKey()]
// convenience variables
dm := w.downloadManager
h := hosts[0]

dm.refreshDownloaders(w.contracts())
dl := w.downloadManager.downloaders[h.PublicKey()]
dl.Stop()

req := sectorDownloadReq{
Expand Down
2 changes: 1 addition & 1 deletion worker/gouging.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func GougingCheckerFromContext(ctx context.Context, criticalMigration bool) (Gou
return gc(criticalMigration)
}

func WithGougingChecker(ctx context.Context, cs consensusState, gp api.GougingParams) context.Context {
func WithGougingChecker(ctx context.Context, cs ConsensusState, gp api.GougingParams) context.Context {
return context.WithValue(ctx, keyGougingChecker, func(criticalMigration bool) (GougingChecker, error) {
consensusState, err := cs.ConsensusState(ctx)
if err != nil {
Expand Down
4 changes: 0 additions & 4 deletions worker/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,6 @@ type (
HostManager interface {
Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr string) Host
}

HostStore interface {
Host(ctx context.Context, hostKey types.PublicKey) (hostdb.HostInfo, error)
}
)

type (
Expand Down
118 changes: 115 additions & 3 deletions worker/host_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,130 @@ import (
"bytes"
"context"
"errors"
"io"
"sync"
"testing"
"time"

rhpv2 "go.sia.tech/core/rhp/v2"
rhpv3 "go.sia.tech/core/rhp/v3"
"go.sia.tech/core/types"
"go.sia.tech/renterd/api"
"go.sia.tech/renterd/hostdb"
"lukechampine.com/frand"
)

type (
testHost struct {
*hostMock
*contractMock
hptFn func() hostdb.HostPriceTable
}

testHostManager struct {
t *testing.T

mu sync.Mutex
hosts map[types.PublicKey]*testHost
}
)

func newTestHostManager(t *testing.T) *testHostManager {
return &testHostManager{t: t, hosts: make(map[types.PublicKey]*testHost)}
}

func (hm *testHostManager) Host(hk types.PublicKey, fcid types.FileContractID, siamuxAddr string) Host {
hm.mu.Lock()
defer hm.mu.Unlock()

if _, ok := hm.hosts[hk]; !ok {
hm.t.Fatal("host not found")
}
return hm.hosts[hk]
}

func (hm *testHostManager) addHost(hosts ...*testHost) {
hm.mu.Lock()
defer hm.mu.Unlock()
for _, h := range hosts {
hm.hosts[h.hk] = h
}
}

func newTestHost(h *hostMock, c *contractMock) *testHost {
return newTestHostCustom(h, c, func() hostdb.HostPriceTable { return newTestHostPriceTable(time.Now().Add(time.Minute)) })
}

func newTestHostCustom(h *hostMock, c *contractMock, hptFn func() hostdb.HostPriceTable) *testHost {
return &testHost{
hostMock: h,
contractMock: c,
hptFn: hptFn,
}
}

func newTestHostPriceTable(expiry time.Time) hostdb.HostPriceTable {
var uid rhpv3.SettingsID
frand.Read(uid[:])

return hostdb.HostPriceTable{
HostPriceTable: rhpv3.HostPriceTable{UID: uid, Validity: time.Minute},
Expiry: expiry,
}
}

func (h *testHost) PublicKey() types.PublicKey {
return h.hk
}

func (h *testHost) DownloadSector(ctx context.Context, w io.Writer, root types.Hash256, offset, length uint32, overpay bool) error {
sector, exist := h.Sector(root)
if !exist {
return errSectorNotFound
}
if offset+length > rhpv2.SectorSize {
return errSectorOutOfBounds
}
_, err := w.Write(sector[offset : offset+length])
return err
}

func (h *testHost) UploadSector(ctx context.Context, sector *[rhpv2.SectorSize]byte, rev types.FileContractRevision) (types.Hash256, error) {
return h.AddSector(sector), nil
}

func (h *testHost) FetchRevision(ctx context.Context, fetchTimeout time.Duration) (rev types.FileContractRevision, _ error) {
h.mu.Lock()
defer h.mu.Unlock()
rev = h.rev
return rev, nil
}

func (h *testHost) FetchPriceTable(ctx context.Context, rev *types.FileContractRevision) (hostdb.HostPriceTable, error) {
return h.hptFn(), nil
}

func (h *testHost) FundAccount(ctx context.Context, balance types.Currency, rev *types.FileContractRevision) error {
return nil
}

func (h *testHost) RenewContract(ctx context.Context, rrr api.RHPRenewRequest) (_ rhpv2.ContractRevision, _ []types.Transaction, _ types.Currency, err error) {
return rhpv2.ContractRevision{}, nil, types.ZeroCurrency, nil
}

func (h *testHost) SyncAccount(ctx context.Context, rev *types.FileContractRevision) error {
return nil
}

func TestHost(t *testing.T) {
h := newMockHost(types.PublicKey{1})
h.c = newMockContract(h.hk, types.FileContractID{1})
sector, root := newMockSector()
// create test host
h := newTestHost(
newHostMock(types.PublicKey{1}),
newContractMock(types.PublicKey{1}, types.FileContractID{1}),
)

// upload the sector
sector, root := newTestSector()
uploaded, err := h.UploadSector(context.Background(), sector, types.FileContractRevision{})
if err != nil {
t.Fatal(err)
Expand Down
Loading

0 comments on commit 2b626d9

Please sign in to comment.