Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
30482: workload/kv: print the highest sequence number r=andreimatei a=andreimatei

At the end of the kv workload, print the sequnence number of the highest
row written so that it can be used in a next run: for example, you might
want to prepopulate the db with --read_percent=0 and then perform only
reads with --read_percent=100.
If you don't pass --write-seq (or you don't know what to put in it), all
the reads from --read_percent=100 would try to read a non-exitent row
with sentinel sequence number 0.

Release note: None

30882: workload: actually prepare writeStmt with Conn r=nvanbenschoten a=nvanbenschoten

See #30811 (comment).

Release note: None

30944: workload: teach interleavedpartitioned about retryable transactions r=BramGruneir a=BramGruneir

Also some other speedups and cleanups while I was in there.

Fixes #28567.

Release note: None

Co-authored-by: Andrei Matei <andrei@cockroachlabs.com>
Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
Co-authored-by: Bram Gruneir <bram@cockroachlabs.com>
  • Loading branch information
4 people committed Oct 4, 2018
4 parents c466b9b + 24a44d6 + fb197ea + 4a4113a commit 752d6c5
Show file tree
Hide file tree
Showing 4 changed files with 232 additions and 139 deletions.
17 changes: 11 additions & 6 deletions pkg/workload/cli/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,8 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
rampDone = make(chan struct{})
}

