Skip to content

Commit

Permalink
Handle concurrent access to a pipeline's status field (#1794)
Browse files Browse the repository at this point in the history
  • Loading branch information
hariso authored Aug 21, 2024
1 parent 1ed6db3 commit 38ca22d
Show file tree
Hide file tree
Showing 21 changed files with 602 additions and 484 deletions.
6 changes: 3 additions & 3 deletions pkg/orchestrator/connectors.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (c *ConnectorOrchestrator) Create(
if pl.ProvisionedBy != pipeline.ProvisionTypeAPI {
return nil, cerrors.Errorf("cannot add a connector to the pipeline %q: %w", pl.ID, ErrImmutableProvisionedByConfig)
}
if pl.Status == pipeline.StatusRunning {
if pl.GetStatus() == pipeline.StatusRunning {
return nil, cerrors.Errorf("cannot create connector: %w", pipeline.ErrPipelineRunning)
}

Expand Down Expand Up @@ -124,7 +124,7 @@ func (c *ConnectorOrchestrator) Delete(ctx context.Context, id string) error {
if err != nil {
return err
}
if pl.Status == pipeline.StatusRunning {
if pl.GetStatus() == pipeline.StatusRunning {
return pipeline.ErrPipelineRunning
}
err = c.connectors.Delete(ctx, id, c.connectorPlugins)
Expand Down Expand Up @@ -171,7 +171,7 @@ func (c *ConnectorOrchestrator) Update(ctx context.Context, id string, config co
if err != nil {
return nil, err
}
if pl.Status == pipeline.StatusRunning {
if pl.GetStatus() == pipeline.StatusRunning {
return nil, pipeline.ErrPipelineRunning
}

Expand Down
61 changes: 36 additions & 25 deletions pkg/orchestrator/connectors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ func TestConnectorOrchestrator_Create_Success(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

want := &connector.Instance{
ID: uuid.NewString(),
Type: connector.TypeSource,
Expand Down Expand Up @@ -110,9 +111,9 @@ func TestConnectorOrchestrator_Create_PipelineRunning(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusRunning,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusRunning)

plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Expand All @@ -133,9 +134,9 @@ func TestConnectorOrchestrator_Create_PipelineProvisionByConfig(t *testing.T) {

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusRunning,
ProvisionedBy: pipeline.ProvisionTypeConfig,
}
pl.SetStatus(pipeline.StatusRunning)

plsMock.EXPECT().
Get(gomock.AssignableToTypeOf(ctxType), pl.ID).
Expand All @@ -155,9 +156,10 @@ func TestConnectorOrchestrator_Create_CreateConnectorError(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

config := connector.Config{}
wantErr := cerrors.New("test error")
plsMock.EXPECT().
Expand Down Expand Up @@ -195,9 +197,10 @@ func TestConnectorOrchestrator_Create_AddConnectorError(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

conn := &connector.Instance{
ID: uuid.NewString(),
Type: connector.TypeSource,
Expand Down Expand Up @@ -257,9 +260,10 @@ func TestConnectorOrchestrator_Delete_Success(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

conn := &connector.Instance{
ID: uuid.NewString(),
PipelineID: pl.ID,
Expand Down Expand Up @@ -308,9 +312,10 @@ func TestConnectorOrchestrator_Delete_PipelineRunning(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusRunning,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusRunning)

conn := &connector.Instance{
ID: uuid.NewString(),
PipelineID: pl.ID,
Expand All @@ -336,9 +341,10 @@ func TestConnectorOrchestrator_Delete_ProcessorAttached(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusRunning,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusRunning)

conn := &connector.Instance{
ID: uuid.NewString(),
PipelineID: pl.ID,
Expand All @@ -362,9 +368,10 @@ func TestConnectorOrchestrator_Delete_Fail(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

conn := &connector.Instance{
ID: uuid.NewString(),
PipelineID: pl.ID,
Expand Down Expand Up @@ -394,9 +401,10 @@ func TestConnectorOrchestrator_Delete_RemoveConnectorFailed(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

conn := &connector.Instance{
ID: uuid.NewString(),
Plugin: "test-plugin",
Expand Down Expand Up @@ -442,9 +450,10 @@ func TestConnectorOrchestrator_Update_Success(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

conn := &connector.Instance{
ID: uuid.NewString(),
Plugin: "test-plugin",
Expand Down Expand Up @@ -511,9 +520,10 @@ func TestConnectorOrchestrator_Update_PipelineRunning(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusRunning,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusRunning)

conn := &connector.Instance{
ID: uuid.NewString(),
PipelineID: pl.ID,
Expand All @@ -540,9 +550,10 @@ func TestConnectorOrchestrator_Update_Fail(t *testing.T) {
plsMock, consMock, procsMock, connPluginMock, procPluginMock := newMockServices(t)

pl := &pipeline.Instance{
ID: uuid.NewString(),
Status: pipeline.StatusSystemStopped,
ID: uuid.NewString(),
}
pl.SetStatus(pipeline.StatusSystemStopped)

conn := &connector.Instance{
ID: uuid.NewString(),
Type: connector.TypeDestination,
Expand Down
6 changes: 3 additions & 3 deletions pkg/orchestrator/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func (po *PipelineOrchestrator) Update(ctx context.Context, id string, cfg pipel
return nil, cerrors.Errorf("pipeline %q cannot be updated: %w", pl.ID, ErrImmutableProvisionedByConfig)
}
// TODO lock pipeline
if pl.Status == pipeline.StatusRunning {
if pl.GetStatus() == pipeline.StatusRunning {
return nil, pipeline.ErrPipelineRunning
}
return po.pipelines.Update(ctx, pl.ID, cfg)
Expand All @@ -72,7 +72,7 @@ func (po *PipelineOrchestrator) Delete(ctx context.Context, id string) error {
if pl.ProvisionedBy != pipeline.ProvisionTypeAPI {
return cerrors.Errorf("pipeline %q cannot be deleted: %w", pl.ID, ErrImmutableProvisionedByConfig)
}
if pl.Status == pipeline.StatusRunning {
if pl.GetStatus() == pipeline.StatusRunning {
return pipeline.ErrPipelineRunning
}
if len(pl.ConnectorIDs) != 0 {
Expand All @@ -94,7 +94,7 @@ func (po *PipelineOrchestrator) UpdateDLQ(ctx context.Context, id string, dlq pi
return nil, cerrors.Errorf("pipeline %q cannot be updated: %w", pl.ID, ErrImmutableProvisionedByConfig)
}
// TODO lock pipeline
if pl.Status == pipeline.StatusRunning {
if pl.GetStatus() == pipeline.StatusRunning {
return nil, pipeline.ErrPipelineRunning
}

Expand Down
Loading

0 comments on commit 38ca22d

Please sign in to comment.