Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: list-bucket API supports pagination when filtering by org #22674

Merged
merged 1 commit into from
Oct 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion tenant/storage_bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,14 +227,41 @@ func (s *Store) listBucketsByOrg(ctx context.Context, tx kv.Tx, orgID platform.I
return nil, err
}

cursor, err := idx.ForwardCursor(key, kv.WithCursorPrefix(key))
start := key
opts := []kv.CursorOption{kv.WithCursorPrefix(key)}
if o.Descending {
// To list in descending order, we have to find the last entry prefixed
// by the org ID. AFAICT the only way to do this given our current indexing
// scheme is to iterate through all entries in the org once, remembering the
// last-seen key.
start, err = func() ([]byte, error) {
cursor, err := idx.ForwardCursor(start, opts...)
if err != nil {
return nil, err
}
defer cursor.Close()

lastKey := start
for k, _ := cursor.Next(); k != nil; k, _ = cursor.Next() {
lastKey = k
}
return lastKey, nil
}()
if err != nil {
return nil, err
}
// Once we've found the end, walk backwards from it on the next iteration.
opts = append(opts, kv.WithCursorDirection(kv.CursorDescending))
}
cursor, err := idx.ForwardCursor(start, opts...)
if err != nil {
return nil, err
}
defer cursor.Close()

count := 0
bs := []*influxdb.Bucket{}
searchingForAfter := o.After != nil
for k, v := cursor.Next(); k != nil; k, v = cursor.Next() {
if o.Offset != 0 && count < o.Offset {
count++
Expand All @@ -251,6 +278,10 @@ func (s *Store) listBucketsByOrg(ctx context.Context, tx kv.Tx, orgID platform.I
Err: err,
}
}
if searchingForAfter {
searchingForAfter = id != *o.After
continue
}
b, err := s.GetBucket(ctx, tx, id)
if err != nil {
return nil, err
Expand Down
78 changes: 78 additions & 0 deletions tenant/storage_bucket_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,84 @@ func TestBucket(t *testing.T) {
assert.Equal(t, expected, found)
},
},
{
name: "list in org with pagination",
setup: simpleSetup,
results: func(t *testing.T, store *tenant.Store, tx kv.Tx) {
allBuckets := testBuckets(10, withCrudLog)
orgID := secondOrgID
allInOrg := []*influxdb.Bucket{
allBuckets[9], // id 10 => 000a which is alphabetically first
allBuckets[1],
allBuckets[3],
allBuckets[5],
allBuckets[7],
}

buckets, err := store.ListBuckets(context.Background(), tx, tenant.BucketFilter{OrganizationID: &orgID})
require.NoError(t, err)
require.Equal(t, allInOrg, buckets)

// Test pagination using `after` and `limit`.
afterBuckets, err := store.ListBuckets(
context.Background(), tx,
tenant.BucketFilter{OrganizationID: &orgID},
influxdb.FindOptions{After: &allInOrg[1].ID, Limit: 2},
)
require.NoError(t, err)
assert.Equal(t, allInOrg[2:4], afterBuckets)

// Test pagination using `offset` and `limit`.
offsetBuckets, err := store.ListBuckets(
context.Background(), tx,
tenant.BucketFilter{OrganizationID: &orgID},
influxdb.FindOptions{Offset: 3, Limit: 1},
)
require.NoError(t, err)
assert.Equal(t, allInOrg[3:4], offsetBuckets)
},
},
{
name: "list descending in org with pagination",
setup: simpleSetup,
results: func(t *testing.T, store *tenant.Store, tx kv.Tx) {
allBuckets := testBuckets(10, withCrudLog)
orgID := secondOrgID
allInOrg := []*influxdb.Bucket{
allBuckets[7],
allBuckets[5],
allBuckets[3],
allBuckets[1],
allBuckets[9], // id 10 => 000a which is alphabetically first
}

buckets, err := store.ListBuckets(
context.Background(), tx,
tenant.BucketFilter{OrganizationID: &orgID},
influxdb.FindOptions{Descending: true},
)
require.NoError(t, err)
require.Equal(t, allInOrg, buckets)

// Test pagination using `after` and `limit`.
afterBuckets, err := store.ListBuckets(
context.Background(), tx,
tenant.BucketFilter{OrganizationID: &orgID},
influxdb.FindOptions{After: &allInOrg[1].ID, Limit: 2, Descending: true},
)
require.NoError(t, err)
assert.Equal(t, allInOrg[2:4], afterBuckets)

// Test pagination using `offset` and `limit`.
offsetBuckets, err := store.ListBuckets(
context.Background(), tx,
tenant.BucketFilter{OrganizationID: &orgID},
influxdb.FindOptions{Offset: 3, Limit: 1, Descending: true},
)
require.NoError(t, err)
assert.Equal(t, allInOrg[3:4], offsetBuckets)
},
},
{
name: "update",
setup: simpleSetup,
Expand Down