
exp/lighthorizon/index: Parse network passphrase from the env. #4491

Merged: 4 commits, Jul 29, 2022
13 changes: 7 additions & 6 deletions exp/lighthorizon/index/backend/parallel_flush.go
@@ -42,7 +42,7 @@ func parallelFlush(parallel uint32, allIndexes map[string]types.NamedIndices, f
written := uint64(0)
for i := uint32(0); i < parallel; i++ {
wg.Add(1)
go func() {
go func(workerNum uint32) {
defer wg.Done()
for batch := range batches {
if err := f(batch); err != nil {
@@ -53,17 +53,18 @@ }
}

nwritten := atomic.AddUint64(&written, 1)
if nwritten%1000 == 0 {
log.Infof("Writing indexes... %d/%d %.2f%%", nwritten,
len(allIndexes),
(float64(nwritten)/float64(len(allIndexes)))*100)
if nwritten%1234 == 0 {
log.WithField("worker", workerNum).
Infof("Writing indices... %d/%d (%.2f%%)",
nwritten, len(allIndexes),
(float64(nwritten)/float64(len(allIndexes)))*100)
}

if nwritten == uint64(len(allIndexes)) {
close(batches)
}
}
}()
}(i)
}

wg.Wait()
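The key change in this worker loop is passing the loop counter into the goroutine as `workerNum` instead of capturing it, so each worker logs a stable identity while sharing one atomic progress counter. A minimal standalone sketch of the same pattern; the names and counts below are illustrative, not taken from the indexer:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

func main() {
	var wg sync.WaitGroup
	var written uint64

	jobs := make(chan int, 10)
	for j := 0; j < 10; j++ {
		jobs <- j
	}
	close(jobs)

	for i := uint32(0); i < 4; i++ {
		wg.Add(1)
		// Pass the loop variable as an argument so each goroutine sees its
		// own worker number rather than a shared, mutating `i`.
		go func(workerNum uint32) {
			defer wg.Done()
			for range jobs {
				n := atomic.AddUint64(&written, 1)
				fmt.Printf("worker %d finished item %d\n", workerNum, n)
			}
		}(i)
	}
	wg.Wait()
}
```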
24 changes: 13 additions & 11 deletions exp/lighthorizon/index/builder.go
@@ -99,8 +99,8 @@ func BuildIndices(

// Submit the work to the channels, breaking up the range into individual
// checkpoint ranges.
checkpoints := historyarchive.NewCheckpointManager(0)
go func() {
checkpoints := historyarchive.NewCheckpointManager(0)
for ledger := range ledgerRange.GenerateCheckpoints(checkpoints) {
chunk := checkpoints.GetCheckpointRange(ledger)
chunk.High = min(chunk.High, ledgerRange.High) // don't exceed upper bound
@@ -127,13 +127,15 @@ }
}

nprocessed := atomic.AddUint64(&processed, uint64(count))
if nprocessed%97 == 0 {
printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
if nprocessed%1234 == 0 {
PrintProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
}

// Upload indices once per checkpoint to save memory
if err := indexStore.Flush(); err != nil {
return errors.Wrap(err, "flushing indices failed")
// Upload indices once every 10 checkpoints to save memory
if nprocessed%(10*uint64(checkpoints.GetCheckpointFrequency())) == 0 {
if err := indexStore.Flush(); err != nil {
return errors.Wrap(err, "flushing indices failed")
}
}
}
return nil
@@ -144,7 +146,7 @@ func BuildIndices(
return indexBuilder, errors.Wrap(err, "one or more workers failed")
}

printProgress("Reading ledgers", processed, uint64(ledgerCount), startTime)
PrintProgress("Reading ledgers", processed, uint64(ledgerCount), startTime)

L.Infof("Processed %d ledgers via %d workers", processed, parallel)
L.Infof("Uploading indices to %s", targetUrl)
@@ -248,8 +250,8 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
}

builder.lastBuiltLedgerWriteLock.Lock()
defer builder.lastBuiltLedgerWriteLock.Unlock()
builder.lastBuiltLedger = max(builder.lastBuiltLedger, ledgerRange.High)
builder.lastBuiltLedgerWriteLock.Unlock()

return nil
}
@@ -320,13 +322,13 @@ func (builder *IndexBuilder) Watch(ctx context.Context) error {
}
}

func printProgress(prefix string, done, total uint64, startTime time.Time) {
func PrintProgress(prefix string, done, total uint64, startTime time.Time) {
progress := float64(done) / float64(total)
elapsed := time.Since(startTime)

// Approximate based on how many ledgers are left and how long this much
// Approximate based on how much work is left to do and how long this much
// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
// assumes consistent ledger load).
// assumes consistent load).
remaining := (float64(elapsed) / float64(done)) * float64(total-done)

var remainingStr string
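Two behavioral tweaks in this file: indices are now flushed once per ten checkpoints' worth of processed ledgers instead of after every checkpoint (with the default 64-ledger checkpoint frequency that is roughly every 640 ledgers), and `printProgress` is exported as `PrintProgress`. The progress estimate itself is unchanged: remaining time is a linear extrapolation from the average time per completed unit. A minimal sketch of that arithmetic, assuming nothing beyond the standard library:

```go
package main

import (
	"fmt"
	"time"
)

// estimateRemaining extrapolates linearly: if `done` units took `elapsed`,
// the remaining (total-done) units should take elapsed/done each.
func estimateRemaining(done, total uint64, elapsed time.Duration) time.Duration {
	if done == 0 {
		return 0 // nothing to extrapolate from yet
	}
	perUnit := float64(elapsed) / float64(done)
	return time.Duration(perUnit * float64(total-done))
}

func main() {
	// Matches the example in the comment: 4/10 in 2s leaves ~3s for the rest.
	fmt.Println(estimateRemaining(4, 10, 2*time.Second)) // prints "3s"
}
```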
41 changes: 30 additions & 11 deletions exp/lighthorizon/index/cmd/batch/map/main.go
@@ -16,16 +16,19 @@ import (

type BatchConfig struct {
historyarchive.Range
TxMetaSourceUrl, IndexTargetUrl string
TxMetaSourceUrl string
IndexTargetUrl string
NetworkPassphrase string
}

const (
batchSizeEnv = "BATCH_SIZE"
jobIndexEnv = "AWS_BATCH_JOB_ARRAY_INDEX"
firstCheckpointEnv = "FIRST_CHECKPOINT"
txmetaSourceUrlEnv = "TXMETA_SOURCE"
indexTargetUrlEnv = "INDEX_TARGET"
workerCountEnv = "WORKER_COUNT"
batchSizeEnv = "BATCH_SIZE"
jobIndexEnv = "AWS_BATCH_JOB_ARRAY_INDEX"
firstCheckpointEnv = "FIRST_CHECKPOINT"
txmetaSourceUrlEnv = "TXMETA_SOURCE"
indexTargetUrlEnv = "INDEX_TARGET"
workerCountEnv = "WORKER_COUNT"
networkPassphraseEnv = "NETWORK_PASSPHRASE"
)

func NewBatchConfig() (*BatchConfig, error) {
@@ -65,12 +68,28 @@ func NewBatchConfig() (*BatchConfig, error) {
return nil, errors.New("required parameter " + txmetaSourceUrlEnv)
}

networkPassphrase := os.Getenv(networkPassphraseEnv)
switch networkPassphrase {
case "":
log.Warnf("%s not specified, defaulting to 'testnet'", networkPassphraseEnv)
fallthrough
case "testnet":
networkPassphrase = network.TestNetworkPassphrase
case "pubnet":
networkPassphrase = network.PublicNetworkPassphrase
default:
log.Warnf("%s is not a recognized shortcut ('pubnet' or 'testnet')",
networkPassphraseEnv)
}
log.Infof("Using network passphrase '%s'", networkPassphrase)

firstLedger := uint32(firstCheckpoint + batchSize*jobIndex)
lastLedger := firstLedger + uint32(batchSize) - 1
return &BatchConfig{
Range: historyarchive.Range{Low: firstLedger, High: lastLedger},
TxMetaSourceUrl: txmetaSourceUrl,
IndexTargetUrl: fmt.Sprintf("%s%cjob_%d", indexTargetRootUrl, os.PathSeparator, jobIndex),
Range: historyarchive.Range{Low: firstLedger, High: lastLedger},
TxMetaSourceUrl: txmetaSourceUrl,
IndexTargetUrl: fmt.Sprintf("%s%cjob_%d", indexTargetRootUrl, os.PathSeparator, jobIndex),
NetworkPassphrase: networkPassphrase,
}, nil
}

@@ -103,7 +122,7 @@ func main() {
context.Background(),
batch.TxMetaSourceUrl,
batch.IndexTargetUrl,
network.TestNetworkPassphrase,
batch.NetworkPassphrase,
batch.Range,
[]string{
"accounts_unbacked",
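The new `NETWORK_PASSPHRASE` handling accepts 'testnet' and 'pubnet' shortcuts, falls back to the testnet passphrase with a warning when the variable is unset, and passes any other value through untouched (also with a warning). A hypothetical test of that mapping; `resolvePassphrase` is not part of the PR, which keeps the switch inline in `NewBatchConfig`, so this is only a sketch of the intended behavior:

```go
package main

import (
	"testing"

	"github.com/stellar/go/network"
	"github.com/stretchr/testify/assert"
)

// resolvePassphrase mirrors the switch in NewBatchConfig (hypothetical helper).
func resolvePassphrase(raw string) string {
	switch raw {
	case "", "testnet":
		return network.TestNetworkPassphrase
	case "pubnet":
		return network.PublicNetworkPassphrase
	default:
		return raw // unrecognized values pass through unchanged
	}
}

func TestResolvePassphrase(t *testing.T) {
	assert.Equal(t, network.TestNetworkPassphrase, resolvePassphrase(""))
	assert.Equal(t, network.TestNetworkPassphrase, resolvePassphrase("testnet"))
	assert.Equal(t, network.PublicNetworkPassphrase, resolvePassphrase("pubnet"))
	assert.Equal(t, "My Custom Network", resolvePassphrase("My Custom Network"))
}
```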
1 change: 1 addition & 0 deletions exp/lighthorizon/index/cmd/map.sh
@@ -68,6 +68,7 @@ for (( i=0; i < $BATCH_COUNT; i++ ))
do
echo -n "Creating map job $i... "

NETWORK_PASSPHRASE='testnet' \
AWS_BATCH_JOB_ARRAY_INDEX=$i BATCH_SIZE=$BATCH_SIZE FIRST_CHECKPOINT=$FIRST \
TXMETA_SOURCE=file://$1 INDEX_TARGET=file://$2 WORKER_COUNT=1 \
./map &
4 changes: 3 additions & 1 deletion exp/lighthorizon/index/cmd/mapreduce_test.go
@@ -14,6 +14,7 @@ import (

"github.com/stellar/go/exp/lighthorizon/index"
"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -106,7 +107,8 @@ func RunMapTest(t *testing.T) (uint32, uint32, string) {
mapTestCmd.Env = append(os.Environ(),
fmt.Sprintf("BATCH_SIZE=%d", batchSize),
fmt.Sprintf("FIRST_LEDGER=%d", startLedger),
fmt.Sprintf("LAST_LEDGER=%d", endLedger))
fmt.Sprintf("LAST_LEDGER=%d", endLedger),
fmt.Sprintf("NETWORK_PASSPHRASE='%s'", network.TestNetworkPassphrase))
t.Logf("Running %d map jobs: %s", batchCount, mapTestCmd.String())
stdout, err := mapTestCmd.CombinedOutput()
