Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: prevent silently dropped writes with overlapping shards #21951

Merged
merged 3 commits into from
Jul 28, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ Because of the version bump to `go`, the macOS build for this release requires a
1. [21898](https://github.com/influxdata/influxdb/pull/21898): Removed unused `chronograf-migator` package & chronograf API service, and updated various "chronograf" references.
1. [21919](https://github.com/influxdata/influxdb/pull/21919): Fix display and parsing of interactive `influx` CLI prompts in PowerShell.
1. [21941](https://github.com/influxdata/influxdb/pull/21941): Upgrade to golang-jwt 3.2.1.
1. [21951](https://github.com/influxdata/influxdb/pull/21951): Prevent silently dropped writes when there are overlapping shards.

## v2.0.7 [2021-06-04]
----------------------
Expand Down
42 changes: 42 additions & 0 deletions cmd/influxd/launcher/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -327,3 +327,45 @@ func TestLauncher_UpdateRetentionPolicy(t *testing.T) {
})
}
}

// TestLauncher_OverlappingShards checks that every point of a write request is
// stored when the bucket's shard-group duration has been changed between writes.
// It writes two points into a bucket with a 1h shard-group duration, updates the
// duration, writes again with an additional earlier point, and then expects the
// query to return all three points.
func TestLauncher_OverlappingShards(t *testing.T) {
	l := launcher.RunAndSetupNewLauncherOrFail(ctx, t)
	defer l.ShutdownOrFail(t, ctx)

	bkt := influxdb.Bucket{Name: "test", ShardGroupDuration: time.Hour, OrgID: l.Org.ID}
	require.NoError(t, l.BucketService(t).CreateBucket(ctx, &bkt))

	// write POSTs line-protocol data to the test bucket and fails the test if
	// the request cannot be sent or the response body cannot be closed.
	write := func(lines string) {
		target := fmt.Sprintf("/api/v2/write?org=%s&bucket=%s", l.Org.ID, bkt.ID)
		resp, err := nethttp.DefaultClient.Do(l.MustNewHTTPRequest("POST", target, lines))
		require.NoError(t, err)
		require.NoError(t, resp.Body.Close())
	}

	// Seed the bucket while it still uses hour-long shard groups.
	write("m,s=0 n=0 1626416520000000000\nm,s=0 n=1 1626420120000000000\n")

	// Switch the bucket to the longer shard-group duration.
	dayLong := humanize.Day
	_, err := l.BucketService(t).UpdateBucket(ctx, bkt.ID, influxdb.BucketUpdate{ShardGroupDuration: &dayLong})
	require.NoError(t, err)

	// NOTE: The 3rd point's timestamp is chronologically earlier than the other two points, but it
	// must come after the others in the request to trigger the overlapping-shard bug. If it comes
	// first in the request, the bug is avoided because:
	//  1. The point-writer sees there is no shard for the earlier point, and creates a new 24h shard-group
	//  2. The new 24h group covers the timestamps of the remaining 2 points, so the writer doesn't bother
	//     looking for existing shards that also cover the timestamp
	//  3. With only 1 shard mapped to the 3 points, there is no overlap to trigger the bug
	write("m,s=0 n=0 1626416520000000000\nm,s=0 n=1 1626420120000000000\nm,s=1 n=1 1626412920000000000\n")

	const crlf = "\r\n"
	fluxQuery := `from(bucket:"test") |> range(start:2000-01-01T00:00:00Z,stop:2050-01-01T00:00:00Z)` +
		` |> drop(columns:["_start","_stop"])`
	want := `,result,table,_time,_value,_field,_measurement,s` + crlf +
		`,_result,0,2021-07-16T06:22:00Z,0,n,m,0` + crlf +
		`,_result,0,2021-07-16T07:22:00Z,1,n,m,0` + crlf +
		`,_result,1,2021-07-16T05:22:00Z,1,n,m,1` + crlf + crlf

	got, err := http.SimpleQuery(l.URL(), fluxQuery, l.Org.Name, l.Auth.Token)
	require.NoError(t, err)
	require.Equal(t, want, string(got))
}
80 changes: 66 additions & 14 deletions v1/coordinator/points_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
}

// Holds all the shard groups and shards that are required for writes.
list := make(sgList, 0, 8)
list := sgList{items: make(meta.ShardGroupInfos, 0, 8)}
min := time.Unix(0, models.MinNanoTime)
if rp.Duration > 0 {
min = time.Now().Add(-rp.Duration)
Expand All @@ -219,7 +219,7 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
if sg == nil {
return nil, errors.New("nil shard group")
}
list = list.Append(*sg)
list.Add(*sg)
}

mapping := NewShardMapping(len(wp.Points))
Expand All @@ -241,10 +241,21 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)

// sgList is a wrapper around a meta.ShardGroupInfos where we can also check
// if a given time is covered by any of the shard groups in the list.
type sgList meta.ShardGroupInfos
type sgList struct {
// items holds the shard groups that have been added to the list.
items meta.ShardGroupInfos

// needsSort indicates if items has been modified without a sort operation.
needsSort bool

// earliest is the earliest start time of any item in items.
earliest time.Time

// latest is the greatest end time of any item in items.
latest time.Time
}

