Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: deletion script will not OOM #1679

Merged
merged 6 commits into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 115 additions & 5 deletions tools/cassandra_delete_range/cassandra_delete_range.go
100644 → 100755
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
package main

import (
"bufio"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"xrplf/clio/cassandra_delete_range/internal/cass"
Expand All @@ -33,7 +35,7 @@ var (
deleteBefore = app.Command("delete-before", "Prunes everything before the given ledger index")
deleteBeforeLedgerIdx = deleteBefore.Arg("idx", "Sets the latest ledger_index to keep around (delete everything before this ledger index)").Required().Uint64()

getLedgerRange = app.Command("get-ledger-range", "Fetch the current lender_range table values")
getLedgerRange = app.Command("get-ledger-range", "Fetch the current ledger_range table values")

nodesInCluster = app.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int()
coresInNode = app.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int()
Expand All @@ -44,6 +46,7 @@ var (
clusterCQLVersion = app.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String()
clusterPageSize = app.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int()
keyspace = app.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String()
resume = app.Flag("resume", "Whether to resume deletion from the previous command due to something crashing").Default("false").Bool()

userName = app.Flag("username", "Username to use when connecting to the cluster").String()
password = app.Flag("password", "Password to use when connecting to the cluster").String()
Expand All @@ -56,9 +59,11 @@ var (
skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool()
skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()
skipAccTransactionsTable = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool()

workerCount = 1 // the calculated number of parallel goroutines the client should run
ranges []*util.TokenRange // the calculated ranges to be executed in parallel
workerCount = 1 // the calculated number of parallel goroutines the client should run
ranges []*util.TokenRange // the calculated ranges to be executed in parallel
ledgerOrTokenRange *util.StoredRange // mapping of startRange -> endRange. Used for resume deletion
)

func main() {
Expand All @@ -70,6 +75,11 @@ func main() {
log.Fatal(err)
}

cmd := strings.Join(os.Args[1:], " ")
if *resume {
prepareResume(&cmd)
}

clioCass := cass.NewClioCass(&cass.Settings{
SkipSuccessorTable: *skipSuccessorTable,
SkipObjectsTable: *skipObjectsTable,
Expand All @@ -79,8 +89,11 @@ func main() {
SkipLedgerTransactionsTable: *skipLedgerHashesTable,
SkipLedgersTable: *skipLedgersTable,
SkipWriteLatestLedger: *skipWriteLatestLedger,
SkipAccTransactionsTable: *skipAccTransactionsTable,
WorkerCount: workerCount,
Ranges: ranges}, cluster)
Ranges: ranges,
RangesRead: ledgerOrTokenRange,
Command: cmd}, cluster)

switch command {
case deleteAfter.FullCommand():
Expand Down Expand Up @@ -157,6 +170,7 @@ Skip deletion of:
- diff table : %t
- ledger_transactions table : %t
- ledgers table : %t
- account_tx table : %t

Will update ledger_range : %t

Expand All @@ -179,7 +193,9 @@ Will update ledger_range : %t
*skipDiffTable,
*skipLedgerTransactionsTable,
*skipLedgersTable,
!*skipWriteLatestLedger)
*skipAccTransactionsTable,
!*skipWriteLatestLedger,
)

fmt.Println(runParameters)
}
Expand Down Expand Up @@ -208,3 +224,97 @@ func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) {

return cluster, nil
}

