From a508c2d2d3e9a018745dbeaeb5663cc0ec3b74d1 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Mon, 6 Jan 2025 13:43:46 -0800 Subject: [PATCH] fix(block-builder): min job size support in planning --- pkg/blockbuilder/scheduler/scheduler.go | 18 +++++- pkg/blockbuilder/scheduler/strategy.go | 11 +++- pkg/blockbuilder/scheduler/strategy_test.go | 62 ++++++++++++++++----- 3 files changed, 74 insertions(+), 17 deletions(-) diff --git a/pkg/blockbuilder/scheduler/scheduler.go b/pkg/blockbuilder/scheduler/scheduler.go index 74b507dde74cb..97e9d888fe07d 100644 --- a/pkg/blockbuilder/scheduler/scheduler.go +++ b/pkg/blockbuilder/scheduler/scheduler.go @@ -149,15 +149,27 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error { s.publishLagMetrics(lag) - jobs, err := s.planner.Plan(ctx, 1) // TODO(owen-d): parallelize work within a partition + // TODO(owen-d): parallelize work within a partition + // TODO(owen-d): skip small jobs unless they're stale, + // e.g. a partition which is no longer being written to shouldn't be orphaned + jobs, err := s.planner.Plan(ctx, 1, 0) if err != nil { level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err) } + level.Info(s.logger).Log("msg", "planned jobs", "count", len(jobs)) for _, job := range jobs { // TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID added, status, err := s.idempotentEnqueue(job) + level.Info(s.logger).Log( + "msg", "enqueued job", + "added", added, + "status", status.String(), + "err", err, + "partition", job.Job.Partition(), + "num_offsets", job.Offsets().Max-job.Offsets().Min, + ) // if we've either added or encountered an error, move on; we're done this cycle if added || err != nil { @@ -253,7 +265,9 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job, // TODO(owen-d): cleaner way to enqueue next job for this partition, // don't make it part of the response cycle to job completion, etc. - jobs, err := s.planner.Plan(ctx, 1) + // NB(owen-d): only immediately enqueue another job for this partition if] + // the job is full. Otherwise, we'd repeatedly enqueue tiny jobs with a few records. + jobs, err := s.planner.Plan(ctx, 1, int(s.cfg.TargetRecordCount)) if err != nil { level.Error(logger).Log("msg", "failed to plan subsequent jobs", "err", err) } diff --git a/pkg/blockbuilder/scheduler/strategy.go b/pkg/blockbuilder/scheduler/strategy.go index 78468bbea97ae..6e2f0acb38ced 100644 --- a/pkg/blockbuilder/scheduler/strategy.go +++ b/pkg/blockbuilder/scheduler/strategy.go @@ -19,7 +19,7 @@ type OffsetReader interface { type Planner interface { Name() string - Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) + Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) } const ( @@ -51,7 +51,8 @@ func (p *RecordCountPlanner) Name() string { return RecordCountStrategy } -func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) { +func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) { + level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String()) offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod) if err != nil { level.Error(p.logger).Log("msg", "failed to get group lag", "err", err) @@ -80,6 +81,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) } currentEnd := min(currentStart+p.targetRecordCount, endOffset) + + // Skip creating job if it's smaller than minimum size + if currentEnd-currentStart < int64(minOffsetsPerJob) { + break + } + job := NewJobWithMetadata( types.NewJob(partitionOffset.Partition, types.Offsets{ Min: currentStart, diff --git a/pkg/blockbuilder/scheduler/strategy_test.go b/pkg/blockbuilder/scheduler/strategy_test.go index bba62df867c8e..b94ebd5624727 100644 --- a/pkg/blockbuilder/scheduler/strategy_test.go +++ b/pkg/blockbuilder/scheduler/strategy_test.go @@ -30,14 +30,16 @@ func compareJobs(t *testing.T, expected, actual *JobWithMetadata) { func TestRecordCountPlanner_Plan(t *testing.T) { for _, tc := range []struct { - name string - recordCount int64 - expectedJobs []*JobWithMetadata - groupLag map[int32]kadm.GroupMemberLag + name string + recordCount int64 + minOffsetsPerJob int + expectedJobs []*JobWithMetadata + groupLag map[int32]kadm.GroupMemberLag }{ { - name: "single partition, single job", - recordCount: 100, + name: "single partition, single job", + recordCount: 100, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -57,8 +59,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, }, { - name: "single partition, multiple jobs", - recordCount: 50, + name: "single partition, multiple jobs", + recordCount: 50, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -82,8 +85,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, }, { - name: "multiple partitions", - recordCount: 100, + name: "multiple partitions", + recordCount: 100, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -120,8 +124,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, }, { - name: "no lag", - recordCount: 100, + name: "no lag", + recordCount: 100, + minOffsetsPerJob: 0, groupLag: map[int32]kadm.GroupMemberLag{ 0: { Commit: kadm.Offset{ @@ -135,6 +140,37 @@ func TestRecordCountPlanner_Plan(t *testing.T) { }, expectedJobs: nil, }, + { + name: "skip small jobs", + recordCount: 100, + minOffsetsPerJob: 40, + groupLag: map[int32]kadm.GroupMemberLag{ + 0: { + Commit: kadm.Offset{ + At: 100, + }, + End: kadm.ListedOffset{ + Offset: 130, // Only 30 records available, less than minimum + }, + Partition: 0, + }, + 1: { + Commit: kadm.Offset{ + At: 200, + }, + End: kadm.ListedOffset{ + Offset: 300, // 100 records available, more than minimum + }, + Partition: 1, + }, + }, + expectedJobs: []*JobWithMetadata{ + NewJobWithMetadata( + types.NewJob(1, types.Offsets{Min: 201, Max: 300}), + 99, // priority is total remaining: 300-201 + ), + }, + }, } { t.Run(tc.name, func(t *testing.T) { mockReader := &mockOffsetReader{ @@ -147,7 +183,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) { } require.NoError(t, cfg.Validate()) planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger()) - jobs, err := planner.Plan(context.Background(), 0) + jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob) require.NoError(t, err) require.Equal(t, len(tc.expectedJobs), len(jobs))