workersCtx, cancelWorkers := context.WithCancel(ctx)
defer cancelWorkers()
var wg sync.WaitGroup
wg.Add(len(ops.WorkerFns))
go func() {
Expand All @@ -331,24 +333,23 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
var rampCtx context.Context
if rampDone != nil {
var cancel func()
rampCtx, cancel = context.WithTimeout(ctx, *ramp)
rampCtx, cancel = context.WithTimeout(workersCtx, *ramp)
defer cancel()
}

for i, workFn := range ops.WorkerFns {
i, workFn := i, workFn
go func() {
go func(i int, workFn func(context.Context) error) {
// If a ramp period was specified, start all of the workers
// gradually with a new context.
if rampCtx != nil {
rampPerWorker := *ramp / time.Duration(len(ops.WorkerFns))
time.Sleep(time.Duration(i) * rampPerWorker)
workerRun(rampCtx, errCh, nil, limiter, workFn)
workerRun(rampCtx, errCh, nil /* wg */, limiter, workFn)
}

// Start worker again, this time with the main context.
workerRun(ctx, errCh, &wg, limiter, workFn)
}()
workerRun(workersCtx, errCh, &wg, limiter, workFn)
}(i, workFn)
}

if rampCtx != nil {
Expand Down Expand Up @@ -432,6 +433,10 @@ func runRun(gen workload.Generator, urls []string, dbName string) error {
})

case <-done:
cancelWorkers()
if ops.Close != nil {
ops.Close(ctx)
}
const totalHeader = "\n_elapsed___errors_____ops(total)___ops/sec(cum)__avg(ms)__p50(ms)__p95(ms)__p99(ms)_pMax(ms)"
fmt.Println(totalHeader + `__total`)
startElapsed := timeutil.Since(start)
Expand Down
224 changes: 115 additions & 109 deletions pkg/workload/interleavedpartitioned/interleavedpartitioned.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"strings"
"time"

"github.com/cockroachdb/cockroach-go/crdb"
"github.com/pkg/errors"
"github.com/spf13/pflag"

Expand Down Expand Up @@ -398,8 +399,7 @@ func (w *interleavedPartitioned) Ops(
}
w.currentDelete++

_, err = deleteStatement.ExecContext(ctx, w.rowsPerDelete)
if err != nil {
if _, err = deleteStatement.ExecContext(ctx, w.rowsPerDelete); err != nil {
return err
}
elapsed := timeutil.Since(start)
Expand All @@ -410,111 +410,105 @@ func (w *interleavedPartitioned) Ops(

if opRand < w.insertPercent {
start := timeutil.Now()

tx, err := db.Begin()
insertStatement, err := db.Prepare(insertQuery)
if err != nil {
return err
}

insertStatement, err := db.Prepare(insertQuery)
insertCustomerStatement, err := db.Prepare(insertQueryCustomers)
if err != nil {
return err
}
sessionID := w.randomSessionID(rng, w.pickLocality(rng, w.insertLocalPercent))
args := []interface{}{
sessionID, // session_id
randString(rng, 100), // affiliate
randString(rng, 50), // channel
randString(rng, 20), // language
randString(rng, 20), // status
randString(rng, 50), // platform
randString(rng, 100), // query_id
}
_, err = tx.StmtContext(ctx, insertStatement).Exec(args...)
insertDeviceStatement, err := db.Prepare(insertQueryDevices)
if err != nil {
return err
}
for i := 0; i < w.customersPerSession; i++ {
insertCustomerStatement, err := db.Prepare(insertQueryCustomers)
if err != nil {
return err
}
args := []interface{}{
sessionID,
randString(rng, 50),
randString(rng, 50),
}
_, err = tx.StmtContext(ctx, insertCustomerStatement).Exec(args...)
if err != nil {
return err
}
}
for i := 0; i < w.devicesPerSession; i++ {
insertDeviceStatement, err := db.Prepare(insertQueryDevices)
if err != nil {
return err
}
args := []interface{}{
randString(rng, 100),
sessionID,
randString(rng, 50), // device_id
randString(rng, 50), // name
randString(rng, 50), // make
randString(rng, 50), // macaddress
randString(rng, 50), // model
randString(rng, 50), // serialno
}
_, err = tx.StmtContext(ctx, insertDeviceStatement).Exec(args...)
if err != nil {
return err
}
}
for i := 0; i < w.variantsPerSession; i++ {
insertVariantStatement, err := db.Prepare(insertQueryVariants)
if err != nil {
return err
}
args := []interface{}{
sessionID,
randString(rng, 50),
randString(rng, 50),
}
_, err = tx.StmtContext(ctx, insertVariantStatement).Exec(args...)
if err != nil {
return err
}
insertVariantStatement, err := db.Prepare(insertQueryVariants)
if err != nil {
return err
}
for i := 0; i < w.parametersPerSession; i++ {
insertParameterStatement, err := db.Prepare(insertQueryParameters)
if err != nil {
return err
}
args := []interface{}{
sessionID,
randString(rng, 50),
randString(rng, 50),
}
_, err = tx.StmtContext(ctx, insertParameterStatement).Exec(args...)
if err != nil {
return err
}
insertParameterStatement, err := db.Prepare(insertQueryParameters)
if err != nil {
return err
}
for i := 0; i < w.queriesPerSession; i++ {
insertQueryStatement, err := db.Prepare(insertQueryQuery)
if err != nil {
return err
}
args := []interface{}{
sessionID,
randString(rng, 50),
}
_, err = tx.StmtContext(ctx, insertQueryStatement).Exec(args...)
if err != nil {
return err
}
insertQueryStatement, err := db.Prepare(insertQueryQuery)
if err != nil {
return err
}
if err := tx.Commit(); err != nil {
return nil
if err := crdb.ExecuteTx(
context.Background(),
db,
nil, /* txopts */
func(tx *gosql.Tx) error {
sessionID := w.randomSessionID(rng, w.pickLocality(rng, w.insertLocalPercent))
args := []interface{}{
sessionID, // session_id
randString(rng, 100), // affiliate
randString(rng, 50), // channel
randString(rng, 20), // language
randString(rng, 20), // status
randString(rng, 50), // platform
randString(rng, 100), // query_id
}
if _, err = tx.StmtContext(ctx, insertStatement).Exec(args...); err != nil {
return err
}
for i := 0; i < w.customersPerSession; i++ {
args := []interface{}{
sessionID,
randString(rng, 50),
randString(rng, 50),
}
if _, err = tx.StmtContext(ctx, insertCustomerStatement).Exec(args...); err != nil {
return err
}
}
for i := 0; i < w.devicesPerSession; i++ {
args := []interface{}{
randString(rng, 100),
sessionID,
randString(rng, 50), // device_id
randString(rng, 50), // name
randString(rng, 50), // make
randString(rng, 50), // macaddress
randString(rng, 50), // model
randString(rng, 50), // serialno
}
if _, err = tx.StmtContext(ctx, insertDeviceStatement).Exec(args...); err != nil {
return err
}
}
for i := 0; i < w.variantsPerSession; i++ {
args := []interface{}{
sessionID,
randString(rng, 50),
randString(rng, 50),
}
if _, err = tx.StmtContext(ctx, insertVariantStatement).Exec(args...); err != nil {
return err
}
}
for i := 0; i < w.parametersPerSession; i++ {
args := []interface{}{
sessionID,
randString(rng, 50),
randString(rng, 50),
}
if _, err = tx.StmtContext(ctx, insertParameterStatement).Exec(args...); err != nil {
return err
}
}
for i := 0; i < w.queriesPerSession; i++ {
args := []interface{}{
sessionID,
randString(rng, 50),
}
if _, err = tx.StmtContext(ctx, insertQueryStatement).Exec(args...); err != nil {
return err
}
}
return nil
}); err != nil {
return err
}
elapsed := timeutil.Since(start)
hists.Get(`insert`).Record(elapsed)
Expand All @@ -525,16 +519,21 @@ func (w *interleavedPartitioned) Ops(
sessionID,
}
start := timeutil.Now()
for _, query := range retrieveQueries {
retrieveStatement, err := db.Prepare(query)
retrieveStatements := make([]*gosql.Stmt, len(retrieveQueries))
for i, query := range retrieveQueries {
var err error
retrieveStatements[i], err = db.Prepare(query)
if err != nil {
return err
}
_, err = retrieveStatement.ExecContext(ctx, args...)
if err != nil {
}

for i := range retrieveQueries {
if _, err = retrieveStatements[i].ExecContext(ctx, args...); err != nil {
return err
}
}

elapsed := timeutil.Since(start)
hists.Get(`retrieve`).Record(elapsed)
return nil
Expand All @@ -544,12 +543,10 @@ func (w *interleavedPartitioned) Ops(
sessionID,
}
start := timeutil.Now()
for _, query := range retrieveQueries {
retrieveStatement, err := db.Prepare(query)
if err != nil {
return err
}
_, err = retrieveStatement.ExecContext(ctx, retrieveArgs...)
retrieveStatements := make([]*gosql.Stmt, len(retrieveQueries))
for i, query := range retrieveQueries {
var err error
retrieveStatements[i], err = db.Prepare(query)
if err != nil {
return err
}
Expand All @@ -558,17 +555,26 @@ func (w *interleavedPartitioned) Ops(
if err != nil {
return err
}
updateStatement2, err := db.Prepare(updateQuery2)
if err != nil {
return err
}

for i := range retrieveQueries {
if _, err = retrieveStatements[i].ExecContext(ctx, retrieveArgs...); err != nil {
return err
}
}
if _, err = updateStatement1.ExecContext(ctx, randString(rng, 100), sessionID); err != nil {
return err
}
updateStatement2, err := db.Prepare(updateQuery2)
if err != nil {
if _, err = updateStatement2.ExecContext(ctx, randString(rng, 20), sessionID); err != nil {
return err
}
_, err = updateStatement2.ExecContext(ctx, randString(rng, 20), sessionID)

elapsed := timeutil.Since(start)
hists.Get(`updates`).Record(elapsed)
return err
return nil
}

return nil
Expand Down
Loading

0 comments on commit 752d6c5

Please sign in to comment.