diff --git a/pkg/workload/ycsb/ycsb.go b/pkg/workload/ycsb/ycsb.go index 10e8a08671a3..e8f3bb4ef2c7 100644 --- a/pkg/workload/ycsb/ycsb.go +++ b/pkg/workload/ycsb/ycsb.go @@ -368,21 +368,11 @@ func (g *ycsb) Ops( if err != nil { return workload.QueryLoad{}, err } - db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `)) - if err != nil { - return workload.QueryLoad{}, err - } - // Allow a maximum of concurrency+1 connections to the database. - db.SetMaxOpenConns(g.connFlags.Concurrency + 1) - db.SetMaxIdleConns(g.connFlags.Concurrency + 1) - readStmt, err := db.Prepare(`SELECT * FROM usertable WHERE ycsb_key = $1`) - if err != nil { - return workload.QueryLoad{}, err - } + const readStmtStr = `SELECT * FROM usertable WHERE ycsb_key = $1` - readFieldForUpdateStmts := make([]*gosql.Stmt, numTableFields) - for i := 0; i < numTableFields; i++ { + readFieldForUpdateStmtStrs := make([]string, numTableFields) + for i := range readFieldForUpdateStmtStrs { var q string if g.json { q = fmt.Sprintf(`SELECT field->>'field%d' FROM usertable WHERE ycsb_key = $1`, i) @@ -392,22 +382,14 @@ func (g *ycsb) Ops( if g.sfu { q = fmt.Sprintf(`%s FOR UPDATE`, q) } - - stmt, err := db.Prepare(q) - if err != nil { - return workload.QueryLoad{}, err - } - readFieldForUpdateStmts[i] = stmt + readFieldForUpdateStmtStrs[i] = q } - scanStmt, err := db.Prepare(`SELECT * FROM usertable WHERE ycsb_key >= $1 LIMIT $2`) - if err != nil { - return workload.QueryLoad{}, err - } + const scanStmtStr = `SELECT * FROM usertable WHERE ycsb_key >= $1 LIMIT $2` - var insertStmt *gosql.Stmt + var insertStmtStr string if g.json { - insertStmt, err = db.Prepare(`INSERT INTO usertable VALUES ($1, json_build_object( + insertStmtStr = `INSERT INTO usertable VALUES ($1, json_build_object( 'field0', $2:::text, 'field1', $3:::text, 'field2', $4:::text, @@ -418,31 +400,20 @@ func (g *ycsb) Ops( 'field7', $9:::text, 'field8', $10:::text, 'field9', $11:::text - ))`) + ))` } else { - insertStmt, err = db.Prepare(`INSERT INTO usertable VALUES ( + insertStmtStr = `INSERT INTO usertable VALUES ( $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11 - )`) - } - if err != nil { - return workload.QueryLoad{}, err + )` } - updateStmts := make([]*gosql.Stmt, numTableFields) + var updateStmtStrs []string if g.json { - stmt, err := db.Prepare(`UPDATE usertable SET field = field || $2 WHERE ycsb_key = $1`) - if err != nil { - return workload.QueryLoad{}, err - } - updateStmts[0] = stmt + updateStmtStrs = []string{`UPDATE usertable SET field = field || $2 WHERE ycsb_key = $1`} } else { - for i := 0; i < numTableFields; i++ { - q := fmt.Sprintf(`UPDATE usertable SET field%d = $2 WHERE ycsb_key = $1`, i) - stmt, err := db.Prepare(q) - if err != nil { - return workload.QueryLoad{}, err - } - updateStmts[i] = stmt + updateStmtStrs = make([]string, numTableFields) + for i := range updateStmtStrs { + updateStmtStrs[i] = fmt.Sprintf(`UPDATE usertable SET field%d = $2 WHERE ycsb_key = $1`, i) } } @@ -483,12 +454,62 @@ func (g *ycsb) Ops( } ql := workload.QueryLoad{SQLDatabase: sqlDatabase} + var db *gosql.DB + const connsPerDB = 1000 for i := 0; i < g.connFlags.Concurrency; i++ { + // Give each ycsbWorker worker its own SQL connection and prepare statements + // using this connection. This avoids lock contention in the sql.Rows + // objects they produce. + // + // Additionally, create a new sql.DB for every connsPerDB connections. Even + // with each worker having its own connection, there is some lock contention + // between connections from the same DB (see DB.addDep and DB.removeDep), so + // this avoids additional lock contention when the load generator uses many + // concurrent workers. + if i%connsPerDB == 0 { + db, err = gosql.Open(`cockroach`, strings.Join(urls, ` `)) + if err != nil { + return workload.QueryLoad{}, err + } + } + conn, err := db.Conn(ctx) + if err != nil { + return workload.QueryLoad{}, err + } + readStmt, err := conn.PrepareContext(ctx, readStmtStr) + if err != nil { + return workload.QueryLoad{}, err + } + readFieldForUpdateStmts := make([]*gosql.Stmt, len(readFieldForUpdateStmtStrs)) + for i, q := range readFieldForUpdateStmtStrs { + stmt, err := conn.PrepareContext(ctx, q) + if err != nil { + return workload.QueryLoad{}, err + } + readFieldForUpdateStmts[i] = stmt + } + scanStmt, err := conn.PrepareContext(ctx, scanStmtStr) + if err != nil { + return workload.QueryLoad{}, err + } + insertStmt, err := conn.PrepareContext(ctx, insertStmtStr) + if err != nil { + return workload.QueryLoad{}, err + } + updateStmts := make([]*gosql.Stmt, len(updateStmtStrs)) + for i, q := range updateStmtStrs { + stmt, err := conn.PrepareContext(ctx, q) + if err != nil { + return workload.QueryLoad{}, err + } + updateStmts[i] = stmt + } + rng := rand.New(rand.NewSource(g.seed + uint64(i))) w := &ycsbWorker{ config: g, hists: reg.GetHandle(), - db: db, + conn: conn, readStmt: readStmt, readFieldForUpdateStmts: readFieldForUpdateStmts, scanStmt: scanStmt, @@ -515,7 +536,7 @@ type randGenerator interface { type ycsbWorker struct { config *ycsb hists *histogram.Histograms - db *gosql.DB + conn *gosql.Conn // Statement to read all the fields of a row. Used for read requests. readStmt *gosql.Stmt // Statements to read a specific field of a row in preparation for @@ -755,13 +776,40 @@ func (yw *ycsbWorker) scanRows(ctx context.Context) error { }) } +// stdlibTxnAdapter is copied from github.com/cockroachdb/cockroach-go/v2/crdb. +// TODO(nvanbenschoten): the type should be exported or we should expose a way +// to pass a *sql.Conn to crdb.ExecuteTx. +type stdlibTxnAdapter struct { + tx *gosql.Tx +} + +// Exec is part of the tx interface. +func (tx stdlibTxnAdapter) Exec(ctx context.Context, q string, args ...interface{}) error { + _, err := tx.tx.ExecContext(ctx, q, args...) + return err +} + +// Commit is part of the tx interface. +func (tx stdlibTxnAdapter) Commit(context.Context) error { + return tx.tx.Commit() +} + +// Rollback is part of the tx interface. +func (tx stdlibTxnAdapter) Rollback(context.Context) error { + return tx.tx.Rollback() +} + func (yw *ycsbWorker) readModifyWriteRow(ctx context.Context) error { key := yw.nextReadKey() newValue := yw.randString(fieldLength) fieldIdx := yw.rng.Intn(numTableFields) var args [2]interface{} args[0] = key - err := crdb.ExecuteTx(ctx, yw.db, nil, func(tx *gosql.Tx) error { + tx, err := yw.conn.BeginTx(ctx, nil) + if err != nil { + return err + } + err = crdb.ExecuteInTx(ctx, stdlibTxnAdapter{tx}, func() error { var oldValue []byte readStmt := yw.readFieldForUpdateStmts[fieldIdx] if err := tx.StmtContext(ctx, readStmt).QueryRowContext(ctx, key).Scan(&oldValue); err != nil {