Skip to content

Commit

Permalink
Merge pull request #3901 from takayuki/unblock-relaxed-write-consiste…
Browse files Browse the repository at this point in the history
…ncy-level

Unblock relaxed write consistency level
  • Loading branch information
jwilder committed Sep 2, 2015
2 parents aeee505 + da8efa5 commit 1464ee5
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 34 deletions.
34 changes: 19 additions & 15 deletions cluster/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
}

// response channel for each shard writer go routine
ch := make(chan error, len(shard.Owners))
type AsyncWriteResult struct {
Owner meta.ShardOwner
Err error
}
ch := make(chan *AsyncWriteResult, len(shard.Owners))

for _, owner := range shard.Owners {
go func(shardID uint64, owner meta.ShardOwner, points []tsdb.Point) {
Expand All @@ -244,12 +248,12 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
if err == tsdb.ErrShardNotFound {
err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID)
if err != nil {
ch <- err
ch <- &AsyncWriteResult{owner, err}
return
}
err = w.TSDBStore.WriteToShard(shardID, points)
}
ch <- err
ch <- &AsyncWriteResult{owner, err}
return
}

Expand All @@ -262,44 +266,44 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo
// be considered a successful write so send nil to the response channel
// otherwise, let the original error propogate to the response channel
if hherr == nil && consistency == ConsistencyLevelAny {
ch <- nil
ch <- &AsyncWriteResult{owner, nil}
return
}
}
ch <- err
ch <- &AsyncWriteResult{owner, err}

}(shard.ID, owner, points)
}

var wrote int
timeout := time.After(w.WriteTimeout)
var writeError error
for _, owner := range shard.Owners {
for range shard.Owners {
select {
case <-w.closing:
return ErrWriteFailed
case <-timeout:
// return timeout error to caller
return ErrTimeout
case err := <-ch:
case result := <-ch:
// If the write returned an error, continue to the next response
if err != nil {
w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, owner.NodeID, err)
if result.Err != nil {
w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, result.Owner.NodeID, result.Err)

// Keep track of the first error we see to return back to the client
if writeError == nil {
writeError = err
writeError = result.Err
}
continue
}

wrote += 1
}
}

// We wrote the required consistency level
if wrote >= required {
return nil
// We wrote the required consistency level
if wrote >= required {
return nil
}
}
}

if wrote > 0 {
Expand Down
61 changes: 42 additions & 19 deletions cmd/influx/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"text/tabwriter"

"github.com/influxdb/influxdb/client"
"github.com/influxdb/influxdb/cluster"
"github.com/influxdb/influxdb/importer/v8"
"github.com/peterh/liner"
)
Expand All @@ -36,24 +37,25 @@ const (
)

type CommandLine struct {
Client *client.Client
Line *liner.State
Host string
Port int
Username string
Password string
Database string
Ssl bool
RetentionPolicy string
Version string
Pretty bool // controls pretty print for json
Format string // controls the output format. Valid values are json, csv, or column
Execute string
ShowVersion bool
Import bool
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
Client *client.Client
Line *liner.State
Host string
Port int
Username string
Password string
Database string
Ssl bool
RetentionPolicy string
Version string
Pretty bool // controls pretty print for json
Format string // controls the output format. Valid values are json, csv, or column
WriteConsistency string
Execute string
ShowVersion bool
Import bool
PPS int // Controls how many points per second the import will allow via throttling
Path string
Compressed bool
}

func main() {
Expand All @@ -67,6 +69,7 @@ func main() {
fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.")
fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.")
fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.")
fs.StringVar(&c.WriteConsistency, "consistency", "any", "Set write consistency level: any, one, quorum, or all.")
fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.")
fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.")
fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.")
Expand Down Expand Up @@ -96,6 +99,8 @@ func main() {
Execute command and quit.
-format 'json|csv|column'
Format specifies the format of the server responses: json, csv, or column.
-consistency 'any|one|quorum|all'
Set write consistency level: any, one, quorum, or all
-pretty
Turns on pretty print for the json format.
-import
Expand Down Expand Up @@ -244,6 +249,8 @@ func (c *CommandLine) ParseCommand(cmd string) bool {
c.help()
case strings.HasPrefix(lcmd, "format"):
c.SetFormat(cmd)
case strings.HasPrefix(lcmd, "consistency"):
c.SetWriteConsistency(cmd)
case strings.HasPrefix(lcmd, "settings"):
c.Settings()
case strings.HasPrefix(lcmd, "pretty"):
Expand Down Expand Up @@ -358,6 +365,20 @@ func (c *CommandLine) SetFormat(cmd string) {
}
}

func (c *CommandLine) SetWriteConsistency(cmd string) {
// Remove the "consistency" keyword if it exists
cmd = strings.TrimSpace(strings.Replace(cmd, "consistency", "", -1))
// normalize cmd
cmd = strings.ToLower(cmd)

_, err := cluster.ParseConsistencyLevel(cmd)
if err != nil {
fmt.Printf("Unknown consistency level %q. Please use any, one, quorum, or all.\n", cmd)
return
}
c.WriteConsistency = cmd
}

// isWhitespace returns true if the rune is a space, tab, or newline.
func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' }

Expand Down Expand Up @@ -444,7 +465,7 @@ func (c *CommandLine) Insert(stmt string) error {
Database: c.Database,
RetentionPolicy: c.RetentionPolicy,
Precision: "n",
WriteConsistency: client.ConsistencyAny,
WriteConsistency: c.WriteConsistency,
})
if err != nil {
fmt.Printf("ERR: %s\n", err)
Expand Down Expand Up @@ -641,6 +662,7 @@ func (c *CommandLine) Settings() {
fmt.Fprintf(w, "Database\t%s\n", c.Database)
fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty)
fmt.Fprintf(w, "Format\t%s\n", c.Format)
fmt.Fprintf(w, "Write Consistency\t%s\n", c.WriteConsistency)
fmt.Fprintln(w)
w.Flush()
}
Expand All @@ -652,6 +674,7 @@ func (c *CommandLine) help() {
pretty toggle pretty print
use <db_name> set current databases
format <format> set the output format: json, csv, or column
consistency <level> set write consistency level: any, one, quorum, or all
settings output the current settings for the shell
exit quit the influx shell
Expand Down

0 comments on commit 1464ee5

Please sign in to comment.