Skip to content

Commit

Permalink
fix(storagenode): fix concurrency bugs of settings for storage and ex…
Browse files Browse the repository at this point in the history
…ecutor

This patch fixed concurrency bugs in the storage setting and executor setting. Issue #262 describes
it concretely.

Resolves #262
  • Loading branch information
ijsong committed Dec 10, 2022
1 parent 6f36b0d commit fdd1781
Showing 1 changed file with 11 additions and 6 deletions.
17 changes: 11 additions & 6 deletions internal/storagenode/storagenode.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,17 +306,20 @@ func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID
return nil, err
}

stg, err := storage.New(append(
sn.defaultStorageOptions,
stgOpts := make([]storage.Option, len(sn.defaultStorageOptions))
copy(stgOpts, sn.defaultStorageOptions)
stgOpts = append(stgOpts,
storage.WithPath(lsPath),
storage.WithLogger(sn.logger.Named("storage").With(zap.String("path", lsPath))),
)...)
)
stg, err := storage.New(stgOpts...)
if err != nil {
return nil, err
}

lse, err := logstream.NewExecutor(append(
sn.defaultLogStreamExecutorOptions,
lseOpts := make([]logstream.ExecutorOption, len(sn.defaultLogStreamExecutorOptions))
copy(lseOpts, sn.defaultLogStreamExecutorOptions)
lseOpts = append(lseOpts,
logstream.WithStorageNodeID(sn.snid),
logstream.WithTopicID(tpid),
logstream.WithLogStreamID(lsid),
Expand All @@ -327,7 +330,9 @@ func (sn *StorageNode) runLogStreamReplica(_ context.Context, tpid types.TopicID
grpc.WithWriteBufferSize(int(sn.replicateClientWriteBufferSize)),
),
logstream.WithLogStreamMetrics(lsm),
)...)
)

lse, err := logstream.NewExecutor(lseOpts...)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit fdd1781

Please sign in to comment.