
Commit
Merge #86841
86841: workload/ycsb: give each worker a separate sql.Conn r=nvanbenschoten a=nvanbenschoten

This commit gives ycsb the same treatment we gave kv in #30811 (4 years
ago!). It removes a throughput bottleneck in the workload client by
giving each ycsb worker goroutine its own `sql.Conn`, instead of having
them all pull from a sufficiently sized `sql.DB` pool. Doing so avoids
mutex contention.
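
For reference, the standard-library pattern being adopted here is `DB.Conn` plus `Conn.PrepareContext`. A minimal sketch, not the ycsb code itself (`newWorkerConn` is a hypothetical helper; the query string is the read statement from the diff below):

package sketch

import (
	"context"
	"database/sql"
)

// newWorkerConn pins one *sql.Conn to a worker and prepares the worker's
// statements on that connection, so later queries never go back through the
// shared *sql.DB pool and its mutex.
func newWorkerConn(ctx context.Context, db *sql.DB) (*sql.Conn, *sql.Stmt, error) {
	conn, err := db.Conn(ctx) // dedicated session for this worker
	if err != nil {
		return nil, nil, err
	}
	// A statement prepared on the Conn stays bound to that session; a statement
	// from db.Prepare instead checks a connection out of the pool per execution.
	readStmt, err := conn.PrepareContext(ctx, `SELECT * FROM usertable WHERE ycsb_key = $1`)
	if err != nil {
		conn.Close()
		return nil, nil, err
	}
	return conn, readStmt, nil
}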

This commit also goes a step further than #30811 by creating multiple
`sql.DB` objects and pulling only a bounded number of connections from
each. This further avoids mutex contention and removes the next
bottleneck.
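
A standalone sketch of that layering (again illustrative, not the ycsb code: `openWorkerConns` is a hypothetical helper and `lib/pq` stands in for the workload's own `cockroach` driver; the diff below does the same thing inline in `Ops`):

package sketch

import (
	"context"
	"database/sql"

	_ "github.com/lib/pq" // any registered Postgres-wire driver works for the sketch
)

// connsPerDB mirrors the constant in the diff below: start a fresh *sql.DB
// every 1000 workers so no single pool's mutex sees all of the traffic.
const connsPerDB = 1000

// openWorkerConns gives each of `concurrency` workers its own dedicated
// *sql.Conn, spread across multiple *sql.DB pools.
func openWorkerConns(ctx context.Context, url string, concurrency int) ([]*sql.Conn, error) {
	var db *sql.DB
	conns := make([]*sql.Conn, 0, concurrency)
	for i := 0; i < concurrency; i++ {
		// Rotate to a new pool every connsPerDB workers.
		if i%connsPerDB == 0 {
			var err error
			db, err = sql.Open("postgres", url)
			if err != nil {
				return nil, err
			}
		}
		conn, err := db.Conn(ctx)
		if err != nil {
			return nil, err
		}
		conns = append(conns, conn)
	}
	return conns, nil
}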

Without these changes, a ycsb driver on a 64 vCPU machine could push
about 200k qps before hitting a ceiling. With them, I've seen the driver
push up to about 500k qps before CRDB itself became the bottleneck.

Release justification: workload only.

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
craig[bot] and nvanbenschoten committed Aug 29, 2022
2 parents 21365bf + 566bb16 commit e48f99b
Showing 1 changed file with 95 additions and 47 deletions: pkg/workload/ycsb/ycsb.go
@@ -368,21 +368,11 @@ func (g *ycsb) Ops(
if err != nil {
return workload.QueryLoad{}, err
}
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}
// Allow a maximum of concurrency+1 connections to the database.
db.SetMaxOpenConns(g.connFlags.Concurrency + 1)
db.SetMaxIdleConns(g.connFlags.Concurrency + 1)

readStmt, err := db.Prepare(`SELECT * FROM usertable WHERE ycsb_key = $1`)
if err != nil {
return workload.QueryLoad{}, err
}
const readStmtStr = `SELECT * FROM usertable WHERE ycsb_key = $1`

readFieldForUpdateStmts := make([]*gosql.Stmt, numTableFields)
for i := 0; i < numTableFields; i++ {
readFieldForUpdateStmtStrs := make([]string, numTableFields)
for i := range readFieldForUpdateStmtStrs {
var q string
if g.json {
q = fmt.Sprintf(`SELECT field->>'field%d' FROM usertable WHERE ycsb_key = $1`, i)
@@ -392,22 +382,14 @@ func (g *ycsb) Ops(
if g.sfu {
q = fmt.Sprintf(`%s FOR UPDATE`, q)
}

stmt, err := db.Prepare(q)
if err != nil {
return workload.QueryLoad{}, err
}
readFieldForUpdateStmts[i] = stmt
readFieldForUpdateStmtStrs[i] = q
}

scanStmt, err := db.Prepare(`SELECT * FROM usertable WHERE ycsb_key >= $1 LIMIT $2`)
if err != nil {
return workload.QueryLoad{}, err
}
const scanStmtStr = `SELECT * FROM usertable WHERE ycsb_key >= $1 LIMIT $2`

var insertStmt *gosql.Stmt
var insertStmtStr string
if g.json {
insertStmt, err = db.Prepare(`INSERT INTO usertable VALUES ($1, json_build_object(
insertStmtStr = `INSERT INTO usertable VALUES ($1, json_build_object(
'field0', $2:::text,
'field1', $3:::text,
'field2', $4:::text,
@@ -418,31 +400,20 @@ func (g *ycsb) Ops(
'field7', $9:::text,
'field8', $10:::text,
'field9', $11:::text
))`)
))`
} else {
insertStmt, err = db.Prepare(`INSERT INTO usertable VALUES (
insertStmtStr = `INSERT INTO usertable VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
)`)
}
if err != nil {
return workload.QueryLoad{}, err
)`
}

updateStmts := make([]*gosql.Stmt, numTableFields)
var updateStmtStrs []string
if g.json {
stmt, err := db.Prepare(`UPDATE usertable SET field = field || $2 WHERE ycsb_key = $1`)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts[0] = stmt
updateStmtStrs = []string{`UPDATE usertable SET field = field || $2 WHERE ycsb_key = $1`}
} else {
for i := 0; i < numTableFields; i++ {
q := fmt.Sprintf(`UPDATE usertable SET field%d = $2 WHERE ycsb_key = $1`, i)
stmt, err := db.Prepare(q)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts[i] = stmt
updateStmtStrs = make([]string, numTableFields)
for i := range updateStmtStrs {
updateStmtStrs[i] = fmt.Sprintf(`UPDATE usertable SET field%d = $2 WHERE ycsb_key = $1`, i)
}
}

@@ -483,12 +454,62 @@ func (g *ycsb) Ops(
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
var db *gosql.DB
const connsPerDB = 1000
for i := 0; i < g.connFlags.Concurrency; i++ {
// Give each ycsbWorker its own SQL connection and prepare statements
// using this connection. This avoids lock contention in the sql.Rows
// objects they produce.
//
// Additionally, create a new sql.DB for every connsPerDB connections. Even
// with each worker having its own connection, there is some lock contention
// between connections from the same DB (see DB.addDep and DB.removeDep), so
// this avoids additional lock contention when the load generator uses many
// concurrent workers.
if i%connsPerDB == 0 {
db, err = gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}
}
conn, err := db.Conn(ctx)
if err != nil {
return workload.QueryLoad{}, err
}
readStmt, err := conn.PrepareContext(ctx, readStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
readFieldForUpdateStmts := make([]*gosql.Stmt, len(readFieldForUpdateStmtStrs))
for i, q := range readFieldForUpdateStmtStrs {
stmt, err := conn.PrepareContext(ctx, q)
if err != nil {
return workload.QueryLoad{}, err
}
readFieldForUpdateStmts[i] = stmt
}
scanStmt, err := conn.PrepareContext(ctx, scanStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
insertStmt, err := conn.PrepareContext(ctx, insertStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts := make([]*gosql.Stmt, len(updateStmtStrs))
for i, q := range updateStmtStrs {
stmt, err := conn.PrepareContext(ctx, q)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts[i] = stmt
}

rng := rand.New(rand.NewSource(g.seed + uint64(i)))
w := &ycsbWorker{
config: g,
hists: reg.GetHandle(),
db: db,
conn: conn,
readStmt: readStmt,
readFieldForUpdateStmts: readFieldForUpdateStmts,
scanStmt: scanStmt,
Expand All @@ -515,7 +536,7 @@ type randGenerator interface {
type ycsbWorker struct {
config *ycsb
hists *histogram.Histograms
db *gosql.DB
conn *gosql.Conn
// Statement to read all the fields of a row. Used for read requests.
readStmt *gosql.Stmt
// Statements to read a specific field of a row in preparation for
@@ -755,13 +776,40 @@ func (yw *ycsbWorker) scanRows(ctx context.Context) error {
})
}

// stdlibTxnAdapter is copied from github.com/cockroachdb/cockroach-go/v2/crdb.
// TODO(nvanbenschoten): the type should be exported or we should expose a way
// to pass a *sql.Conn to crdb.ExecuteTx.
type stdlibTxnAdapter struct {
tx *gosql.Tx
}

// Exec is part of the tx interface.
func (tx stdlibTxnAdapter) Exec(ctx context.Context, q string, args ...interface{}) error {
_, err := tx.tx.ExecContext(ctx, q, args...)
return err
}

// Commit is part of the tx interface.
func (tx stdlibTxnAdapter) Commit(context.Context) error {
return tx.tx.Commit()
}

// Rollback is part of the tx interface.
func (tx stdlibTxnAdapter) Rollback(context.Context) error {
return tx.tx.Rollback()
}

func (yw *ycsbWorker) readModifyWriteRow(ctx context.Context) error {
key := yw.nextReadKey()
newValue := yw.randString(fieldLength)
fieldIdx := yw.rng.Intn(numTableFields)
var args [2]interface{}
args[0] = key
err := crdb.ExecuteTx(ctx, yw.db, nil, func(tx *gosql.Tx) error {
tx, err := yw.conn.BeginTx(ctx, nil)
if err != nil {
return err
}
err = crdb.ExecuteInTx(ctx, stdlibTxnAdapter{tx}, func() error {
var oldValue []byte
readStmt := yw.readFieldForUpdateStmts[fieldIdx]
if err := tx.StmtContext(ctx, readStmt).QueryRowContext(ctx, key).Scan(&oldValue); err != nil {
