From 984faf673191dc6cec53b786c6c64bc84cd931d1 Mon Sep 17 00:00:00 2001 From: Injun Song Date: Thu, 16 Jan 2025 00:57:49 +0900 Subject: [PATCH] refactor(logstream): use beginLLSN in Replicate method This commit replaces llsnList with beginLLSN in the Replicate method. Since LLSNs in llsnList are strictly sequential, beginLLSN and the length of dataList are sufficient. The corresponding tests have been updated to reflect this change. The llsn field in ReplicateRequest has been deprecated and will be removed soon. --- internal/storagenode/logstream/executor.go | 12 +- .../storagenode/logstream/executor_test.go | 24 +-- .../storagenode/logstream/replicate_client.go | 10 +- internal/storagenode/replication_server.go | 2 +- proto/snpb/replicator.pb.go | 179 +++++++++++------- proto/snpb/replicator.proto | 8 + 6 files changed, 143 insertions(+), 92 deletions(-) diff --git a/internal/storagenode/logstream/executor.go b/internal/storagenode/logstream/executor.go index 4c57b1023..4174bbe81 100644 --- a/internal/storagenode/logstream/executor.go +++ b/internal/storagenode/logstream/executor.go @@ -160,7 +160,7 @@ func NewExecutor(opts ...ExecutorOption) (lse *Executor, err error) { return lse, err } -func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataList [][]byte) error { +func (lse *Executor) Replicate(ctx context.Context, beginLLSN types.LLSN, dataList [][]byte) error { lse.inflight.Add(1) defer lse.inflight.Add(-1) @@ -178,7 +178,7 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL var preparationDuration time.Duration startTime := time.Now() dataBytes := int64(0) - batchSize := len(llsnList) + batchSize := len(dataList) defer func() { if lse.lsm == nil { return @@ -190,11 +190,11 @@ func (lse *Executor) Replicate(ctx context.Context, llsnList []types.LLSN, dataL lse.lsm.ReplicatePreparationMicro.Add(preparationDuration.Microseconds()) }() - oldLLSN, newLLSN := llsnList[0], llsnList[batchSize-1]+1 + oldLLSN, newLLSN := beginLLSN, beginLLSN+types.LLSN(batchSize) wb := lse.stg.NewWriteBatch() cwts := newListQueue() - for i := 0; i < len(llsnList); i++ { - _ = wb.Set(llsnList[i], dataList[i]) + for i := 0; i < batchSize; i++ { + _ = wb.Set(beginLLSN+types.LLSN(i), dataList[i]) dataBytes += int64(len(dataList[i])) cwts.PushFront(newCommitWaitTask(nil)) } @@ -329,7 +329,7 @@ func (lse *Executor) Unseal(_ context.Context, replicas []varlogpb.LogStreamRepl replica: replicas[i], rpcConn: rpcConn, queueCapacity: lse.replicateClientQueueCapacity, - //grpcDialOptions: lse.replicateClientGRPCOptions, + // grpcDialOptions: lse.replicateClientGRPCOptions, lse: lse, logger: lse.logger.Named("replicate client"), }) diff --git a/internal/storagenode/logstream/executor_test.go b/internal/storagenode/logstream/executor_test.go index 3384ce885..6eb3e6570 100644 --- a/internal/storagenode/logstream/executor_test.go +++ b/internal/storagenode/logstream/executor_test.go @@ -84,7 +84,7 @@ func TestExecutor_Closed(t *testing.T) { _, err := lse.Append(context.Background(), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrClosed) - err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0)) + err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrClosed) _, _, err = lse.Seal(context.Background(), types.MinGLSN) @@ -182,7 +182,7 @@ func TestExecutor_Sealing(t *testing.T) { assert.Equal(t, varlogpb.LogStreamStatusSealing, st) assert.Equal(t, executorStateSealing, lse.esm.load()) - err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0)) + err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrSealed) }, }, @@ -286,7 +286,7 @@ func TestExecutor_Sealed(t *testing.T) { _, err = lse.Append(context.Background(), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrSealed) - err = lse.Replicate(context.Background(), []types.LLSN{1}, TestNewBatchData(t, 1, 0)) + err = lse.Replicate(context.Background(), types.LLSN(1), TestNewBatchData(t, 1, 0)) assert.ErrorIs(t, err, verrors.ErrSealed) } @@ -616,22 +616,18 @@ func TestExecutor_Replicate(t *testing.T) { // primary if tc.isErr { - err := lse.Replicate(context.Background(), []types.LLSN{1}, [][]byte{nil}) + err := lse.Replicate(context.Background(), types.LLSN(1), [][]byte{nil}) assert.Error(t, err) return } // backup - var llsn types.LLSN + llsn := types.MinLLSN for _, batchLen := range batchlet.LengthClasses { dataList := TestNewBatchData(t, batchLen, 0) - llsnList := make([]types.LLSN, batchLen) - for i := 0; i < batchLen; i++ { - llsn++ - llsnList[i] = llsn - } - err := lse.Replicate(context.Background(), llsnList, dataList) + err := lse.Replicate(context.Background(), llsn, dataList) assert.NoError(t, err) + llsn += types.LLSN(len(dataList)) } // Commit @@ -875,7 +871,7 @@ func TestExecutor_ReplicateSeal(t *testing.T) { go func() { defer wg.Done() for llsn := lastLLSN + 1; llsn < types.MaxLLSN; llsn++ { - err := lse.Replicate(context.Background(), []types.LLSN{llsn}, [][]byte{nil}) + err := lse.Replicate(context.Background(), llsn, [][]byte{nil}) if err != nil { break } @@ -935,7 +931,7 @@ func TestExecutor_ReplicateSeal(t *testing.T) { go func() { defer wg.Done() for llsn := lastLLSN + 1; llsn < types.MaxLLSN; llsn++ { - err := lse.Replicate(context.Background(), []types.LLSN{llsn}, [][]byte{nil}) + err := lse.Replicate(context.Background(), llsn, [][]byte{nil}) if err != nil { break } @@ -3091,7 +3087,6 @@ func TestExecutorSyncInit(t *testing.T) { rpt.UncommittedLLSNLength == 0 && rpt.HighWatermark == types.GLSN(dstLastLSN) && rpt.Version == types.Version(1) - }, time.Second, 10*time.Millisecond) wg.Wait() @@ -3500,7 +3495,6 @@ func TestExecutorSyncReplicate(t *testing.T) { rpt.UncommittedLLSNLength == 0 && rpt.HighWatermark == types.GLSN(numLogs) && rpt.Version == types.Version(1) - }, time.Second, 10*time.Millisecond) wg.Wait() diff --git a/internal/storagenode/logstream/replicate_client.go b/internal/storagenode/logstream/replicate_client.go index c125b81bb..5209a0d58 100644 --- a/internal/storagenode/logstream/replicate_client.go +++ b/internal/storagenode/logstream/replicate_client.go @@ -26,7 +26,7 @@ type replicateClient struct { rpcClient snpb.ReplicatorClient streamClient snpb.Replicator_ReplicateClient - //req *snpb.ReplicateRequest + // req *snpb.ReplicateRequest } // newReplicateClient creates a new client to replicate logs to backup replica. @@ -52,7 +52,7 @@ func newReplicateClient(ctx context.Context, cfg replicateClientConfig) (*replic replicateClientConfig: cfg, queue: make(chan *replicateTask, cfg.queueCapacity), runner: runner.New("replicate client", cfg.logger), - //rpcConn: rpcConn, + // rpcConn: rpcConn, rpcClient: rpcClient, streamClient: streamClient, // NOTE: To reuse the request struct, we need to initialize the field LLSN. @@ -135,11 +135,15 @@ func (rc *replicateClient) sendLoop(ctx context.Context) { func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error { // Remove maxAppendSubBatchSize, since rt already has batched data. startTime := time.Now() + // TODO(jun): Since (snpb.ReplicateRequest).LLSN is deprecated, it will disappear soon. // NOTE: We need to copy the LLSN array, since the array is reused. req.LLSN = req.LLSN[0:len(rt.llsnList)] copy(req.LLSN, rt.llsnList) - //req.LLSN = rt.llsnList + // req.LLSN = rt.llsnList req.Data = rt.dataList + if len(rt.llsnList) > 0 { + req.BeginLLSN = rt.llsnList[0] + } rt.release() err := rc.streamClient.Send(req) inflight := rc.inflight.Add(-1) diff --git a/internal/storagenode/replication_server.go b/internal/storagenode/replication_server.go index 12274f50a..07f957356 100644 --- a/internal/storagenode/replication_server.go +++ b/internal/storagenode/replication_server.go @@ -173,7 +173,7 @@ func (rs *replicationServer) replicate(ctx context.Context, requestC <-chan *rep lse.Metrics().ReplicateServerOperations.Add(1) - err = lse.Replicate(ctx, rst.req.LLSN, rst.req.Data) + err = lse.Replicate(ctx, rst.req.BeginLLSN, rst.req.Data) if err != nil { rst.release() return diff --git a/proto/snpb/replicator.pb.go b/proto/snpb/replicator.pb.go index 139a31eb6..709bfb0f7 100644 --- a/proto/snpb/replicator.pb.go +++ b/proto/snpb/replicator.pb.go @@ -70,8 +70,12 @@ func (SyncState) EnumDescriptor() ([]byte, []int) { type ReplicateRequest struct { TopicID github_com_kakao_varlog_pkg_types.TopicID `protobuf:"varint,1,opt,name=topic_id,json=topicId,proto3,casttype=github.com/kakao/varlog/pkg/types.TopicID" json:"topic_id,omitempty"` LogStreamID github_com_kakao_varlog_pkg_types.LogStreamID `protobuf:"varint,2,opt,name=log_stream_id,json=logStreamId,proto3,casttype=github.com/kakao/varlog/pkg/types.LogStreamID" json:"log_stream_id,omitempty"` - LLSN []github_com_kakao_varlog_pkg_types.LLSN `protobuf:"varint,3,rep,packed,name=llsn,proto3,casttype=github.com/kakao/varlog/pkg/types.LLSN" json:"llsn,omitempty"` - Data [][]byte `protobuf:"bytes,4,rep,name=data,proto3" json:"data,omitempty"` + // LLSN is a list of local log sequence numbers where the log entries are + // replicated. The primary replica fills this field for backward + // compatibility, but it will disappear soon. + LLSN []github_com_kakao_varlog_pkg_types.LLSN `protobuf:"varint,3,rep,packed,name=llsn,proto3,casttype=github.com/kakao/varlog/pkg/types.LLSN" json:"llsn,omitempty"` // Deprecated: Do not use. + Data [][]byte `protobuf:"bytes,4,rep,name=data,proto3" json:"data,omitempty"` + BeginLLSN github_com_kakao_varlog_pkg_types.LLSN `protobuf:"varint,5,opt,name=begin_llsn,json=beginLlsn,proto3,casttype=github.com/kakao/varlog/pkg/types.LLSN" json:"begin_llsn,omitempty"` } func (m *ReplicateRequest) Reset() { *m = ReplicateRequest{} } @@ -121,6 +125,7 @@ func (m *ReplicateRequest) GetLogStreamID() github_com_kakao_varlog_pkg_types.Lo return 0 } +// Deprecated: Do not use. func (m *ReplicateRequest) GetLLSN() []github_com_kakao_varlog_pkg_types.LLSN { if m != nil { return m.LLSN @@ -135,6 +140,13 @@ func (m *ReplicateRequest) GetData() [][]byte { return nil } +func (m *ReplicateRequest) GetBeginLLSN() github_com_kakao_varlog_pkg_types.LLSN { + if m != nil { + return m.BeginLLSN + } + return 0 +} + type ReplicateResponse struct { } @@ -664,71 +676,73 @@ func init() { func init() { proto.RegisterFile("proto/snpb/replicator.proto", fileDescriptor_85705cb817486b63) } var fileDescriptor_85705cb817486b63 = []byte{ - // 1012 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0xcf, 0x6f, 0xe3, 0x44, - 0x14, 0x8e, 0x13, 0x67, 0x9b, 0xbc, 0xb4, 0x25, 0x9d, 0xb2, 0x34, 0x04, 0x6a, 0x67, 0x83, 0x84, - 0xc2, 0x8f, 0x8d, 0xa5, 0xae, 0x58, 0x96, 0x6a, 0xa5, 0x85, 0x96, 0xb4, 0x44, 0x0a, 0x6d, 0x35, - 0xae, 0x10, 0x82, 0x43, 0x71, 0x9d, 0x59, 0x63, 0xd5, 0xf1, 0x18, 0x7b, 0x82, 0xe8, 0x7f, 0x80, - 0x7a, 0x42, 0xdc, 0x2b, 0x2a, 0x51, 0x21, 0x8e, 0xdc, 0x58, 0xfe, 0x83, 0x1e, 0xf7, 0xc8, 0x29, - 0x12, 0xe9, 0x85, 0x3f, 0x01, 0xed, 0x09, 0xcd, 0x78, 0xec, 0xa4, 0xc9, 0x96, 0xb6, 0x82, 0x1b, - 0x37, 0x7b, 0xde, 0xf7, 0xbe, 0x79, 0xf3, 0xbe, 0xef, 0x8d, 0x0d, 0xaf, 0x04, 0x21, 0x65, 0xd4, - 0x88, 0xfc, 0x60, 0xdf, 0x08, 0x49, 0xe0, 0xb9, 0xb6, 0xc5, 0x68, 0xd8, 0x14, 0xab, 0xa8, 0xf4, - 0xb5, 0x15, 0x7a, 0xd4, 0x69, 0xf2, 0x68, 0x55, 0x77, 0x28, 0x75, 0x3c, 0x62, 0x88, 0xd0, 0x7e, - 0xff, 0xb1, 0xc1, 0xdc, 0x1e, 0x89, 0x98, 0xd5, 0x0b, 0x62, 0x74, 0xf5, 0xae, 0xe3, 0xb2, 0x2f, - 0xfb, 0xfb, 0x4d, 0x9b, 0xf6, 0x0c, 0x87, 0x3a, 0x74, 0x84, 0xe4, 0x6f, 0xf1, 0x3e, 0xfc, 0x49, - 0xc2, 0x97, 0x62, 0xf2, 0x60, 0xdf, 0xe8, 0x11, 0x66, 0x75, 0x2d, 0x66, 0xc5, 0x81, 0xfa, 0xaf, - 0x59, 0x28, 0x63, 0x59, 0x0a, 0xc1, 0xe4, 0xab, 0x3e, 0x89, 0x18, 0x32, 0xa1, 0xc0, 0x68, 0xe0, - 0xda, 0x7b, 0x6e, 0xb7, 0xa2, 0xd4, 0x94, 0x46, 0x7e, 0xed, 0xc1, 0x70, 0xa0, 0xcf, 0xec, 0xf2, - 0xb5, 0xf6, 0x87, 0xcf, 0x06, 0xfa, 0x1b, 0x63, 0xbb, 0x1f, 0x58, 0x07, 0x16, 0x35, 0x62, 0x7e, - 0x23, 0x38, 0x70, 0x0c, 0x76, 0x18, 0x90, 0xa8, 0x29, 0xc1, 0x78, 0x46, 0x30, 0xb5, 0xbb, 0xa8, - 0x0b, 0x73, 0x1e, 0x75, 0xf6, 0x22, 0x16, 0x12, 0xab, 0xc7, 0x99, 0xb3, 0x82, 0xf9, 0xfd, 0xe1, - 0x40, 0x2f, 0x75, 0xa8, 0x63, 0x8a, 0x75, 0xc1, 0x7e, 0xf7, 0x6a, 0xf6, 0xb1, 0x04, 0x5c, 0xf2, - 0xd2, 0x97, 0x2e, 0xda, 0x00, 0xd5, 0xf3, 0x22, 0xbf, 0x92, 0xab, 0xe5, 0x1a, 0xea, 0xda, 0xca, - 0x70, 0xa0, 0xab, 0x9d, 0x8e, 0xb9, 0xf5, 0x6c, 0xa0, 0xbf, 0x7e, 0x0d, 0xd6, 0x8e, 0xb9, 0x85, - 0x45, 0x3e, 0x42, 0xa0, 0xf2, 0x2e, 0x55, 0xd4, 0x5a, 0xae, 0x31, 0x8b, 0xc5, 0xf3, 0xea, 0xec, - 0x93, 0x13, 0x5d, 0xf9, 0xf3, 0x44, 0x57, 0xfe, 0x3a, 0xd1, 0x95, 0xfa, 0x22, 0x2c, 0x8c, 0x35, - 0x2e, 0x0a, 0xa8, 0x1f, 0x91, 0xfa, 0xa9, 0x02, 0xb3, 0xe6, 0xa1, 0x6f, 0xef, 0xd0, 0xc8, 0x65, - 0x2e, 0xf5, 0xd3, 0x7a, 0x78, 0x1b, 0xff, 0x4d, 0x3d, 0x1b, 0xa0, 0x3a, 0x9c, 0x27, 0x3b, 0xe2, - 0xd9, 0xbc, 0x36, 0xcf, 0xa6, 0xe0, 0xe1, 0xf9, 0xab, 0x2a, 0xaf, 0xbf, 0xfe, 0x44, 0x81, 0x22, - 0x2f, 0x13, 0x5b, 0xbe, 0x43, 0xd0, 0x27, 0x00, 0x8f, 0xdd, 0x30, 0x62, 0x7b, 0x63, 0x95, 0xbe, - 0x3b, 0x1c, 0xe8, 0xc5, 0x0d, 0xbe, 0x7a, 0xc3, 0x72, 0x8b, 0x82, 0xaa, 0xc3, 0x6b, 0x36, 0xa1, - 0xe8, 0x59, 0x09, 0x6d, 0x5c, 0xf8, 0xfd, 0xe1, 0x40, 0x2f, 0x74, 0xac, 0x1b, 0xb3, 0x16, 0x38, - 0x11, 0x27, 0xad, 0xff, 0x90, 0x83, 0x17, 0x78, 0xe9, 0x6d, 0xdf, 0x65, 0x89, 0x5f, 0x3f, 0x07, - 0xb0, 0xbd, 0x7e, 0xc4, 0x48, 0x38, 0x72, 0xec, 0x43, 0x7e, 0x80, 0xf5, 0x78, 0x55, 0xb8, 0xea, - 0xad, 0xab, 0xb7, 0x4a, 0xe1, 0xb8, 0x28, 0xf9, 0xda, 0x5d, 0xf4, 0x08, 0x6e, 0x45, 0xb4, 0x1f, - 0xda, 0x44, 0x1c, 0xa1, 0xb4, 0x72, 0xa7, 0x29, 0x07, 0x35, 0x19, 0xa9, 0x91, 0x19, 0xa5, 0x1f, - 0xd6, 0xd4, 0xb3, 0x81, 0x9e, 0xc1, 0x32, 0x0d, 0xb5, 0xa1, 0xd4, 0x25, 0x11, 0x73, 0x7d, 0x8b, - 0x3b, 0xa2, 0x92, 0xbb, 0x19, 0xcb, 0x78, 0x2e, 0x5a, 0x81, 0x7c, 0xc8, 0x25, 0xab, 0xa8, 0x82, - 0xe4, 0xa5, 0xe6, 0xd8, 0x9d, 0xd1, 0x4c, 0x05, 0x95, 0x99, 0x31, 0x14, 0x51, 0x58, 0x14, 0x2a, - 0xd8, 0xb4, 0xd7, 0x73, 0x19, 0x23, 0xdd, 0x58, 0x8f, 0xbc, 0xd0, 0xe3, 0xd1, 0x70, 0xa0, 0x2f, - 0x70, 0x3d, 0xd6, 0x93, 0xe8, 0x0d, 0x85, 0x59, 0xf0, 0x2e, 0x24, 0x73, 0x85, 0x36, 0xa0, 0x3c, - 0x12, 0x28, 0x9e, 0x8b, 0x51, 0xe1, 0xca, 0xb5, 0x0b, 0xaf, 0xff, 0xa1, 0x00, 0xf0, 0x90, 0xc9, - 0x2c, 0xd6, 0x8f, 0xd0, 0xdb, 0x90, 0x8f, 0x98, 0xc5, 0x62, 0x8a, 0xf9, 0xe7, 0x50, 0x70, 0x1c, - 0xc1, 0x31, 0x08, 0xbd, 0x03, 0x79, 0x61, 0x44, 0x29, 0xda, 0xcb, 0x53, 0xe8, 0x64, 0x42, 0x93, - 0x3d, 0x05, 0x1a, 0xdd, 0x03, 0x95, 0x1f, 0x48, 0x8a, 0x74, 0x65, 0x96, 0x00, 0xa3, 0xf7, 0x60, - 0xc6, 0xee, 0x87, 0x21, 0xf1, 0x99, 0xd4, 0xe5, 0xca, 0xbc, 0x04, 0x5f, 0xff, 0x5e, 0x81, 0x92, - 0x88, 0x5b, 0x87, 0x1e, 0xb5, 0xba, 0xa8, 0x05, 0xf3, 0xb1, 0x4e, 0x7b, 0x36, 0xf5, 0x19, 0xf9, - 0x86, 0xc9, 0x86, 0x69, 0x53, 0x76, 0x89, 0x7b, 0xbe, 0x1e, 0xa3, 0xf0, 0x9c, 0x3d, 0xfe, 0x8a, - 0xee, 0x43, 0x91, 0xdf, 0xb5, 0xc4, 0x67, 0xe1, 0xe1, 0x64, 0x07, 0xc6, 0x0d, 0xd7, 0xe2, 0x00, - 0x5c, 0xf0, 0xe4, 0xd3, 0xaa, 0x7a, 0xc6, 0x6f, 0x87, 0xdf, 0xb2, 0xf0, 0xa2, 0xd0, 0x64, 0xf2, - 0xbb, 0xf0, 0xbf, 0x99, 0xb3, 0x07, 0x30, 0x13, 0xc4, 0x8a, 0x48, 0x45, 0x2b, 0xd3, 0x8a, 0xc6, - 0xf1, 0x44, 0x50, 0x09, 0xaf, 0x7f, 0x04, 0xb7, 0x27, 0x5a, 0x27, 0x27, 0xc0, 0x80, 0x5b, 0x91, - 0x30, 0xb2, 0x54, 0x74, 0xe9, 0xb9, 0xfe, 0xed, 0x47, 0x58, 0xc2, 0xde, 0xfc, 0x49, 0xde, 0xd1, - 0xc2, 0xd6, 0x68, 0x19, 0xf2, 0x2d, 0x8c, 0xb7, 0x71, 0x39, 0x53, 0x45, 0x47, 0xc7, 0xb5, 0xf9, - 0x34, 0xd2, 0x0a, 0x43, 0x1a, 0xa2, 0x06, 0x94, 0xda, 0x5b, 0x7b, 0x3b, 0x78, 0x7b, 0x13, 0xb7, - 0x4c, 0xb3, 0xac, 0x54, 0x97, 0x8e, 0x8e, 0x6b, 0x8b, 0x29, 0xa8, 0xed, 0xef, 0x84, 0xd4, 0x09, - 0x49, 0x14, 0xa1, 0xd7, 0xa0, 0xb0, 0xbe, 0xfd, 0xf1, 0x4e, 0xa7, 0xb5, 0xdb, 0x2a, 0x67, 0xab, - 0xb7, 0x8f, 0x8e, 0x6b, 0x0b, 0x29, 0x6c, 0x9d, 0xf6, 0x02, 0x8f, 0xc4, 0xbb, 0x99, 0xbb, 0x1f, - 0xe0, 0xdd, 0x72, 0x6e, 0x62, 0x37, 0x93, 0x59, 0x21, 0xab, 0xce, 0x7e, 0xfb, 0xa3, 0x96, 0xf9, - 0xf9, 0x54, 0xcb, 0xfc, 0x72, 0xaa, 0x29, 0x2b, 0xe7, 0x59, 0x00, 0x9c, 0xfe, 0xcd, 0xa0, 0x2d, - 0x28, 0xa6, 0xa7, 0x47, 0xcb, 0x17, 0x4e, 0x39, 0x69, 0xa8, 0xaa, 0x76, 0x59, 0x58, 0x7e, 0x4e, - 0x33, 0x0d, 0x05, 0xb5, 0xa1, 0x90, 0x5c, 0x27, 0xe8, 0xd5, 0xa9, 0xa6, 0x8d, 0x7d, 0x06, 0xaa, - 0xcb, 0x97, 0x44, 0x13, 0x32, 0xf4, 0x29, 0xcc, 0x5d, 0x10, 0x07, 0xdd, 0x99, 0xbe, 0x87, 0x26, - 0x4b, 0xac, 0xff, 0x13, 0x24, 0x65, 0xfe, 0x02, 0x16, 0x2f, 0x84, 0x62, 0x87, 0xfd, 0x67, 0xfc, - 0x0d, 0x65, 0xed, 0xe1, 0xd9, 0x50, 0x53, 0x9e, 0x0e, 0x35, 0xe5, 0xbb, 0x73, 0x2d, 0x73, 0x72, - 0xae, 0x29, 0x4f, 0xcf, 0xb5, 0xcc, 0xef, 0xe7, 0x5a, 0xe6, 0xb3, 0xfa, 0xa5, 0x03, 0x97, 0xfe, - 0x6d, 0xee, 0xdf, 0x12, 0xcf, 0xf7, 0xfe, 0x0e, 0x00, 0x00, 0xff, 0xff, 0x1f, 0x25, 0x79, 0x9a, - 0x82, 0x0a, 0x00, 0x00, + // 1042 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe4, 0x56, 0x4f, 0x6f, 0xe3, 0x44, + 0x14, 0x8f, 0x1b, 0xa7, 0x4d, 0x5e, 0xda, 0x92, 0x4e, 0x59, 0x1a, 0x02, 0xb5, 0xb3, 0x41, 0x42, + 0xe1, 0xcf, 0x26, 0x52, 0x17, 0x96, 0xa5, 0x5a, 0x69, 0x21, 0x25, 0xed, 0x46, 0x0a, 0x6d, 0x35, + 0xae, 0x10, 0x82, 0x43, 0x70, 0x92, 0x59, 0x63, 0xd5, 0xf1, 0x18, 0xcf, 0x04, 0xd1, 0x6f, 0x80, + 0x7a, 0x02, 0xee, 0x15, 0x95, 0xa8, 0x10, 0x47, 0x8e, 0xcb, 0x37, 0xe8, 0x71, 0x8f, 0x9c, 0x22, + 0x91, 0x5e, 0xf8, 0x08, 0x68, 0x4f, 0x68, 0xc6, 0x93, 0x3f, 0x4d, 0xb6, 0xb4, 0x91, 0xb8, 0xed, + 0xcd, 0x9e, 0xf7, 0x7b, 0xbf, 0xf7, 0xe7, 0xf7, 0xde, 0xd8, 0xf0, 0x5a, 0x10, 0x52, 0x4e, 0xcb, + 0xcc, 0x0f, 0x9a, 0xe5, 0x90, 0x04, 0x9e, 0xdb, 0xb2, 0x39, 0x0d, 0x4b, 0xf2, 0x14, 0xa5, 0xbf, + 0xb5, 0x43, 0x8f, 0x3a, 0x25, 0x61, 0xcd, 0x99, 0x0e, 0xa5, 0x8e, 0x47, 0xca, 0xd2, 0xd4, 0xec, + 0x3e, 0x2e, 0x73, 0xb7, 0x43, 0x18, 0xb7, 0x3b, 0x41, 0x84, 0xce, 0xdd, 0x71, 0x5c, 0xfe, 0x75, + 0xb7, 0x59, 0x6a, 0xd1, 0x4e, 0xd9, 0xa1, 0x0e, 0x1d, 0x21, 0xc5, 0x5b, 0x14, 0x47, 0x3c, 0x29, + 0xf8, 0x5a, 0x44, 0x1e, 0x34, 0xcb, 0x1d, 0xc2, 0xed, 0xb6, 0xcd, 0xed, 0xc8, 0x50, 0xf8, 0x31, + 0x0e, 0x19, 0xac, 0x52, 0x21, 0x98, 0x7c, 0xd3, 0x25, 0x8c, 0x23, 0x0b, 0x92, 0x9c, 0x06, 0x6e, + 0xab, 0xe1, 0xb6, 0xb3, 0x5a, 0x5e, 0x2b, 0x26, 0x2a, 0xf7, 0xfb, 0x3d, 0x73, 0xe1, 0x40, 0x9c, + 0xd5, 0x3e, 0x79, 0xd6, 0x33, 0xdf, 0x1a, 0x8b, 0x7e, 0x68, 0x1f, 0xda, 0xb4, 0x1c, 0xf1, 0x97, + 0x83, 0x43, 0xa7, 0xcc, 0x8f, 0x02, 0xc2, 0x4a, 0x0a, 0x8c, 0x17, 0x24, 0x53, 0xad, 0x8d, 0xda, + 0xb0, 0xe4, 0x51, 0xa7, 0xc1, 0x78, 0x48, 0xec, 0x8e, 0x60, 0x9e, 0x93, 0xcc, 0x1f, 0xf5, 0x7b, + 0x66, 0xba, 0x4e, 0x1d, 0x4b, 0x9e, 0x4b, 0xf6, 0x3b, 0xd7, 0xb3, 0x8f, 0x39, 0xe0, 0xb4, 0x37, + 0x7c, 0x69, 0xa3, 0x47, 0xa0, 0x7b, 0x1e, 0xf3, 0xb3, 0xf1, 0x7c, 0xbc, 0xa8, 0x57, 0xde, 0xeb, + 0xf7, 0x4c, 0xbd, 0x5e, 0xb7, 0x76, 0x9f, 0xf5, 0xcc, 0x37, 0x6f, 0xc0, 0x5a, 0xb7, 0x76, 0xb3, + 0x1a, 0x96, 0x0c, 0x08, 0x81, 0x2e, 0xfa, 0x94, 0xd5, 0xf3, 0xf1, 0xe2, 0x22, 0x96, 0xcf, 0xe8, + 0x33, 0x80, 0x26, 0x71, 0x5c, 0xbf, 0x21, 0x63, 0x24, 0xf2, 0x5a, 0x51, 0xaf, 0x7c, 0xd0, 0xef, + 0x99, 0xa9, 0x8a, 0x38, 0x9d, 0x2d, 0x10, 0x4e, 0x49, 0xaa, 0xba, 0xc7, 0xfc, 0xcd, 0xc5, 0x27, + 0xa7, 0xa6, 0xf6, 0xf7, 0xa9, 0xa9, 0xfd, 0x73, 0x6a, 0x6a, 0x85, 0x55, 0x58, 0x19, 0x93, 0x84, + 0x05, 0xd4, 0x67, 0xa4, 0x70, 0xa6, 0xc1, 0xa2, 0x75, 0xe4, 0xb7, 0xf6, 0x29, 0x73, 0xb9, 0x4b, + 0x7d, 0xb4, 0xad, 0x2a, 0xd5, 0x64, 0x16, 0x1b, 0xb3, 0x57, 0xaa, 0xea, 0xdc, 0x06, 0xdd, 0x11, + 0x3c, 0x73, 0x23, 0x9e, 0x9d, 0x1b, 0xf3, 0xec, 0x48, 0x1e, 0xe1, 0xbf, 0xa9, 0x8b, 0xfc, 0x0b, + 0x4f, 0x34, 0x48, 0x89, 0x34, 0xb1, 0xed, 0x3b, 0x44, 0xf4, 0xeb, 0xb1, 0x1b, 0x32, 0xde, 0x18, + 0xcb, 0x54, 0xf6, 0x6b, 0x5b, 0x9c, 0xce, 0xda, 0x2f, 0x49, 0x25, 0xfa, 0x85, 0x2c, 0x48, 0x79, + 0xf6, 0x80, 0x36, 0x4a, 0xfc, 0x5e, 0xbf, 0x67, 0x26, 0xeb, 0xf6, 0xcc, 0xac, 0x49, 0x41, 0x24, + 0x48, 0x0b, 0x3f, 0xc7, 0xe1, 0x25, 0x91, 0x7a, 0xcd, 0x77, 0xf9, 0x60, 0x13, 0xbe, 0x04, 0x68, + 0x79, 0x5d, 0xc6, 0x49, 0x38, 0xda, 0x85, 0x07, 0xa2, 0x80, 0xad, 0xe8, 0x54, 0xce, 0xeb, 0x3b, + 0xd7, 0x87, 0x1a, 0xc2, 0x71, 0x4a, 0xf1, 0xd5, 0xda, 0xe8, 0x21, 0xcc, 0x33, 0xda, 0x0d, 0x5b, + 0x44, 0x96, 0x90, 0xde, 0xb8, 0x5d, 0x52, 0x57, 0xc0, 0x60, 0x59, 0x47, 0x63, 0xae, 0xe6, 0xa1, + 0xa2, 0x9f, 0xf7, 0xcc, 0x18, 0x56, 0x6e, 0xa8, 0x06, 0xe9, 0x36, 0x61, 0xdc, 0xf5, 0x6d, 0x31, + 0x11, 0xd9, 0xf8, 0x6c, 0x2c, 0xe3, 0xbe, 0x68, 0x03, 0x12, 0xa1, 0x90, 0x2c, 0xab, 0x4b, 0x92, + 0x57, 0x4a, 0x63, 0xb7, 0x51, 0x69, 0x28, 0xa8, 0xf2, 0x8c, 0xa0, 0x88, 0xc2, 0xaa, 0x54, 0xa1, + 0x45, 0x3b, 0x1d, 0x97, 0x73, 0xd2, 0x1e, 0x5f, 0x8b, 0x87, 0xfd, 0x9e, 0xb9, 0x22, 0xf4, 0xd8, + 0x1a, 0x58, 0x67, 0x14, 0x66, 0xc5, 0xbb, 0xe4, 0x2c, 0x14, 0xda, 0x86, 0xcc, 0x48, 0xa0, 0x68, + 0x2f, 0x46, 0x89, 0x6b, 0x37, 0x4e, 0xbc, 0xf0, 0x97, 0x06, 0x20, 0x4c, 0x16, 0xb7, 0x79, 0x97, + 0xa1, 0x77, 0x21, 0xc1, 0xb8, 0xcd, 0x23, 0x8a, 0xe5, 0xe7, 0x50, 0x08, 0x1c, 0xc1, 0x11, 0x08, + 0xbd, 0x0f, 0x09, 0x39, 0x88, 0x4a, 0xb4, 0x57, 0xa7, 0xd0, 0x83, 0x0d, 0x1d, 0xc4, 0x94, 0x68, + 0x74, 0x17, 0x74, 0x51, 0x90, 0x12, 0xe9, 0x5a, 0x2f, 0x09, 0x46, 0x1f, 0xc2, 0x42, 0xab, 0x1b, + 0x86, 0xc4, 0xe7, 0x4a, 0x97, 0x6b, 0xfd, 0x06, 0xf8, 0xc2, 0x4f, 0x1a, 0xa4, 0xa5, 0xdd, 0x3e, + 0xf2, 0xa8, 0xdd, 0x46, 0x55, 0x58, 0x8e, 0x74, 0x6a, 0xb4, 0xa8, 0xcf, 0xc9, 0x77, 0x5c, 0x35, + 0xcc, 0x98, 0x1a, 0x97, 0xa8, 0xe7, 0x5b, 0x11, 0x0a, 0x2f, 0xb5, 0xc6, 0x5f, 0xd1, 0x3d, 0x48, + 0x89, 0x5b, 0x9c, 0xf8, 0x3c, 0x3c, 0x9a, 0xec, 0xc0, 0xf8, 0xc0, 0x55, 0x05, 0x00, 0x27, 0x3d, + 0xf5, 0xb4, 0xa9, 0x9f, 0x8b, 0xdb, 0xe1, 0x8f, 0x39, 0x78, 0x59, 0x6a, 0x32, 0xf9, 0xc5, 0x79, + 0x61, 0xf6, 0xec, 0x3e, 0x2c, 0x04, 0x91, 0x22, 0x4a, 0xd1, 0xec, 0xb4, 0xa2, 0x91, 0x7d, 0x20, + 0xa8, 0x82, 0x17, 0x1e, 0xc1, 0xad, 0x89, 0xd6, 0xa9, 0x0d, 0x28, 0xc3, 0x3c, 0x93, 0x83, 0xac, + 0x14, 0x5d, 0x7b, 0xee, 0xfc, 0x76, 0x19, 0x56, 0xb0, 0xb7, 0x7f, 0x55, 0x77, 0xb4, 0x1c, 0x6b, + 0xb4, 0x0e, 0x89, 0x2a, 0xc6, 0x7b, 0x38, 0x13, 0xcb, 0xa1, 0xe3, 0x93, 0xfc, 0xf2, 0xd0, 0x52, + 0x0d, 0x43, 0x1a, 0xa2, 0x22, 0xa4, 0x6b, 0xbb, 0x8d, 0x7d, 0xbc, 0xb7, 0x83, 0xab, 0x96, 0x95, + 0xd1, 0x72, 0x6b, 0xc7, 0x27, 0xf9, 0xd5, 0x21, 0xa8, 0xe6, 0xef, 0x87, 0xd4, 0x09, 0x09, 0x63, + 0xe8, 0x0d, 0x48, 0x6e, 0xed, 0x7d, 0xba, 0x5f, 0xaf, 0x1e, 0x54, 0x33, 0x73, 0xb9, 0x5b, 0xc7, + 0x27, 0xf9, 0x95, 0x21, 0x6c, 0x8b, 0x76, 0x02, 0x8f, 0x44, 0xd1, 0xac, 0x83, 0x8f, 0xf1, 0x41, + 0x26, 0x3e, 0x11, 0xcd, 0xe2, 0x76, 0xc8, 0x73, 0x8b, 0xdf, 0xff, 0x62, 0xc4, 0x7e, 0x3b, 0x33, + 0x62, 0xbf, 0x9f, 0x19, 0xda, 0xc6, 0xc5, 0x1c, 0x00, 0x1e, 0xfe, 0x27, 0xa1, 0x5d, 0x48, 0x0d, + 0xab, 0x47, 0xeb, 0x97, 0xaa, 0x9c, 0x1c, 0xa8, 0x9c, 0x71, 0x95, 0x59, 0x7d, 0x4e, 0x63, 0x45, + 0x0d, 0xd5, 0x20, 0x39, 0xb8, 0x4e, 0xd0, 0xeb, 0x53, 0x4d, 0x1b, 0xfb, 0x0c, 0xe4, 0xd6, 0xaf, + 0xb0, 0x0e, 0xc8, 0xd0, 0xe7, 0xb0, 0x74, 0x49, 0x1c, 0x74, 0x7b, 0xfa, 0x1e, 0x9a, 0x4c, 0xb1, + 0xf0, 0x5f, 0x90, 0x21, 0xf3, 0x57, 0xb0, 0x7a, 0xc9, 0x14, 0x4d, 0xd8, 0xff, 0xc6, 0x5f, 0xd4, + 0x2a, 0x0f, 0xce, 0xfb, 0x86, 0xf6, 0xb4, 0x6f, 0x68, 0x3f, 0x5c, 0x18, 0xb1, 0xd3, 0x0b, 0x43, + 0x7b, 0x7a, 0x61, 0xc4, 0xfe, 0xbc, 0x30, 0x62, 0x5f, 0x14, 0xae, 0x5c, 0xb8, 0xe1, 0x7f, 0x6c, + 0x73, 0x5e, 0x3e, 0xdf, 0xfd, 0x37, 0x00, 0x00, 0xff, 0xff, 0x92, 0x00, 0xbc, 0x2a, 0xdc, 0x0a, + 0x00, 0x00, } func (x SyncState) String() string { @@ -779,6 +793,9 @@ func (this *ReplicateRequest) Equal(that interface{}) bool { return false } } + if this.BeginLLSN != that1.BeginLLSN { + return false + } return true } func (this *SyncPosition) Equal(that interface{}) bool { @@ -1178,6 +1195,11 @@ func (m *ReplicateRequest) MarshalToSizedBuffer(dAtA []byte) (int, error) { _ = i var l int _ = l + if m.BeginLLSN != 0 { + i = encodeVarintReplicator(dAtA, i, uint64(m.BeginLLSN)) + i-- + dAtA[i] = 0x28 + } if len(m.Data) > 0 { for iNdEx := len(m.Data) - 1; iNdEx >= 0; iNdEx-- { i -= len(m.Data[iNdEx]) @@ -1636,6 +1658,7 @@ func NewPopulatedReplicateRequest(r randyReplicator, easy bool) *ReplicateReques this.Data[i][j] = byte(r.Intn(256)) } } + this.BeginLLSN = github_com_kakao_varlog_pkg_types.LLSN(uint64(r.Uint32())) if !easy && r.Intn(10) != 0 { } return this @@ -1738,6 +1761,9 @@ func (m *ReplicateRequest) ProtoSize() (n int) { n += 1 + l + sovReplicator(uint64(l)) } } + if m.BeginLLSN != 0 { + n += 1 + sovReplicator(uint64(m.BeginLLSN)) + } return n } @@ -2080,6 +2106,25 @@ func (m *ReplicateRequest) Unmarshal(dAtA []byte) error { m.Data = append(m.Data, make([]byte, postIndex-iNdEx)) copy(m.Data[len(m.Data)-1], dAtA[iNdEx:postIndex]) iNdEx = postIndex + case 5: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field BeginLLSN", wireType) + } + m.BeginLLSN = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowReplicator + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.BeginLLSN |= github_com_kakao_varlog_pkg_types.LLSN(b&0x7F) << shift + if b < 0x80 { + break + } + } default: iNdEx = preIndex skippy, err := skipReplicator(dAtA[iNdEx:]) diff --git a/proto/snpb/replicator.proto b/proto/snpb/replicator.proto index 0e8c3a637..6b4048cbc 100644 --- a/proto/snpb/replicator.proto +++ b/proto/snpb/replicator.proto @@ -29,11 +29,19 @@ message ReplicateRequest { (gogoproto.casttype) = "github.com/kakao/varlog/pkg/types.LogStreamID", (gogoproto.customname) = "LogStreamID" ]; + // LLSN is a list of local log sequence numbers where the log entries are + // replicated. The primary replica fills this field for backward + // compatibility, but it will disappear soon. repeated uint64 llsn = 3 [ + deprecated = true, (gogoproto.casttype) = "github.com/kakao/varlog/pkg/types.LLSN", (gogoproto.customname) = "LLSN" ]; repeated bytes data = 4; + uint64 begin_llsn = 5 [ + (gogoproto.casttype) = "github.com/kakao/varlog/pkg/types.LLSN", + (gogoproto.customname) = "BeginLLSN" + ]; } message ReplicateResponse {}