// prepareResume loads the state of an interrupted deletion run from
// continue.txt so that the current invocation can carry on from that point.
//
// Layout of continue.txt as consumed by this function:
//
//	line 1:  the previous user command; it must equal the current command
//	         line with the trailing --resume removed
//	line 2:  the table that was being processed (successor, objects,
//	         ledger_hashes, transactions, diff, ledger_transactions,
//	         ledgers or account_tx)
//	line 3:  read and discarded without being inspected
//	line 4+: one "start,end" pair of base-10 int64 values per line
//
// NOTE(review): line 3 is dropped unconditionally. Confirm that the code which
// writes continue.txt puts a separator/header there; otherwise the first
// stored range is lost on resume.
//
// On success, *cmd is overwritten with the command line minus --resume, the
// skip flag of every table listed after the stored table's case in the switch
// below is set to true, and the parsed pairs are published through
// ledgerOrTokenRange.TokenRange. Every failure terminates the process via
// log.Fatal / log.Fatalf.
func prepareResume(cmd *string) {
	file, err := os.Open("continue.txt")
	if err != nil {
		// Distinguish a missing file from any other failure (permissions,
		// I/O error) so the operator sees the real cause.
		if os.IsNotExist(err) {
			log.Fatal("continue.txt does not exist. Aborted")
		}
		log.Fatalf("Failed to open file: %v", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)

	// Line 1: the command that was running when the previous run stopped.
	scanner.Scan()

	// --resume must be the last flag passed, so that stripping the final
	// argument reproduces the command stored in the file.
	if os.Args[len(os.Args)-1] != "--resume" {
		log.Fatal("--resume must be the last flag passed")
	}

	// Drop --resume from the end of the command.
	*cmd = strings.Join(os.Args[1:len(os.Args)-1], " ")

	// The aborted command must match the command the user entered now.
	if scanner.Text() != *cmd {
		log.Fatalf("File continue.txt has %s command stored. \n You provided %s which does not match. \n Aborting...", scanner.Text(), *cmd)
	}

	// Line 2: the table the previous run was working on.
	scanner.Scan()

	// Skip the necessary tables based on where the program aborted. For
	// example, if it stopped at account_tx, all tables before account_tx
	// should already be deleted, so they are skipped. Each case falls through
	// so that a later table also marks every earlier one as skipped.
	tableFound := false
	switch scanner.Text() {
	case "account_tx":
		*skipLedgersTable = true
		fallthrough
	case "ledgers":
		*skipLedgerTransactionsTable = true
		fallthrough
	case "ledger_transactions":
		*skipDiffTable = true
		fallthrough
	case "diff":
		*skipTransactionsTable = true
		fallthrough
	case "transactions":
		*skipLedgerHashesTable = true
		fallthrough
	case "ledger_hashes":
		*skipObjectsTable = true
		fallthrough
	case "objects":
		*skipSuccessorTable = true
		fallthrough
	case "successor":
		tableFound = true
	}

	if !tableFound {
		log.Fatalf("Invalid table: %s", scanner.Text())
	}

	// Line 3 is consumed here and ignored (see NOTE(review) above).
	scanner.Scan()
	rangeRead := make(map[int64]int64)

	// Load every remaining "start,end" line into the start -> end map.
	for scanner.Scan() {
		line := scanner.Text()
		tokenRange := strings.Split(line, ",")
		if len(tokenRange) != 2 {
			log.Fatalf("Range is not two integers. %s . Aborting...", tokenRange)
		}
		startStr := strings.TrimSpace(tokenRange[0])
		endStr := strings.TrimSpace(tokenRange[1])

		// Convert both halves to int64.
		start, err1 := strconv.ParseInt(startStr, 10, 64)
		end, err2 := strconv.ParseInt(endStr, 10, 64)

		if err1 != nil || err2 != nil {
			// %v rather than %s: one of the two errors may be nil, and %s
			// would render a nil error as "%!s(<nil>)".
			log.Fatalf("Error converting integer: %v, %v", err1, err2)
		}
		rangeRead[start] = end
	}

	// Scan returning false can mean a read error or an over-long line rather
	// than end of file; without this check the range set would be silently
	// truncated.
	if err := scanner.Err(); err != nil {
		log.Fatalf("Failed to read continue.txt: %v", err)
	}

	ledgerOrTokenRange = &util.StoredRange{TokenRange: rangeRead}
}
Loading
Loading