Skip to content

Commit

Permalink
Fixing race condition in LeaserCheckpointer where it can fail with a …
Browse files Browse the repository at this point in the history
…ContainerAlreadyExists error #253

There's a slight race condition between checking the store exists and the container being created.
    
We can handle it easily if we just allow ContainerAlreadyExists to be considered successful. Also, since the storage tests were failing (unrelated to my change) in race detection I also fixed that as well.

Fixes #252
Fixes #225
  • Loading branch information
richardpark-msft authored Apr 28, 2022
1 parent 73b7c0f commit e473b7e
Show file tree
Hide file tree
Showing 7 changed files with 78 additions and 25 deletions.
4 changes: 4 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Change Log

## `v3.3.17`

- Fixing issue where the LeaserCheckpointer could fail with a "ContainerAlreadyExists" error. (#253)

## `v3.3.16`

- Exporting a subset of AMQP message properties for the Dapr project.
Expand Down
2 changes: 1 addition & 1 deletion eng/azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ jobs:
go get github.com/AlekSi/gocov-xml
go get -u github.com/matm/gocov-html
go get -u golang.org/x/lint/golint
go get github.com/fzipp/gocyclo/cmd/gocyclo
go get github.com/fzipp/gocyclo/cmd/gocyclo@v0.3.1
workingDirectory: '$(sdkPath)'
displayName: 'Install Dependencies'
- script: |
Expand Down
10 changes: 5 additions & 5 deletions eng/integration-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ steps:
go get github.com/axw/gocov/gocov
go get github.com/AlekSi/gocov-xml
go get -u github.com/matm/gocov-html
go get github.com/fzipp/gocyclo/cmd/gocyclo
go get github.com/fzipp/gocyclo/cmd/gocyclo@v0.3.1
go get golang.org/x/lint/golint
displayName: 'Install Dependencies'
- script: |
Expand All @@ -47,10 +47,10 @@ steps:
gocov-html < coverage.json > coverage.html
displayName: 'Run Integration Tests'
env:
ARM_SUBSCRIPTION_ID: $(go-live-azure-subscription-id)
ARM_CLIENT_ID: $(go-live-eh-azure-client-id)
ARM_CLIENT_SECRET: $(go-live-eh-azure-client-secret)
ARM_TENANT_ID: $(go-live-tenant-id)
ARM_SUBSCRIPTION_ID: $(azure-subscription-id)
ARM_CLIENT_ID: $(aad-azure-sdk-test-client-id)
ARM_CLIENT_SECRET: $(aad-azure-sdk-test-client-secret)
ARM_TENANT_ID: $(aad-azure-sdk-test-tenant-id)
- task: PublishTestResults@2
inputs:
Expand Down
11 changes: 5 additions & 6 deletions eph/leasedReceiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (

"github.com/devigned/tab"

"github.com/Azure/azure-event-hubs-go/v3"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
)

type (
Expand All @@ -58,11 +58,10 @@ func (lr *leasedReceiver) Run(ctx context.Context) error {
epoch := lr.lease.GetEpoch()
lr.dlog(ctx, "running...")

go func() {
ctx, done := context.WithCancel(context.Background())
lr.done = done
lr.periodicallyRenewLease(ctx)
}()
renewLeaseCtx, cancelRenewLease := context.WithCancel(context.Background())
lr.done = cancelRenewLease

go lr.periodicallyRenewLease(renewLeaseCtx)

opts := []eventhub.ReceiveOption{eventhub.ReceiveWithEpoch(epoch)}
if lr.processor.consumerGroup != "" {
Expand Down
39 changes: 27 additions & 12 deletions storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ import (
"encoding/json"
"errors"
"io/ioutil"
"net/http"
"net/url"
"sync"
"time"

"github.com/Azure/azure-amqp-common-go/v3/uuid"
"github.com/devigned/tab"

"github.com/Azure/azure-event-hubs-go/v3"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/persist"

Expand Down Expand Up @@ -144,20 +145,22 @@ func (sl *LeaserCheckpointer) StoreExists(ctx context.Context) (bool, error) {
span, ctx := startConsumerSpanFromContext(ctx, "storage.LeaserCheckpointer.StoreExists")
defer span.End()

opts := azblob.ListContainersSegmentOptions{
Prefix: sl.containerName,
}
res, err := sl.serviceURL.ListContainersSegment(ctx, azblob.Marker{}, opts)
if err != nil {
return false, err
containerURL := sl.serviceURL.NewContainerURL(sl.containerName)
_, err := containerURL.GetProperties(ctx, azblob.LeaseAccessConditions{})

if err == nil {
return true, nil
}

for _, container := range res.ContainerItems {
if container.Name == sl.containerName {
return true, nil
var respErr azblob.ResponseError

if errors.As(err, &respErr) {
if respErr.Response().StatusCode == http.StatusNotFound {
return false, nil
}
}
return false, nil

return false, err
}

// EnsureStore creates the container if it does not exist
Expand All @@ -175,9 +178,21 @@ func (sl *LeaserCheckpointer) EnsureStore(ctx context.Context) error {
if !ok {
containerURL := sl.serviceURL.NewContainerURL(sl.containerName)
_, err := containerURL.Create(ctx, azblob.Metadata{}, azblob.PublicAccessNone)

if err != nil {
return err
var storageErr azblob.StorageError

if errors.As(err, &storageErr) {
// we're okay if the container has been created - we're basically racing against
// other LeaserCheckpointers.
if storageErr.ServiceCode() != azblob.ServiceCodeContainerAlreadyExists {
return err
}
} else {
return err
}
}

sl.containerURL = &containerURL
}
return nil
Expand Down
35 changes: 35 additions & 0 deletions storage/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,15 @@ package storage
import (
"context"
"strings"
"sync"
"time"

"github.com/Azure/azure-amqp-common-go/v3/aad"
"github.com/Azure/azure-storage-blob-go/azblob"
"github.com/stretchr/testify/assert"

"github.com/Azure/azure-event-hubs-go/v3/eph"
"github.com/Azure/azure-event-hubs-go/v3/internal/test"
)

const (
Expand Down Expand Up @@ -64,6 +66,35 @@ func (ts *testSuite) TestLeaserStoreCreation() {
ts.True(exists)
}

func (ts *testSuite) TestLeaserStoreCreationConcurrent() {
wg := sync.WaitGroup{}

containerName := test.RandomString("concurrent-container", 4)

// do a simple test that ensures we don't die just because we raced with
// other leasers to create the storage container.
for i := 0; i < 100; i++ {
wg.Add(1)

go func(i int) {
defer wg.Done()

leaser, _ := ts.newLeaserWithContainerName(containerName)

err := leaser.EnsureStore(context.Background())
ts.Require().NoError(err)
}(i)
}

wg.Wait()

leaser, del := ts.newLeaserWithContainerName(containerName)
defer del()
exists, err := leaser.StoreExists(context.Background())
ts.NoError(err)
ts.True(exists)
}

func (ts *testSuite) TestLeaserLeaseEnsure() {
leaser, del := ts.leaserWithEPH()
defer del()
Expand Down Expand Up @@ -189,6 +220,10 @@ func (ts *testSuite) leaserWithEPH() (*LeaserCheckpointer, func()) {

func (ts *testSuite) newLeaser() (*LeaserCheckpointer, func()) {
containerName := strings.ToLower(ts.RandomName("stortest", 4))
return ts.newLeaserWithContainerName(containerName)
}

func (ts *testSuite) newLeaserWithContainerName(containerName string) (*LeaserCheckpointer, func()) {
cred, err := NewAADSASCredential(ts.SubscriptionID, ts.ResourceGroupName, ts.AccountName, containerName, AADSASCredentialWithEnvironmentVars())
ts.Require().NoError(err)
leaser, err := NewStorageLeaserCheckpointer(cred, ts.AccountName, containerName, ts.Env)
Expand Down
2 changes: 1 addition & 1 deletion version.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,5 @@ package eventhub

const (
// Version is the semantic version number
Version = "3.3.13"
Version = "3.3.17"
)

0 comments on commit e473b7e

Please sign in to comment.