diff --git a/cluster/points_writer.go b/cluster/points_writer.go index 75054f9dfbd..8b55ab45fcf 100644 --- a/cluster/points_writer.go +++ b/cluster/points_writer.go @@ -233,7 +233,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo } // response channel for each shard writer go routine - ch := make(chan error, len(shard.Owners)) + type AsyncWriteResult struct { + Owner meta.ShardOwner + Err error + } + ch := make(chan *AsyncWriteResult, len(shard.Owners)) for _, owner := range shard.Owners { go func(shardID uint64, owner meta.ShardOwner, points []tsdb.Point) { @@ -244,12 +248,12 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo if err == tsdb.ErrShardNotFound { err = w.TSDBStore.CreateShard(database, retentionPolicy, shardID) if err != nil { - ch <- err + ch <- &AsyncWriteResult{owner, err} return } err = w.TSDBStore.WriteToShard(shardID, points) } - ch <- err + ch <- &AsyncWriteResult{owner, err} return } @@ -262,11 +266,11 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo // be considered a successful write so send nil to the response channel // otherwise, let the original error propogate to the response channel if hherr == nil && consistency == ConsistencyLevelAny { - ch <- nil + ch <- &AsyncWriteResult{owner, nil} return } } - ch <- err + ch <- &AsyncWriteResult{owner, err} }(shard.ID, owner, points) } @@ -274,32 +278,32 @@ func (w *PointsWriter) writeToShard(shard *meta.ShardInfo, database, retentionPo var wrote int timeout := time.After(w.WriteTimeout) var writeError error - for _, owner := range shard.Owners { + for range shard.Owners { select { case <-w.closing: return ErrWriteFailed case <-timeout: // return timeout error to caller return ErrTimeout - case err := <-ch: + case result := <-ch: // If the write returned an error, continue to the next response - if err != nil { - w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, owner.NodeID, err) + if result.Err != nil { + w.Logger.Printf("write failed for shard %d on node %d: %v", shard.ID, result.Owner.NodeID, result.Err) // Keep track of the first error we see to return back to the client if writeError == nil { - writeError = err + writeError = result.Err } continue } wrote += 1 - } - } - // We wrote the required consistency level - if wrote >= required { - return nil + // We wrote the required consistency level + if wrote >= required { + return nil + } + } } if wrote > 0 { diff --git a/cmd/influx/main.go b/cmd/influx/main.go index 24af8efa4ec..25e81006d96 100644 --- a/cmd/influx/main.go +++ b/cmd/influx/main.go @@ -17,6 +17,7 @@ import ( "text/tabwriter" "github.com/influxdb/influxdb/client" + "github.com/influxdb/influxdb/cluster" "github.com/influxdb/influxdb/importer/v8" "github.com/peterh/liner" ) @@ -36,24 +37,25 @@ const ( ) type CommandLine struct { - Client *client.Client - Line *liner.State - Host string - Port int - Username string - Password string - Database string - Ssl bool - RetentionPolicy string - Version string - Pretty bool // controls pretty print for json - Format string // controls the output format. Valid values are json, csv, or column - Execute string - ShowVersion bool - Import bool - PPS int // Controls how many points per second the import will allow via throttling - Path string - Compressed bool + Client *client.Client + Line *liner.State + Host string + Port int + Username string + Password string + Database string + Ssl bool + RetentionPolicy string + Version string + Pretty bool // controls pretty print for json + Format string // controls the output format. Valid values are json, csv, or column + WriteConsistency string + Execute string + ShowVersion bool + Import bool + PPS int // Controls how many points per second the import will allow via throttling + Path string + Compressed bool } func main() { @@ -67,6 +69,7 @@ func main() { fs.StringVar(&c.Database, "database", c.Database, "Database to connect to the server.") fs.BoolVar(&c.Ssl, "ssl", false, "Use https for connecting to cluster.") fs.StringVar(&c.Format, "format", defaultFormat, "Format specifies the format of the server responses: json, csv, or column.") + fs.StringVar(&c.WriteConsistency, "consistency", "any", "Set write consistency level: any, one, quorum, or all.") fs.BoolVar(&c.Pretty, "pretty", false, "Turns on pretty print for the json format.") fs.StringVar(&c.Execute, "execute", c.Execute, "Execute command and quit.") fs.BoolVar(&c.ShowVersion, "version", false, "Displays the InfluxDB version.") @@ -96,6 +99,8 @@ func main() { Execute command and quit. -format 'json|csv|column' Format specifies the format of the server responses: json, csv, or column. + -consistency 'any|one|quorum|all' + Set write consistency level: any, one, quorum, or all -pretty Turns on pretty print for the json format. -import @@ -244,6 +249,8 @@ func (c *CommandLine) ParseCommand(cmd string) bool { c.help() case strings.HasPrefix(lcmd, "format"): c.SetFormat(cmd) + case strings.HasPrefix(lcmd, "consistency"): + c.SetWriteConsistency(cmd) case strings.HasPrefix(lcmd, "settings"): c.Settings() case strings.HasPrefix(lcmd, "pretty"): @@ -358,6 +365,20 @@ func (c *CommandLine) SetFormat(cmd string) { } } +func (c *CommandLine) SetWriteConsistency(cmd string) { + // Remove the "consistency" keyword if it exists + cmd = strings.TrimSpace(strings.Replace(cmd, "consistency", "", -1)) + // normalize cmd + cmd = strings.ToLower(cmd) + + _, err := cluster.ParseConsistencyLevel(cmd) + if err != nil { + fmt.Printf("Unknown consistency level %q. Please use any, one, quorum, or all.\n", cmd) + return + } + c.WriteConsistency = cmd +} + // isWhitespace returns true if the rune is a space, tab, or newline. func isWhitespace(ch rune) bool { return ch == ' ' || ch == '\t' || ch == '\n' } @@ -444,7 +465,7 @@ func (c *CommandLine) Insert(stmt string) error { Database: c.Database, RetentionPolicy: c.RetentionPolicy, Precision: "n", - WriteConsistency: client.ConsistencyAny, + WriteConsistency: c.WriteConsistency, }) if err != nil { fmt.Printf("ERR: %s\n", err) @@ -641,6 +662,7 @@ func (c *CommandLine) Settings() { fmt.Fprintf(w, "Database\t%s\n", c.Database) fmt.Fprintf(w, "Pretty\t%v\n", c.Pretty) fmt.Fprintf(w, "Format\t%s\n", c.Format) + fmt.Fprintf(w, "Write Consistency\t%s\n", c.WriteConsistency) fmt.Fprintln(w) w.Flush() } @@ -652,6 +674,7 @@ func (c *CommandLine) help() { pretty toggle pretty print use set current databases format set the output format: json, csv, or column + consistency set write consistency level: any, one, quorum, or all settings output the current settings for the shell exit quit the influx shell