Skip to content

Commit

Permalink
fix(block-builder): min job size support in planning
Browse files Browse the repository at this point in the history
  • Loading branch information
owen-d committed Jan 6, 2025
1 parent 5757404 commit a508c2d
Show file tree
Hide file tree
Showing 3 changed files with 74 additions and 17 deletions.
18 changes: 16 additions & 2 deletions pkg/blockbuilder/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,15 +149,27 @@ func (s *BlockScheduler) runOnce(ctx context.Context) error {

s.publishLagMetrics(lag)

jobs, err := s.planner.Plan(ctx, 1) // TODO(owen-d): parallelize work within a partition
// TODO(owen-d): parallelize work within a partition
// TODO(owen-d): skip small jobs unless they're stale,
// e.g. a partition which is no longer being written to shouldn't be orphaned
jobs, err := s.planner.Plan(ctx, 1, 0)
if err != nil {
level.Error(s.logger).Log("msg", "failed to plan jobs", "err", err)
}
level.Info(s.logger).Log("msg", "planned jobs", "count", len(jobs))

for _, job := range jobs {
// TODO: end offset keeps moving each time we plan jobs, maybe we should not use it as part of the job ID

added, status, err := s.idempotentEnqueue(job)
level.Info(s.logger).Log(
"msg", "enqueued job",
"added", added,
"status", status.String(),
"err", err,
"partition", job.Job.Partition(),
"num_offsets", job.Offsets().Max-job.Offsets().Min,
)

// if we've either added or encountered an error, move on; we're done this cycle
if added || err != nil {
Expand Down Expand Up @@ -253,7 +265,9 @@ func (s *BlockScheduler) HandleCompleteJob(ctx context.Context, job *types.Job,

// TODO(owen-d): cleaner way to enqueue next job for this partition,
// don't make it part of the response cycle to job completion, etc.
jobs, err := s.planner.Plan(ctx, 1)
// NB(owen-d): only immediately enqueue another job for this partition if]
// the job is full. Otherwise, we'd repeatedly enqueue tiny jobs with a few records.
jobs, err := s.planner.Plan(ctx, 1, int(s.cfg.TargetRecordCount))
if err != nil {
level.Error(logger).Log("msg", "failed to plan subsequent jobs", "err", err)
}
Expand Down
11 changes: 9 additions & 2 deletions pkg/blockbuilder/scheduler/strategy.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type OffsetReader interface {

type Planner interface {
Name() string
Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error)
Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error)
}

const (
Expand Down Expand Up @@ -51,7 +51,8 @@ func (p *RecordCountPlanner) Name() string {
return RecordCountStrategy
}

func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int) ([]*JobWithMetadata, error) {
func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int, minOffsetsPerJob int) ([]*JobWithMetadata, error) {
level.Info(p.logger).Log("msg", "planning jobs", "max_jobs_per_partition", maxJobsPerPartition, "target_record_count", p.targetRecordCount, "lookback_period", p.lookbackPeriod.String())
offsets, err := p.offsetReader.GroupLag(ctx, p.lookbackPeriod)
if err != nil {
level.Error(p.logger).Log("msg", "failed to get group lag", "err", err)
Expand Down Expand Up @@ -80,6 +81,12 @@ func (p *RecordCountPlanner) Plan(ctx context.Context, maxJobsPerPartition int)
}

currentEnd := min(currentStart+p.targetRecordCount, endOffset)

// Skip creating job if it's smaller than minimum size
if currentEnd-currentStart < int64(minOffsetsPerJob) {
break
}

job := NewJobWithMetadata(
types.NewJob(partitionOffset.Partition, types.Offsets{
Min: currentStart,
Expand Down
62 changes: 49 additions & 13 deletions pkg/blockbuilder/scheduler/strategy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ func compareJobs(t *testing.T, expected, actual *JobWithMetadata) {

func TestRecordCountPlanner_Plan(t *testing.T) {
for _, tc := range []struct {
name string
recordCount int64
expectedJobs []*JobWithMetadata
groupLag map[int32]kadm.GroupMemberLag
name string
recordCount int64
minOffsetsPerJob int
expectedJobs []*JobWithMetadata
groupLag map[int32]kadm.GroupMemberLag
}{
{
name: "single partition, single job",
recordCount: 100,
name: "single partition, single job",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
Expand All @@ -57,8 +59,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
},
{
name: "single partition, multiple jobs",
recordCount: 50,
name: "single partition, multiple jobs",
recordCount: 50,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
Expand All @@ -82,8 +85,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
},
{
name: "multiple partitions",
recordCount: 100,
name: "multiple partitions",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
Expand Down Expand Up @@ -120,8 +124,9 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
},
{
name: "no lag",
recordCount: 100,
name: "no lag",
recordCount: 100,
minOffsetsPerJob: 0,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
Expand All @@ -135,6 +140,37 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
},
expectedJobs: nil,
},
{
name: "skip small jobs",
recordCount: 100,
minOffsetsPerJob: 40,
groupLag: map[int32]kadm.GroupMemberLag{
0: {
Commit: kadm.Offset{
At: 100,
},
End: kadm.ListedOffset{
Offset: 130, // Only 30 records available, less than minimum
},
Partition: 0,
},
1: {
Commit: kadm.Offset{
At: 200,
},
End: kadm.ListedOffset{
Offset: 300, // 100 records available, more than minimum
},
Partition: 1,
},
},
expectedJobs: []*JobWithMetadata{
NewJobWithMetadata(
types.NewJob(1, types.Offsets{Min: 201, Max: 300}),
99, // priority is total remaining: 300-201
),
},
},
} {
t.Run(tc.name, func(t *testing.T) {
mockReader := &mockOffsetReader{
Expand All @@ -147,7 +183,7 @@ func TestRecordCountPlanner_Plan(t *testing.T) {
}
require.NoError(t, cfg.Validate())
planner := NewRecordCountPlanner(mockReader, tc.recordCount, time.Hour, log.NewNopLogger())
jobs, err := planner.Plan(context.Background(), 0)
jobs, err := planner.Plan(context.Background(), 0, tc.minOffsetsPerJob)
require.NoError(t, err)

require.Equal(t, len(tc.expectedJobs), len(jobs))
Expand Down

0 comments on commit a508c2d

Please sign in to comment.