diff --git a/exp/lighthorizon/index/builder.go b/exp/lighthorizon/index/builder.go
index cacdb17580..0fd554308b 100644
--- a/exp/lighthorizon/index/builder.go
+++ b/exp/lighthorizon/index/builder.go
@@ -4,27 +4,166 @@ import (
 	"context"
 	"fmt"
 	"io"
+	"math"
+	"sync/atomic"
+	"time"
 
 	"github.com/stellar/go/historyarchive"
 	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/ingest/ledgerbackend"
+	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/support/log"
 	"github.com/stellar/go/toid"
 	"github.com/stellar/go/xdr"
+	"golang.org/x/sync/errgroup"
 )
 
-// Module is a way to process data and store it into an index.
+func BuildIndices(
+	ctx context.Context,
+	sourceUrl string, // where the raw txmeta comes from
+	targetUrl string, // where the resulting indices should go
+	networkPassphrase string,
+	startLedger, endLedger uint32,
+	modules []string,
+	workerCount int,
+) error {
+	indexStore, err := Connect(targetUrl)
+	if err != nil {
+		return err
+	}
+
+	// Connect to the source of raw txmeta (e.g. a local filesystem path or an S3 URL).
+	source, err := historyarchive.ConnectBackend(
+		sourceUrl,
+		historyarchive.ConnectOptions{
+			Context:           ctx,
+			NetworkPassphrase: networkPassphrase,
+		},
+	)
+	if err != nil {
+		return err
+	}
+
+	ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
+	defer ledgerBackend.Close()
+
+	if endLedger == 0 {
+		latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
+		if err != nil {
+			return err
+		}
+		endLedger = latest
+	}
+
+	ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
+	parallel := max(1, workerCount)
+
+	startTime := time.Now()
+	log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
+		startLedger, endLedger, ledgerCount)
+	log.Infof("Using %d workers", parallel)
+
+	// Create a bunch of workers that process ledgers one checkpoint range at a
+	// time (rather than one ledger at a time, to minimize flushes).
+	wg, ctx := errgroup.WithContext(ctx)
+	ch := make(chan historyarchive.Range, parallel)
+
+	indexBuilder := NewIndexBuilder(indexStore, ledgerBackend, networkPassphrase)
+	for _, part := range modules {
+		switch part {
+		case "transactions":
+			indexBuilder.RegisterModule(ProcessTransaction)
+		case "accounts":
+			indexBuilder.RegisterModule(ProcessAccounts)
+		case "accounts_unbacked":
+			indexBuilder.RegisterModule(ProcessAccountsWithoutBackend)
+		default:
+			return fmt.Errorf("unknown module: %s", part)
+		}
+	}
+
+	// Submit the work to the channel, breaking the full range up into
+	// individual checkpoint ranges.
+	go func() {
+		// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
+		nextCheckpoint := (((startLedger / 64) * 64) + 63)
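+		// e.g. startLedger 100 gives nextCheckpoint 127, so (for a large
+		// enough endLedger) the first chunk is [100, 127] and every later
+		// chunk aligns to a checkpoint boundary.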
+
+		ledger := startLedger
+		nextLedger := min(endLedger, ledger+(nextCheckpoint-startLedger))
+		for ledger <= endLedger {
+			chunk := historyarchive.Range{Low: ledger, High: nextLedger}
+			log.Debugf("Submitted [%d, %d] for work", chunk.Low, chunk.High)
+			ch <- chunk
+
+			ledger = nextLedger + 1
+			nextLedger = min(endLedger, ledger+63) // don't exceed upper bound
+		}
+
+		close(ch)
+	}()
+
+	processed := uint64(0)
+	for i := 0; i < parallel; i++ {
+		wg.Go(func() error {
+			for ledgerRange := range ch {
+				count := (ledgerRange.High - ledgerRange.Low) + 1
+				nprocessed := atomic.AddUint64(&processed, uint64(count))
+
+				log.Debugf("Working on checkpoint range [%d, %d]",
+					ledgerRange.Low, ledgerRange.High)
+
+				// Assertion for testing
+				if ledgerRange.High != endLedger && (ledgerRange.High+1)%64 != 0 {
+					log.Fatalf("Upper ledger isn't a checkpoint: %v", ledgerRange)
+				}
+
+				// Use a local error variable: assigning to the captured
+				// outer `err` from every worker would be a data race.
+				if err := indexBuilder.Build(ctx, ledgerRange); err != nil {
+					return err
+				}
+
+				printProgress("Reading ledgers", nprocessed, uint64(ledgerCount), startTime)
+
+				// Upload indices once per checkpoint range to save memory.
+				if err := indexStore.Flush(); err != nil {
+					return errors.Wrap(err, "flushing indices failed")
+				}
+			}
+			return nil
+		})
+	}
+
+	if err := wg.Wait(); err != nil {
+		return errors.Wrap(err, "one or more workers failed")
+	}
+
+	printProgress("Reading ledgers", uint64(ledgerCount), uint64(ledgerCount), startTime)
+
+	// Assertion for testing
+	if processed != uint64(ledgerCount) {
+		log.Fatalf("processed %d but expected %d", processed, ledgerCount)
+	}
+
+	log.Infof("Processed %d ledgers via %d workers", processed, parallel)
+	log.Infof("Uploading indices to %s", targetUrl)
+	if err := indexStore.Flush(); err != nil {
+		return errors.Wrap(err, "flushing indices failed")
+	}
+
+	return nil
+}
+
+// Module is a way to process ingested data and store it in an index.
 type Module func(
-	idx Store,
+	indexStore Store,
 	ledger xdr.LedgerCloseMeta,
-	checkpoint uint32,
 	transaction ingest.LedgerTransaction,
 ) error
 
 // IndexBuilder contains everything needed to build indices from ledger ranges.
 type IndexBuilder struct {
 	store             Store
-	history           ledgerbackend.HistoryArchiveBackend
+	history           *ledgerbackend.HistoryArchiveBackend
 	networkPassphrase string
 
 	modules []Module
@@ -32,7 +171,7 @@ type IndexBuilder struct {
 
 func NewIndexBuilder(
 	indexStore Store,
-	backend ledgerbackend.HistoryArchiveBackend,
+	backend *ledgerbackend.HistoryArchiveBackend,
 	networkPassphrase string,
 ) *IndexBuilder {
 	return &IndexBuilder{
@@ -51,11 +190,10 @@ func (builder *IndexBuilder) RegisterModule(module Module) {
 // RunModules executes all of the registered modules on the given ledger.
 func (builder *IndexBuilder) RunModules(
 	ledger xdr.LedgerCloseMeta,
-	checkpoint uint32,
 	tx ingest.LedgerTransaction,
 ) error {
 	for _, module := range builder.modules {
-		if err := module(builder.store, ledger, checkpoint, tx); err != nil {
+		if err := module(builder.store, ledger, tx); err != nil {
 			return err
 		}
 	}
@@ -63,6 +201,12 @@ func (builder *IndexBuilder) RunModules(
 	return nil
 }
 
+// Build sequentially creates indices for each ledger in the given range based
+// on the registered modules.
+//
+// TODO: We can probably optimize this by doing GetLedger in parallel with the
+// ingestion & index building, since the network will be idle during the latter
+// portion.
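+//
+// A rough sketch of that pipelining (hypothetical, not what this code does
+// yet): a producer goroutine prefetches ledgers into a buffered channel while
+// the loop below ingests them:
+//
+//	ledgers := make(chan xdr.LedgerCloseMeta, 64)
+//	go func() {
+//		defer close(ledgers)
+//		for seq := ledgerRange.Low; seq <= ledgerRange.High; seq++ {
+//			ledger, err := builder.history.GetLedger(ctx, seq)
+//			if err != nil {
+//				return // a real version would propagate this error
+//			}
+//			ledgers <- ledger
+//		}
+//	}()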
 func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchive.Range) error {
 	for ledgerSeq := ledgerRange.Low; ledgerSeq <= ledgerRange.High; ledgerSeq++ {
 		ledger, err := builder.history.GetLedger(ctx, ledgerSeq)
@@ -71,8 +215,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
 			return err
 		}
 
-		checkpoint := (ledgerSeq / 64) + 1
-
 		reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(
 			builder.networkPassphrase, ledger)
 		if err != nil {
@@ -87,7 +229,7 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
 				return err
 			}
 
-			if err := builder.RunModules(ledger, checkpoint, tx); err != nil {
+			if err := builder.RunModules(ledger, tx); err != nil {
 				return err
 			}
 		}
@@ -99,7 +241,6 @@ func (builder *IndexBuilder) Build(ctx context.Context, ledgerRange historyarchi
 func ProcessTransaction(
 	indexStore Store,
 	ledger xdr.LedgerCloseMeta,
-	_ uint32,
 	tx ingest.LedgerTransaction,
 ) error {
 	return indexStore.AddTransactionToIndexes(
@@ -110,10 +251,10 @@ func ProcessTransaction(
 
 func ProcessAccounts(
 	indexStore Store,
-	_ xdr.LedgerCloseMeta,
-	checkpoint uint32,
+	ledger xdr.LedgerCloseMeta,
 	tx ingest.LedgerTransaction,
 ) error {
+	checkpoint := (ledger.LedgerSequence() / 64) + 1
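+	// i.e. ledgers 0..63 land in checkpoint 1, ledgers 64..127 in checkpoint 2, etc.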
 	allParticipants, err := getParticipants(tx)
 	if err != nil {
 		return err
@@ -148,6 +289,48 @@ func ProcessAccounts(
 
 	return nil
 }
+
+func ProcessAccountsWithoutBackend(
+	indexStore Store,
+	ledger xdr.LedgerCloseMeta,
+	tx ingest.LedgerTransaction,
+) error {
+	checkpoint := (ledger.LedgerSequence() / 64) + 1
+	allParticipants, err := getParticipants(tx)
+	if err != nil {
+		return err
+	}
+
+	err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_all", allParticipants)
+	if err != nil {
+		return err
+	}
+
+	paymentsParticipants, err := getPaymentParticipants(tx)
+	if err != nil {
+		return err
+	}
+
+	err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_payments", paymentsParticipants)
+	if err != nil {
+		return err
+	}
+
+	if tx.Result.Successful() {
+		err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_all", allParticipants)
+		if err != nil {
+			return err
+		}
+
+		err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_payments", paymentsParticipants)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
+
 func getPaymentParticipants(transaction ingest.LedgerTransaction) ([]string, error) {
 	return participantsForOperations(transaction, true)
 }
@@ -263,13 +446,16 @@ func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayment
 		// Requires meta
 		// sponsor, err := operation.getSponsor()
 		// if err != nil {
-		// 	return nil, err
+		//  return nil, err
 		// }
 		// if sponsor != nil {
-		// 	otherParticipants = append(otherParticipants, *sponsor)
+		//  otherParticipants = append(otherParticipants, *sponsor)
 		// }
 	}
 
+	// FIXME: This could probably be a set rather than a list, since there's
+	// no reason to track a participating account more than once even if it
+	// participates in multiple operations.
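+	// (One option: collect addresses into a map[string]struct{} and return its keys.)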
 	return participants, nil
 }
 
@@ -292,3 +478,47 @@ func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
 	}
 	return []string{}
 }
+
+func printProgress(prefix string, done, total uint64, startTime time.Time) {
+	// This should never happen, more of a runtime assertion for now.
+	// We can remove it when production-ready.
+	if done > total {
+		panic(fmt.Errorf("error for %s: done > total (%d > %d)",
+			prefix, done, total))
+	}
+
+	progress := float64(done) / float64(total)
+	elapsed := time.Since(startTime)
+
+	// Approximate based on how many ledgers are left and how long this much
+	// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
+	// assumes consistent ledger load).
+	remaining := (float64(elapsed) / float64(done)) * float64(total-done)
+
+	var remainingStr string
+	if math.IsInf(remaining, 0) || math.IsNaN(remaining) {
+		remainingStr = "unknown"
+	} else {
+		remainingStr = time.Duration(remaining).Round(time.Millisecond).String()
+	}
+
+	log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix,
+		100*progress, done, total,
+		elapsed.Round(time.Millisecond),
+		remainingStr,
+	)
+}
+
+func min(a, b uint32) uint32 {
+	if a < b {
+		return a
+	}
+	return b
+}
+
+func max(a, b int) int {
+	if a > b {
+		return a
+	}
+	return b
+}
diff --git a/exp/lighthorizon/index/cmd/batch/map/main.go b/exp/lighthorizon/index/cmd/batch/map/main.go
index bd49f7d50b..bd48849422 100644
--- a/exp/lighthorizon/index/cmd/batch/map/main.go
+++ b/exp/lighthorizon/index/cmd/batch/map/main.go
@@ -3,352 +3,109 @@ package main
 import (
 	"context"
 	"fmt"
-	"io"
 	"os"
 	"strconv"
-	"sync/atomic"
-	"time"
 
-	"github.com/aws/aws-sdk-go/aws"
 	"github.com/stellar/go/exp/lighthorizon/index"
 	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest"
 	"github.com/stellar/go/network"
+	"github.com/stellar/go/support/errors"
 	"github.com/stellar/go/support/log"
-	"github.com/stellar/go/toid"
-	"github.com/stellar/go/xdr"
-	"golang.org/x/sync/errgroup"
 )
 
-var (
-	// Should we use runtime.NumCPU() for a reasonable default?
-	parallel = uint32(20)
-)
+type BatchConfig struct {
+	historyarchive.Range
+	TxMetaSourceUrl, TargetUrl string
+}
 
-func main() {
-	log.SetLevel(log.InfoLevel)
-	startTime := time.Now()
+const (
+	batchSizeEnv       = "BATCH_SIZE"
+	jobIndexEnv        = "AWS_BATCH_JOB_ARRAY_INDEX"
+	firstCheckpointEnv = "FIRST_CHECKPOINT"
+	txmetaSourceUrlEnv = "TXMETA_SOURCE"
+	indexTargetUrlEnv  = "INDEX_TARGET"
 
-	jobIndexString := os.Getenv("AWS_BATCH_JOB_ARRAY_INDEX")
-	if jobIndexString == "" {
-		panic("AWS_BATCH_JOB_ARRAY_INDEX env required")
-	}
+	s3BucketName = "sdf-txmeta-pubnet"
+)
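+
+// An example invocation (values illustrative, paths hypothetical):
+//
+//	BATCH_SIZE=64 AWS_BATCH_JOB_ARRAY_INDEX=0 FIRST_CHECKPOINT=63 \
+//	TXMETA_SOURCE=file:///txmeta INDEX_TARGET=file:///indices ./map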
 
-	jobIndex, err := strconv.ParseUint(jobIndexString, 10, 64)
+func NewS3BatchConfig() (*BatchConfig, error) {
+	jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
 	if err != nil {
-		panic(err)
+		return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
 	}
 
-	firstCheckpointString := os.Getenv("FIRST_CHECKPOINT")
-	firstCheckpoint, err := strconv.ParseUint(firstCheckpointString, 10, 64)
-	if err != nil {
-		panic(err)
+	url := fmt.Sprintf("s3://%s/job_%d?region=%s", s3BucketName, jobIndex, "us-east-1")
+	if err := os.Setenv(indexTargetUrlEnv, url); err != nil {
+		return nil, err
 	}
 
-	batchSizeString := os.Getenv("BATCH_SIZE")
-	batchSize, err := strconv.ParseUint(batchSizeString, 10, 64)
-	if err != nil {
-		panic(err)
-	}
+	return NewBatchConfig()
+}
 
-	startCheckpoint := uint32(firstCheckpoint + batchSize*jobIndex)
-	endCheckpoint := startCheckpoint + uint32(batchSize) - 1
+func NewBatchConfig() (*BatchConfig, error) {
+	targetUrl := os.Getenv(indexTargetUrlEnv)
+	if targetUrl == "" {
+		return nil, errors.New("required parameter: " + indexTargetUrlEnv)
+	}
 
-	indexStore, err := index.NewS3Store(
-		&aws.Config{Region: aws.String("us-east-1")},
-		fmt.Sprintf("job_%d", jobIndex),
-		parallel,
-	)
+	jobIndex, err := strconv.ParseUint(os.Getenv(jobIndexEnv), 10, 32)
 	if err != nil {
-		panic(err)
+		return nil, errors.Wrap(err, "invalid parameter "+jobIndexEnv)
 	}
 
-	historyArchive, err := historyarchive.Connect(
-		"s3://history.stellar.org/prd/core-live/core_live_001",
-		historyarchive.ConnectOptions{
-			NetworkPassphrase: network.PublicNetworkPassphrase,
-			S3Region:          "eu-west-1",
-			UnsignedRequests:  true,
-		},
-	)
+	firstCheckpoint, err := strconv.ParseUint(os.Getenv(firstCheckpointEnv), 10, 32)
 	if err != nil {
-		panic(err)
+		return nil, errors.Wrap(err, "invalid parameter "+firstCheckpointEnv)
 	}
-
-	all := endCheckpoint - startCheckpoint
-
-	ctx := context.Background()
-	wg, _ := errgroup.WithContext(ctx)
-
-	ch := make(chan uint32, parallel)
-
-	go func() {
-		for i := startCheckpoint; i <= endCheckpoint; i++ {
-			ch <- i
-		}
-		close(ch)
-	}()
-
-	processed := uint64(0)
-	for i := uint32(0); i < parallel; i++ {
-		wg.Go(func() error {
-			for checkpoint := range ch {
-
-				startLedger := checkpoint * 64
-				if startLedger == 0 {
-					startLedger = 1
-				}
-				endLedger := checkpoint*64 + 64 - 1
-
-				log.Info("Processing checkpoint ", checkpoint, " ledgers ", startLedger, endLedger)
-
-				ledgers, err := historyArchive.GetLedgers(startLedger, endLedger)
-				if err != nil {
-					log.WithField("error", err).Error("error getting ledgers")
-					ch <- checkpoint
-					continue
-				}
-
-				for i := startLedger; i <= endLedger; i++ {
-					ledger, ok := ledgers[i]
-					if !ok {
-						return fmt.Errorf("no ledger %d", i)
-					}
-
-					resultMeta := make([]xdr.TransactionResultMeta, len(ledger.TransactionResult.TxResultSet.Results))
-					for i, result := range ledger.TransactionResult.TxResultSet.Results {
-						resultMeta[i].Result = result
-					}
-
-					closeMeta := xdr.LedgerCloseMeta{
-						V0: &xdr.LedgerCloseMetaV0{
-							LedgerHeader: ledger.Header,
-							TxSet:        ledger.Transaction.TxSet,
-							TxProcessing: resultMeta,
-						},
-					}
-
-					reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(network.PublicNetworkPassphrase, closeMeta)
-					if err != nil {
-						return err
-					}
-
-					for {
-						tx, err := reader.Read()
-						if err != nil {
-							if err == io.EOF {
-								break
-							}
-							return err
-						}
-
-						indexStore.AddTransactionToIndexes(
-							toid.New(int32(closeMeta.LedgerSequence()), int32(tx.Index), 0).ToInt64(),
-							tx.Result.TransactionHash,
-						)
-
-						allParticipants, err := participantsForOperations(tx, false)
-						if err != nil {
-							return err
-						}
-
-						err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_all", allParticipants)
-						if err != nil {
-							return err
-						}
-
-						paymentsParticipants, err := participantsForOperations(tx, true)
-						if err != nil {
-							return err
-						}
-
-						err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "all_payments", paymentsParticipants)
-						if err != nil {
-							return err
-						}
-
-						if tx.Result.Successful() {
-							allParticipants, err := participantsForOperations(tx, false)
-							if err != nil {
-								return err
-							}
-
-							err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_all", allParticipants)
-							if err != nil {
-								return err
-							}
-
-							paymentsParticipants, err := participantsForOperations(tx, true)
-							if err != nil {
-								return err
-							}
-
-							err = indexStore.AddParticipantsToIndexesNoBackend(checkpoint, "successful_payments", paymentsParticipants)
-							if err != nil {
-								return err
-							}
-						}
-					}
-				}
-
-				nprocessed := atomic.AddUint64(&processed, 1)
-
-				if nprocessed%100 == 0 {
-					log.Infof(
-						"Reading checkpoints... - %.2f%% - elapsed: %s, remaining: %s",
-						(float64(nprocessed)/float64(all))*100,
-						time.Since(startTime).Round(1*time.Second),
-						(time.Duration(int64(time.Since(startTime))*int64(all)/int64(nprocessed)) - time.Since(startTime)).Round(1*time.Second),
-					)
-				}
-			}
-			return nil
-		})
+	if (firstCheckpoint+1)%64 != 0 {
+		return nil, fmt.Errorf("invalid checkpoint: %d", firstCheckpoint)
 	}
 
-	if err := wg.Wait(); err != nil {
-		panic(err)
-	}
-	log.Infof("Uploading accounts")
-	if err := indexStore.FlushAccounts(); err != nil {
-		panic(err)
-	}
-	log.Infof("Uploading indexes")
-	if err := indexStore.Flush(); err != nil {
-		panic(err)
+	batchSize, err := strconv.ParseUint(os.Getenv(batchSizeEnv), 10, 32)
+	if err != nil {
+		return nil, errors.Wrap(err, "invalid parameter "+batchSizeEnv)
 	}
-}
 
-func participantsForOperations(transaction ingest.LedgerTransaction, onlyPayments bool) ([]string, error) {
-	var participants []string
+	sourceUrl := os.Getenv(txmetaSourceUrlEnv)
+	if sourceUrl == "" {
+		return nil, errors.New("required parameter: " + txmetaSourceUrlEnv)
+	}
 
-	for opindex, operation := range transaction.Envelope.Operations() {
-		opSource := operation.SourceAccount
-		if opSource == nil {
-			txSource := transaction.Envelope.SourceAccount()
-			opSource = &txSource
-		}
+	log.Debugf("%s: %d", batchSizeEnv, batchSize)
+	log.Debugf("%s: %d", jobIndexEnv, jobIndex)
+	log.Debugf("%s: %d", firstCheckpointEnv, firstCheckpoint)
+	log.Debugf("%s: %s", txmetaSourceUrlEnv, sourceUrl)
 
-		switch operation.Body.Type {
-		case xdr.OperationTypeCreateAccount,
-			xdr.OperationTypePayment,
-			xdr.OperationTypePathPaymentStrictReceive,
-			xdr.OperationTypePathPaymentStrictSend,
-			xdr.OperationTypeAccountMerge:
-			participants = append(participants, opSource.Address())
-		default:
-			if onlyPayments {
-				continue
-			}
-			participants = append(participants, opSource.Address())
-		}
+	startCheckpoint := uint32(firstCheckpoint + batchSize*jobIndex)
+	endCheckpoint := startCheckpoint + uint32(batchSize) - 1
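+	// e.g. with FIRST_CHECKPOINT=63 and BATCH_SIZE=64, job 0 covers ledgers
+	// [63, 126], job 1 covers [127, 190], and so on.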
+	return &BatchConfig{
+		Range:           historyarchive.Range{Low: startCheckpoint, High: endCheckpoint},
+		TxMetaSourceUrl: sourceUrl,
+		TargetUrl:       targetUrl,
+	}, nil
+}
 
-		switch operation.Body.Type {
-		case xdr.OperationTypeCreateAccount:
-			participants = append(participants, operation.Body.MustCreateAccountOp().Destination.Address())
-		case xdr.OperationTypePayment:
-			participants = append(participants, operation.Body.MustPaymentOp().Destination.ToAccountId().Address())
-		case xdr.OperationTypePathPaymentStrictReceive:
-			participants = append(participants, operation.Body.MustPathPaymentStrictReceiveOp().Destination.ToAccountId().Address())
-		case xdr.OperationTypePathPaymentStrictSend:
-			participants = append(participants, operation.Body.MustPathPaymentStrictSendOp().Destination.ToAccountId().Address())
-		case xdr.OperationTypeManageBuyOffer:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeManageSellOffer:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeCreatePassiveSellOffer:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeSetOptions:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeChangeTrust:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeAllowTrust:
-			participants = append(participants, operation.Body.MustAllowTrustOp().Trustor.Address())
-		case xdr.OperationTypeAccountMerge:
-			participants = append(participants, operation.Body.MustDestination().ToAccountId().Address())
-		case xdr.OperationTypeInflation:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeManageData:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeBumpSequence:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeCreateClaimableBalance:
-			for _, c := range operation.Body.MustCreateClaimableBalanceOp().Claimants {
-				participants = append(participants, c.MustV0().Destination.Address())
-			}
-		case xdr.OperationTypeClaimClaimableBalance:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeBeginSponsoringFutureReserves:
-			participants = append(participants, operation.Body.MustBeginSponsoringFutureReservesOp().SponsoredId.Address())
-		case xdr.OperationTypeEndSponsoringFutureReserves:
-			// Failed transactions may not have a compliant sandwich structure
-			// we can rely on (e.g. invalid nesting or a being operation with the wrong sponsoree ID)
-			// and thus we bail out since we could return incorrect information.
-			if transaction.Result.Successful() {
-				sponsoree := transaction.Envelope.SourceAccount().ToAccountId().Address()
-				if operation.SourceAccount != nil {
-					sponsoree = operation.SourceAccount.Address()
-				}
-				operations := transaction.Envelope.Operations()
-				for i := int(opindex) - 1; i >= 0; i-- {
-					if beginOp, ok := operations[i].Body.GetBeginSponsoringFutureReservesOp(); ok &&
-						beginOp.SponsoredId.Address() == sponsoree {
-						participants = append(participants, beginOp.SponsoredId.Address())
-					}
-				}
-			}
-		case xdr.OperationTypeRevokeSponsorship:
-			op := operation.Body.MustRevokeSponsorshipOp()
-			switch op.Type {
-			case xdr.RevokeSponsorshipTypeRevokeSponsorshipLedgerEntry:
-				participants = append(participants, getLedgerKeyParticipants(*op.LedgerKey)...)
-			case xdr.RevokeSponsorshipTypeRevokeSponsorshipSigner:
-				participants = append(participants, op.Signer.AccountId.Address())
-				// We don't add signer as a participant because a signer can be arbitrary account.
-				// This can spam successful operations history of any account.
-			}
-		case xdr.OperationTypeClawback:
-			op := operation.Body.MustClawbackOp()
-			participants = append(participants, op.From.ToAccountId().Address())
-		case xdr.OperationTypeClawbackClaimableBalance:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeSetTrustLineFlags:
-			op := operation.Body.MustSetTrustLineFlagsOp()
-			participants = append(participants, op.Trustor.Address())
-		case xdr.OperationTypeLiquidityPoolDeposit:
-			// the only direct participant is the source_account
-		case xdr.OperationTypeLiquidityPoolWithdraw:
-			// the only direct participant is the source_account
-		default:
-			return nil, fmt.Errorf("unknown operation type: %s", operation.Body.Type)
-		}
+func main() {
+	// log.SetLevel(log.DebugLevel)
+	log.SetLevel(log.InfoLevel)
 
-		// Requires meta
-		// sponsor, err := operation.getSponsor()
-		// if err != nil {
-		// 	return nil, err
-		// }
-		// if sponsor != nil {
-		// 	otherParticipants = append(otherParticipants, *sponsor)
-		// }
+	batch, err := NewBatchConfig()
+	if err != nil {
+		panic(err)
 	}
 
-	return participants, nil
-}
-
-func getLedgerKeyParticipants(ledgerKey xdr.LedgerKey) []string {
-	var result []string
-	switch ledgerKey.Type {
-	case xdr.LedgerEntryTypeAccount:
-		result = append(result, ledgerKey.Account.AccountId.Address())
-	case xdr.LedgerEntryTypeClaimableBalance:
-		// nothing to do
-	case xdr.LedgerEntryTypeData:
-		result = append(result, ledgerKey.Data.AccountId.Address())
-	case xdr.LedgerEntryTypeOffer:
-		result = append(result, ledgerKey.Offer.SellerId.Address())
-	case xdr.LedgerEntryTypeTrustline:
-		result = append(result, ledgerKey.TrustLine.AccountId.Address())
+	log.Infof("Building indices for ledger range [%d, %d] into %s",
+		batch.Range.Low, batch.Range.High, batch.TargetUrl)
+
+	if err := index.BuildIndices(
+		context.Background(),
+		batch.TxMetaSourceUrl,
+		batch.TargetUrl,
+		network.TestNetworkPassphrase,
+		batch.Low, batch.High,
+		[]string{"transactions", "accounts_unbacked"},
+		1,
+	); err != nil {
+		panic(err)
 	}
-	return result
 }
diff --git a/exp/lighthorizon/index/cmd/single/main.go b/exp/lighthorizon/index/cmd/single/main.go
index 8d95c35315..374e5c1204 100644
--- a/exp/lighthorizon/index/cmd/single/main.go
+++ b/exp/lighthorizon/index/cmd/single/main.go
@@ -3,19 +3,12 @@ package main
 import (
 	"context"
 	"flag"
-	"fmt"
-	"math"
 	"runtime"
 	"strings"
-	"sync/atomic"
-	"time"
 
 	"github.com/stellar/go/exp/lighthorizon/index"
-	"github.com/stellar/go/historyarchive"
-	"github.com/stellar/go/ingest/ledgerbackend"
 	"github.com/stellar/go/network"
 	"github.com/stellar/go/support/log"
-	"golang.org/x/sync/errgroup"
 )
 
 func main() {
@@ -33,159 +26,19 @@ func main() {
 	flag.Parse()
 	log.SetLevel(log.InfoLevel)
 
-	ctx := context.Background()
-
-	indexStore, err := index.Connect(*targetUrl)
-	if err != nil {
-		panic(err)
-	}
-
-	// Simple file os access
-	source, err := historyarchive.ConnectBackend(
+	err := index.BuildIndices(
+		context.Background(),
 		*sourceUrl,
-		historyarchive.ConnectOptions{
-			Context:           context.Background(),
-			NetworkPassphrase: *networkPassphrase,
-		},
+		*targetUrl,
+		*networkPassphrase,
+		uint32(max(*start, 2)),
+		uint32(*end),
+		strings.Split(*modules, ","),
+		*workerCount,
 	)
 	if err != nil {
 		panic(err)
 	}
-	ledgerBackend := ledgerbackend.NewHistoryArchiveBackend(source)
-	defer ledgerBackend.Close()
-
-	startTime := time.Now()
-
-	startLedger := uint32(max(*start, 2))
-	endLedger := uint32(*end)
-	if endLedger < 0 {
-		latest, err := ledgerBackend.GetLatestLedgerSequence(ctx)
-		if err != nil {
-			panic(err)
-		}
-		endLedger = latest
-	}
-	ledgerCount := 1 + (endLedger - startLedger) // +1 because endLedger is inclusive
-	parallel := max(1, *workerCount)
-
-	log.Infof("Creating indices for ledger range: %d through %d (%d ledgers)",
-		startLedger, endLedger, ledgerCount)
-	log.Infof("Using %d workers", parallel)
-
-	// Create a bunch of workers that process ledgers a checkpoint range at a
-	// time (better than a ledger at a time to minimize flushes).
-	wg, ctx := errgroup.WithContext(ctx)
-	ch := make(chan historyarchive.Range, parallel)
-
-	indexBuilder := index.NewIndexBuilder(indexStore, *ledgerBackend, *networkPassphrase)
-	for _, part := range strings.Split(*modules, ",") {
-		switch part {
-		case "transactions":
-			indexBuilder.RegisterModule(index.ProcessTransaction)
-		case "accounts":
-			indexBuilder.RegisterModule(index.ProcessAccounts)
-		default:
-			panic(fmt.Errorf("Unknown module: %s", part))
-		}
-	}
-
-	// Submit the work to the channels, breaking up the range into checkpoints.
-	go func() {
-		// Recall: A ledger X is a checkpoint ledger iff (X + 1) % 64 == 0
-		nextCheckpoint := (((startLedger / 64) * 64) + 63)
-
-		ledger := startLedger
-		nextLedger := ledger + (nextCheckpoint - startLedger)
-		for ledger <= endLedger {
-			ch <- historyarchive.Range{Low: ledger, High: nextLedger}
-
-			ledger = nextLedger + 1
-			// Ensure we don't exceed the upper ledger bound
-			nextLedger = uint32(min(int(endLedger), int(ledger+63)))
-		}
-
-		close(ch)
-	}()
-
-	processed := uint64(0)
-	for i := 0; i < parallel; i++ {
-		wg.Go(func() error {
-			for ledgerRange := range ch {
-				count := (ledgerRange.High - ledgerRange.Low) + 1
-				nprocessed := atomic.AddUint64(&processed, uint64(count))
-
-				log.Debugf("Working on checkpoint range %+v", ledgerRange)
-
-				// Assertion for testing
-				if ledgerRange.High != endLedger &&
-					(ledgerRange.High+1)%64 != 0 {
-					log.Fatalf("Uh oh: bad range")
-				}
-
-				err = indexBuilder.Build(ctx, ledgerRange)
-				if err != nil {
-					return err
-				}
-
-				printProgress("Reading ledgers",
-					nprocessed, uint64(ledgerCount), startTime)
-
-				// Upload indices once per checkpoint to save memory
-				if err := indexStore.Flush(); err != nil {
-					return err
-				}
-			}
-			return nil
-		})
-	}
-
-	if err := wg.Wait(); err != nil {
-		panic(err)
-	}
-
-	printProgress("Reading ledgers",
-		uint64(ledgerCount), uint64(ledgerCount), startTime)
-
-	// Assertion for testing
-	if processed != uint64(ledgerCount) {
-		log.Fatalf("processed %d but expected %d", processed, ledgerCount)
-	}
-
-	log.Infof("Processed %d ledgers via %d workers", processed, parallel)
-	log.Infof("Uploading indices to %s", *targetUrl)
-	if err := indexStore.Flush(); err != nil {
-		panic(err)
-	}
-}
-
-func printProgress(prefix string, done, total uint64, startTime time.Time) {
-	// This should never happen, more of a runtime assertion for now.
-	// We can remove it when production-ready.
-	if done > total {
-		panic(fmt.Errorf("error for %s: done > total (%d > %d)",
-			prefix, done, total))
-	}
-
-	progress := float64(done) / float64(total)
-	elapsed := time.Since(startTime)
-
-	// Approximate based on how many ledgers are left and how long this much
-	// progress took, e.g. if 4/10 took 2s then 6/10 will "take" 3s (though this
-	// assumes consistent ledger load).
-	remaining := (float64(elapsed) / float64(done)) * float64(total-done)
-
-	var remainingStr string
-	if math.IsInf(remaining, 0) || math.IsNaN(remaining) {
-		remainingStr = "unknown"
-	} else {
-		remainingStr = time.Duration(remaining).Round(time.Millisecond).String()
-	}
-
-	log.Infof("%s - %.1f%% (%d/%d) - elapsed: %s, remaining: ~%s", prefix,
-		100*progress, done, total,
-		elapsed.Round(time.Millisecond),
-		remainingStr,
-	)
 }
 
 func max(a, b int) int {
@@ -194,10 +47,3 @@ func max(a, b int) int {
 	}
 	return b
 }
-
-func min(a, b int) int {
-	if a < b {
-		return a
-	}
-	return b
-}