Skip to content

Commit

Permalink
Merge #30811
Browse files Browse the repository at this point in the history
30811: workload: give each kvOp a separate sql.Conn r=nvanbenschoten a=nvanbenschoten

In #26178, we saw that throughput hit a cliff while running `kv` at
high concurrency levels. We spent a while debugging the issue, but
nothing stood out in the `cockroach` process. Eventually, I installed
pprof http handlers in `workload` (#30810). The CPU and heap profiles
looked fine but the mutex profile revealed that **99.94%** of mutex
contention was in `sql.(*Rows).Next`.

It turns out that this method manipulates a lock that's scoped to
the same degree as its prepared statement. Since `readStmt` was
prepared on the `sql.DB`, all kvOps were contending on the same lock
in `sql.(*Rows).Next`.

The fix is to give each `kvOp` its own `sql.Conn` and prepare the
statement with a connection-level scope. There are probably other places
in `workload` that could use the same kind of change.

Before this change, `kv100 --concurrency=400` in the configuration
discussed in #26178 topped out at around 80,000 qps. After this change,
it tops out at around 250,000 qps.

Release note: None

Co-authored-by: Nathan VanBenschoten <nvanbenschoten@gmail.com>
  • Loading branch information
craig[bot] and nvanbenschoten committed Oct 1, 2018
2 parents 5cd4bf0 + b9ffacb commit 42d7905
Showing 1 changed file with 21 additions and 14 deletions.
35 changes: 21 additions & 14 deletions pkg/workload/kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package kv

import (
"bytes"
"context"
"crypto/sha1"
gosql "database/sql"
Expand Down Expand Up @@ -125,6 +124,7 @@ func (w *kv) Tables() []workload.Table {

// Ops implements the Opser interface.
func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.QueryLoad, error) {
ctx := context.Background()
sqlDatabase, err := workload.SanitizeUrls(w, w.connFlags.DBOverride, urls)
if err != nil {
return workload.QueryLoad{}, err
Expand All @@ -144,7 +144,7 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
}
}

var buf bytes.Buffer
var buf strings.Builder
buf.WriteString(`SELECT k, v FROM kv WHERE k IN (`)
for i := 0; i < w.batchSize; i++ {
if i > 0 {
Expand All @@ -153,34 +153,41 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
fmt.Fprintf(&buf, `$%d`, i+1)
}
buf.WriteString(`)`)
readStmt, err := db.Prepare(buf.String())
if err != nil {
return workload.QueryLoad{}, err
}
readStmtStr := buf.String()

buf.Reset()
buf.WriteString(`UPSERT INTO kv (k, v) VALUES`)

for i := 0; i < w.batchSize; i++ {
j := i * 2
if i > 0 {
buf.WriteString(", ")
}
fmt.Fprintf(&buf, ` ($%d, $%d)`, j+1, j+2)
}

writeStmt, err := db.Prepare(buf.String())
if err != nil {
return workload.QueryLoad{}, err
}
writeStmtStr := buf.String()

ql := workload.QueryLoad{SQLDatabase: sqlDatabase}
seq := &sequence{config: w, val: w.writeSeq}
for i := 0; i < w.connFlags.Concurrency; i++ {
// Give each kvOp worker its own SQL connection and prepare statements
// using this connection. This avoids lock contention in the sql.Rows
// objects they produce.
conn, err := db.Conn(ctx)
if err != nil {
return workload.QueryLoad{}, err
}
readStmt, err := conn.PrepareContext(ctx, readStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
writeStmt, err := db.PrepareContext(ctx, writeStmtStr)
if err != nil {
return workload.QueryLoad{}, err
}
op := kvOp{
config: w,
hists: reg.GetHandle(),
db: db,
conn: conn,
readStmt: readStmt,
writeStmt: writeStmt,
}
Expand All @@ -197,7 +204,7 @@ func (w *kv) Ops(urls []string, reg *workload.HistogramRegistry) (workload.Query
type kvOp struct {
config *kv
hists *workload.Histograms
db *gosql.DB
conn *gosql.Conn
readStmt *gosql.Stmt
writeStmt *gosql.Stmt
g keyGenerator
Expand Down

0 comments on commit 42d7905

Please sign in to comment.