func (l sgList) Covers(t time.Time) bool {
if len(l) == 0 {
if len(l.items) == 0 {
return false
}
return l.ShardGroupAt(t) != nil
Expand All @@ -260,20 +271,61 @@ func (l sgList) Covers(t time.Time) bool {
// - a shard group with the earliest end time;
// - (assuming identical end times) the shard group with the earliest start time.
func (l sgList) ShardGroupAt(t time.Time) *meta.ShardGroupInfo {
idx := sort.Search(len(l), func(i int) bool { return l[i].EndTime.After(t) })

// We couldn't find a shard group the point falls into.
if idx == len(l) || t.Before(l[idx].StartTime) {
if l.items.Len() == 0 {
return nil
}
return &l[idx]

// Find the earliest shard group that could contain this point using binary search.
// NOTE(review): l is a value receiver, so clearing needsSort below only updates
// this call's copy of the struct. The caller's sgList keeps needsSort set, which
// means items is sorted again on every call. The sort itself is done in place on
// the slice, whose storage is shared with the caller's copy.
if l.needsSort {
sort.Sort(l.items)
l.needsSort = false
}
idx := sort.Search(l.items.Len(), func(i int) bool { return l.items[i].EndTime.After(t) })

// Check if sort.Search actually found the proper shard. It feels like we should also
// be checking l.items[idx].EndTime, but sort.Search was looking at that field for us.
if idx == l.items.Len() || t.Before(l.items[idx].StartTime) {
// This could mean we are looking for a time not in the list, or we have
// overlapping shards. Overlapping shards do not work with binary searches
// on 1d arrays. You have to use an interval tree, but that's a lot of
// work for what is hopefully a rare event. Instead, we'll check if t
// should be in l, and perform a linear search if it is. This way we'll
// do the correct thing, it may just take a little longer. If we don't
// do this, then we may silently drop writes we should have accepted.

if t.Before(l.earliest) || t.After(l.latest) {
// t is not in range, we can avoid going through the linear search.
return nil
}

// Oh no, we've probably got overlapping shards. Perform a linear search.
for idx = 0; idx < l.items.Len(); idx++ {
if l.items[idx].Contains(t) {
// Found it!
break
}
}
if idx == l.items.Len() {
// We did not find a shard which contained t. This is very strange.
return nil
}
}

return &l.items[idx]
}

// Append appends a shard group to the list, and returns a sorted list.
func (l sgList) Append(sgi meta.ShardGroupInfo) sgList {
next := append(l, sgi)
sort.Sort(meta.ShardGroupInfos(next))
return next
// Add appends a shard group to the list, updating the earliest/latest times of the list if needed.
// The list is not sorted here; needsSort is set so that ShardGroupAt sorts items before searching.
func (l *sgList) Add(sgi meta.ShardGroupInfo) {
l.items = append(l.items, sgi)
l.needsSort = true

// Update our earliest and latest times for l.items.
// A zero time is treated as "no bound recorded yet", so the first item always sets both.
if l.earliest.IsZero() || l.earliest.After(sgi.StartTime) {
l.earliest = sgi.StartTime
}
if l.latest.IsZero() || l.latest.Before(sgi.EndTime) {
l.latest = sgi.EndTime
}
}

// WritePoints writes the data to the underlying storage. consistencyLevel and user are only used for clustered scenarios
Expand Down
53 changes: 52 additions & 1 deletion v1/coordinator/points_writer_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@ package coordinator
import (
"testing"
"time"

"github.com/influxdata/influxdb/v2/v1/services/meta"
"github.com/stretchr/testify/require"
)

func TestSgList_ShardGroupAt(t *testing.T) {
Expand All @@ -11,14 +14,18 @@ func TestSgList_ShardGroupAt(t *testing.T) {
return base.Add(time.Duration(24*n) * time.Hour)
}

list := sgList{
items := meta.ShardGroupInfos{
{ID: 1, StartTime: day(0), EndTime: day(1)},
{ID: 2, StartTime: day(1), EndTime: day(2)},
{ID: 3, StartTime: day(2), EndTime: day(3)},
// SG day 3 to day 4 missing...
{ID: 4, StartTime: day(4), EndTime: day(5)},
{ID: 5, StartTime: day(5), EndTime: day(6)},
}
var list sgList
for _, i := range items {
list.Add(i)
}

examples := []struct {
T time.Time
Expand All @@ -44,3 +51,47 @@ func TestSgList_ShardGroupAt(t *testing.T) {
}
}
}

func TestSgList_ShardGroupAtOverlapping(t *testing.T) {
base := time.Date(2016, 10, 19, 0, 0, 0, 0, time.UTC)
hour := func(n int) time.Time {
return base.Add(time.Duration(n) * time.Hour)
}
day := func(n int) time.Time {
return base.Add(time.Duration(24*n) * time.Hour)
}

items := meta.ShardGroupInfos{
{ID: 1, StartTime: hour(5), EndTime: hour(6)},
{ID: 2, StartTime: hour(6), EndTime: hour(7)},
// Day-long shard overlaps with the two hour-long shards.
{ID: 3, StartTime: base, EndTime: day(1)},
}
var list sgList
for _, i := range items {
list.Add(i)
}

examples := []struct {
T time.Time
ShardGroupID uint64 // 0 will indicate we don't expect a shard group
}{
{T: base.Add(-time.Minute), ShardGroupID: 0}, // Before any SG
{T: base, ShardGroupID: 3},
{T: hour(5), ShardGroupID: 1},
{T: hour(7).Add(-time.Minute), ShardGroupID: 2},
{T: hour(8), ShardGroupID: 3},
{T: day(2), ShardGroupID: 0}, // No matching SG
}

for _, example := range examples {
t.Run(example.T.String(), func(t *testing.T) {
sg := list.ShardGroupAt(example.T)
var id uint64
if sg != nil {
id = sg.ID
}
require.Equal(t, example.ShardGroupID, id)
})
}
}