Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

workload/ycsb: give each worker a separate sql.Conn #86841

Merged
merged 1 commit into from
Aug 29, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
142 changes: 95 additions & 47 deletions pkg/workload/ycsb/ycsb.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,21 +368,11 @@ func (g *ycsb) Ops(
if err != nil {
return workload.QueryLoad{}, err
}
db, err := gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}
// Allow a maximum of concurrency+1 connections to the database.
db.SetMaxOpenConns(g.connFlags.Concurrency + 1)
db.SetMaxIdleConns(g.connFlags.Concurrency + 1)

readStmt, err := db.Prepare(`SELECT * FROM usertable WHERE ycsb_key = $1`)
if err != nil {
return workload.QueryLoad{}, err
}
const readStmtStr = `SELECT * FROM usertable WHERE ycsb_key = $1`

readFieldForUpdateStmts := make([]*gosql.Stmt, numTableFields)
for i := 0; i < numTableFields; i++ {
readFieldForUpdateStmtStrs := make([]string, numTableFields)
for i := range readFieldForUpdateStmtStrs {
var q string
if g.json {
q = fmt.Sprintf(`SELECT field->>'field%d' FROM usertable WHERE ycsb_key = $1`, i)
Expand All @@ -392,22 +382,14 @@ func (g *ycsb) Ops(
if g.sfu {
q = fmt.Sprintf(`%s FOR UPDATE`, q)
}

stmt, err := db.Prepare(q)
if err != nil {
return workload.QueryLoad{}, err
}
readFieldForUpdateStmts[i] = stmt
readFieldForUpdateStmtStrs[i] = q
}

scanStmt, err := db.Prepare(`SELECT * FROM usertable WHERE ycsb_key >= $1 LIMIT $2`)
if err != nil {
return workload.QueryLoad{}, err
}
const scanStmtStr = `SELECT * FROM usertable WHERE ycsb_key >= $1 LIMIT $2`

var insertStmt *gosql.Stmt
var insertStmtStr string
if g.json {
insertStmt, err = db.Prepare(`INSERT INTO usertable VALUES ($1, json_build_object(
insertStmtStr = `INSERT INTO usertable VALUES ($1, json_build_object(
'field0', $2:::text,
'field1', $3:::text,
'field2', $4:::text,
Expand All @@ -418,31 +400,20 @@ func (g *ycsb) Ops(
'field7', $9:::text,
'field8', $10:::text,
'field9', $11:::text
))`)
))`
} else {
insertStmt, err = db.Prepare(`INSERT INTO usertable VALUES (
insertStmtStr = `INSERT INTO usertable VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
)`)
}
if err != nil {
return workload.QueryLoad{}, err
)`
}

updateStmts := make([]*gosql.Stmt, numTableFields)
var updateStmtStrs []string
if g.json {
stmt, err := db.Prepare(`UPDATE usertable SET field = field || $2 WHERE ycsb_key = $1`)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts[0] = stmt
updateStmtStrs = []string{`UPDATE usertable SET field = field || $2 WHERE ycsb_key = $1`}
} else {
for i := 0; i < numTableFields; i++ {
q := fmt.Sprintf(`UPDATE usertable SET field%d = $2 WHERE ycsb_key = $1`, i)
stmt, err := db.Prepare(q)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts[i] = stmt
updateStmtStrs = make([]string, numTableFields)
for i := range updateStmtStrs {
updateStmtStrs[i] = fmt.Sprintf(`UPDATE usertable SET field%d = $2 WHERE ycsb_key = $1`, i)
}
}

Expand Down Expand Up @@ -483,12 +454,62 @@ func (g *ycsb) Ops(
}

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
var db *gosql.DB
const connsPerDB = 1000
for i := 0; i < g.connFlags.Concurrency; i++ {
// Give each ycsbWorker its own SQL connection and prepare statements
// using this connection. This avoids lock contention in the sql.Rows
// objects they produce.
//
// Additionally, create a new sql.DB for every connsPerDB connections. Even
// with each worker having its own connection, there is some lock contention
// between connections from the same DB (see DB.addDep and DB.removeDep), so
// this avoids additional lock contention when the load generator uses many
// concurrent workers.
if i%connsPerDB == 0 {
db, err = gosql.Open(`cockroach`, strings.Join(urls, ` `))
if err != nil {
return workload.QueryLoad{}, err
}
}
conn, err := db.Conn(ctx)
if err != nil {
return workload.QueryLoad{}, err
}
readStmt, err := conn.PrepareContext(ctx, readStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
readFieldForUpdateStmts := make([]*gosql.Stmt, len(readFieldForUpdateStmtStrs))
for i, q := range readFieldForUpdateStmtStrs {
stmt, err := conn.PrepareContext(ctx, q)
if err != nil {
return workload.QueryLoad{}, err
}
readFieldForUpdateStmts[i] = stmt
}
scanStmt, err := conn.PrepareContext(ctx, scanStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
insertStmt, err := conn.PrepareContext(ctx, insertStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts := make([]*gosql.Stmt, len(updateStmtStrs))
for i, q := range updateStmtStrs {
stmt, err := conn.PrepareContext(ctx, q)
if err != nil {
return workload.QueryLoad{}, err
}
updateStmts[i] = stmt
}

rng := rand.New(rand.NewSource(g.seed + uint64(i)))
w := &ycsbWorker{
config: g,
hists: reg.GetHandle(),
db: db,
conn: conn,
readStmt: readStmt,
readFieldForUpdateStmts: readFieldForUpdateStmts,
scanStmt: scanStmt,
Expand All @@ -515,7 +536,7 @@ type randGenerator interface {
type ycsbWorker struct {
config *ycsb
hists *histogram.Histograms
db *gosql.DB
conn *gosql.Conn
// Statement to read all the fields of a row. Used for read requests.
readStmt *gosql.Stmt
// Statements to read a specific field of a row in preparation for
Expand Down Expand Up @@ -755,13 +776,40 @@ func (yw *ycsbWorker) scanRows(ctx context.Context) error {
})
}

// stdlibTxnAdapter wraps a *gosql.Tx so that it can be passed to
// crdb.ExecuteInTx. It is copied from
// github.com/cockroachdb/cockroach-go/v2/crdb.
// TODO(nvanbenschoten): the type should be exported or we should expose a way
// to pass a *sql.Conn to crdb.ExecuteTx.
type stdlibTxnAdapter struct {
	tx *gosql.Tx
}

// Exec is part of the tx interface. It runs q with args on the wrapped
// transaction, drops the result, and reports only the error (if any).
func (a stdlibTxnAdapter) Exec(ctx context.Context, q string, args ...interface{}) error {
	if _, err := a.tx.ExecContext(ctx, q, args...); err != nil {
		return err
	}
	return nil
}

// Commit is part of the tx interface. The context argument is not used; the
// call is forwarded directly to the wrapped transaction's Commit.
func (a stdlibTxnAdapter) Commit(_ context.Context) error {
	err := a.tx.Commit()
	return err
}

// Rollback is part of the tx interface. The context argument is not used; the
// call is forwarded directly to the wrapped transaction's Rollback.
func (a stdlibTxnAdapter) Rollback(_ context.Context) error {
	err := a.tx.Rollback()
	return err
}

func (yw *ycsbWorker) readModifyWriteRow(ctx context.Context) error {
key := yw.nextReadKey()
newValue := yw.randString(fieldLength)
fieldIdx := yw.rng.Intn(numTableFields)
var args [2]interface{}
args[0] = key
err := crdb.ExecuteTx(ctx, yw.db, nil, func(tx *gosql.Tx) error {
tx, err := yw.conn.BeginTx(ctx, nil)
if err != nil {
return err
}
err = crdb.ExecuteInTx(ctx, stdlibTxnAdapter{tx}, func() error {
var oldValue []byte
readStmt := yw.readFieldForUpdateStmts[fieldIdx]
if err := tx.StmtContext(ctx, readStmt).QueryRowContext(ctx, key).Scan(&oldValue); err != nil {
Expand Down