From d04a0dc1efb5c057e89c66fb8086b4d9f1c122a7 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 17:52:25 +0530 Subject: [PATCH 1/7] feat: introduce `CommitLogReaderInitializer` interface for initialization logic --- .../datastax/oss/cdc/agent/CommitLogReaderInitializer.java | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java new file mode 100644 index 00000000..3f5df342 --- /dev/null +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java @@ -0,0 +1,5 @@ +package com.datastax.oss.cdc.agent; + +public interface CommitLogReaderInitializer { + void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception; +} From fd0259ce806c8bdfb3fa7c0d22e8d93359dfeeeb Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 17:52:35 +0530 Subject: [PATCH 2/7] feat: implement `CommitLogReaderInitializerImpl` for agent initialization across multiple versions --- .../agent/CommitLogReaderInitializerImpl.java | 8 +++++ .../agent/CommitLogReaderInitializerImpl.java | 32 +++++++++++++++++++ .../agent/CommitLogReaderInitializerImpl.java | 32 +++++++++++++++++++ 3 files changed, 72 insertions(+) create mode 100644 agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java create mode 100644 agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java create mode 100644 agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java new file mode 100644 index 00000000..6259e1ce --- /dev/null +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -0,0 +1,8 @@ +package com.datastax.oss.cdc.agent; + +public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { + @Override + public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception { + + } +} diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java new file mode 100644 index 00000000..edeb9b17 --- /dev/null +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -0,0 +1,32 @@ +package com.datastax.oss.cdc.agent; + +import java.io.File; +import java.io.IOException; + +import static com.datastax.oss.cdc.agent.CommitLogReaderService.ARCHIVE_FOLDER; +import static com.datastax.oss.cdc.agent.CommitLogReaderService.ERROR_FOLDER; + +public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { + @Override + public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception { + File relocationDir = new File(config.cdcWorkingDir); + if (!relocationDir.exists()) { + if (!relocationDir.mkdir()) { + throw new IOException("Failed to create " + config.cdcWorkingDir); + } + } + + File archiveDir = new File(relocationDir, ARCHIVE_FOLDER); + if (!archiveDir.exists()) { + if (!archiveDir.mkdir()) { + throw new IOException("Failed to create " + archiveDir); + } + } + File errorDir = new File(relocationDir, ERROR_FOLDER); + if (!errorDir.exists()) { + if (!errorDir.mkdir()) { + throw new IOException("Failed to create " + errorDir); + } + } + } +} diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java new file mode 100644 index 00000000..9914acec --- /dev/null +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -0,0 +1,32 @@ +package com.datastax.oss.cdc.agent; + +import java.io.File; +import java.io.IOException; + +import static com.datastax.oss.cdc.agent.CommitLogReaderService.ARCHIVE_FOLDER; +import static com.datastax.oss.cdc.agent.CommitLogReaderService.ERROR_FOLDER; + +public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { + @Override + public void initialize(AgentConfig config) throws Exception { + File relocationDir = new File(config.cdcWorkingDir); + if (!relocationDir.exists()) { + if (!relocationDir.mkdir()) { + throw new IOException("Failed to create " + config.cdcWorkingDir); + } + } + + File archiveDir = new File(relocationDir, ARCHIVE_FOLDER); + if (!archiveDir.exists()) { + if (!archiveDir.mkdir()) { + throw new IOException("Failed to create " + archiveDir); + } + } + File errorDir = new File(relocationDir, ERROR_FOLDER); + if (!errorDir.exists()) { + if (!errorDir.mkdir()) { + throw new IOException("Failed to create " + errorDir); + } + } + } +} From 6d798109c944fe33cef0b7635a75a931ef5d017e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 17:52:51 +0530 Subject: [PATCH 3/7] refactor: delegate initialization logic to `CommitLogReaderInitializer` --- .../oss/cdc/agent/CommitLogReaderService.java | 28 ++++--------------- 1 file changed, 5 insertions(+), 23 deletions(-) diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java index 41a8975d..b9c83fab 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderService.java @@ -21,7 +21,6 @@ import lombok.extern.slf4j.Slf4j; import java.io.File; -import java.io.IOException; import java.nio.charset.Charset; import java.nio.file.Files; import java.util.List; @@ -30,8 +29,6 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.IntBinaryOperator; -import java.util.function.LongBinaryOperator; @Slf4j public abstract class CommitLogReaderService implements Runnable, AutoCloseable @@ -72,6 +69,7 @@ public abstract class CommitLogReaderService implements Runnable, AutoCloseable * ordered commitlog file queue. */ final PriorityBlockingQueue commitLogQueue; + private final CommitLogReaderInitializer commitLogReaderInitializer; /** * Consumes commitlog files in parallel. @@ -81,11 +79,13 @@ public abstract class CommitLogReaderService implements Runnable, AutoCloseable public CommitLogReaderService(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { this.config = config; this.mutationSender = mutationSender; this.segmentOffsetWriter = segmentOffsetWriter; this.commitLogTransfer = commitLogTransfer; + this.commitLogReaderInitializer = commitLogReaderInitializer; this.commitLogQueue = new PriorityBlockingQueue<>(128, CommitLogUtil::compareCommitLogs); } @@ -150,25 +150,7 @@ public void submitCommitLog(File file) { } public void initialize() throws Exception { - File relocationDir = new File(config.cdcWorkingDir); - if (!relocationDir.exists()) { - if (!relocationDir.mkdir()) { - throw new IOException("Failed to create " + config.cdcWorkingDir); - } - } - - File archiveDir = new File(relocationDir, ARCHIVE_FOLDER); - if (!archiveDir.exists()) { - if (!archiveDir.mkdir()) { - throw new IOException("Failed to create " + archiveDir); - } - } - File errorDir = new File(relocationDir, ERROR_FOLDER); - if (!errorDir.exists()) { - if (!errorDir.mkdir()) { - throw new IOException("Failed to create " + errorDir); - } - } + commitLogReaderInitializer.initialize(config, this); } /** From e802f576fa74bc99a54e119919a06a3aee2264ef Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 17:54:47 +0530 Subject: [PATCH 4/7] refactor: update `CommitLogReaderServiceImpl` constructors to include `CommitLogReaderInitializer` --- .../datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java | 5 +++-- .../datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java | 6 +++--- .../datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java | 5 +++-- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java index 5ddd88f8..b7996845 100644 --- a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -35,8 +35,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService { public CommitLogReaderServiceImpl(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { - super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer); this.tasksExecutor = new JMXEnabledThreadPoolExecutor( config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java index cf317408..0997f8e4 100644 --- a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -26,7 +26,6 @@ import java.io.File; import java.util.Optional; import java.util.concurrent.*; -import java.util.function.IntBinaryOperator; /** * Consume a queue of commitlog files to read mutations. @@ -37,8 +36,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService { public CommitLogReaderServiceImpl(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { - super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer); this.tasksExecutor = new JMXEnabledThreadPoolExecutor( config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java index 1fe35efe..487ad967 100644 --- a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderServiceImpl.java @@ -37,8 +37,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService { public CommitLogReaderServiceImpl(AgentConfig config, MutationSender mutationSender, SegmentOffsetWriter segmentOffsetWriter, - CommitLogTransfer commitLogTransfer) { - super(config, mutationSender, segmentOffsetWriter, commitLogTransfer); + CommitLogTransfer commitLogTransfer, + CommitLogReaderInitializer commitLogReaderInitializer) { + super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer); this.tasksExecutor = JMXEnabledThreadPoolExecutor.createAndPrestart( config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors, 1, TimeUnit.MINUTES, From 2fa28897c5aad765f51b3d89d0c12b144d291a30 Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 17:54:54 +0530 Subject: [PATCH 5/7] refactor: use `CommitLogReaderService` constants directly in `CommitLogReaderInitializerImpl` --- .../oss/cdc/agent/CommitLogReaderInitializerImpl.java | 6 ++---- .../oss/cdc/agent/CommitLogReaderInitializerImpl.java | 6 +++--- 2 files changed, 5 insertions(+), 7 deletions(-) diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java index edeb9b17..af2070c6 100644 --- a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -3,8 +3,6 @@ import java.io.File; import java.io.IOException; -import static com.datastax.oss.cdc.agent.CommitLogReaderService.ARCHIVE_FOLDER; -import static com.datastax.oss.cdc.agent.CommitLogReaderService.ERROR_FOLDER; public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { @Override @@ -16,13 +14,13 @@ public void initialize(AgentConfig config, CommitLogReaderService commitLogReade } } - File archiveDir = new File(relocationDir, ARCHIVE_FOLDER); + File archiveDir = new File(relocationDir, CommitLogReaderService.ARCHIVE_FOLDER); if (!archiveDir.exists()) { if (!archiveDir.mkdir()) { throw new IOException("Failed to create " + archiveDir); } } - File errorDir = new File(relocationDir, ERROR_FOLDER); + File errorDir = new File(relocationDir, CommitLogReaderService.ERROR_FOLDER); if (!errorDir.exists()) { if (!errorDir.mkdir()) { throw new IOException("Failed to create " + errorDir); diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java index 9914acec..ef1b7c4f 100644 --- a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -8,7 +8,7 @@ public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { @Override - public void initialize(AgentConfig config) throws Exception { + public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception { File relocationDir = new File(config.cdcWorkingDir); if (!relocationDir.exists()) { if (!relocationDir.mkdir()) { @@ -16,13 +16,13 @@ public void initialize(AgentConfig config) throws Exception { } } - File archiveDir = new File(relocationDir, ARCHIVE_FOLDER); + File archiveDir = new File(relocationDir, CommitLogReaderService.ARCHIVE_FOLDER); if (!archiveDir.exists()) { if (!archiveDir.mkdir()) { throw new IOException("Failed to create " + archiveDir); } } - File errorDir = new File(relocationDir, ERROR_FOLDER); + File errorDir = new File(relocationDir, CommitLogReaderService.ERROR_FOLDER); if (!errorDir.exists()) { if (!errorDir.mkdir()) { throw new IOException("Failed to create " + errorDir); From 490ea945fc8675d5b918935376288b32fa4a1edb Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 17:55:02 +0530 Subject: [PATCH 6/7] refactor: integrate `CommitLogReaderInitializer` into `CommitLogReaderServiceImpl` across all agent versions --- agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java | 3 ++- agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java | 3 ++- agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java | 3 ++- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java index a33af9dc..30e42e87 100644 --- a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -65,7 +65,8 @@ static void startCdcAgent(String agentArgs) throws Exception { PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); - CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl(); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer); CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, false); commitLogReaderService.initialize(); diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java index b769a074..969f332a 100644 --- a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -65,7 +65,8 @@ static void startCdcAgent(String agentArgs) throws Exception { PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); - CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl(); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer); CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true); commitLogReaderService.initialize(); diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java index f6db1492..421d65d7 100644 --- a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/Agent.java @@ -66,7 +66,8 @@ static void startCdcAgent(String agentArgs) throws Exception { PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config); CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config); - CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer); + CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl(); + CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer); CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation().getAbsolutePath(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true); commitLogReaderService.initialize(); From f5417f0fb2c13c0424fae31144d743a9e7b5da4e Mon Sep 17 00:00:00 2001 From: Arkadip Date: Tue, 12 Aug 2025 22:36:56 +0530 Subject: [PATCH 7/7] chore: add license headers to `CommitLogReaderInitializer` and its implementations across all agent versions --- .../cdc/agent/CommitLogReaderInitializerImpl.java | 15 +++++++++++++++ .../cdc/agent/CommitLogReaderInitializerImpl.java | 15 +++++++++++++++ .../cdc/agent/CommitLogReaderInitializerImpl.java | 15 +++++++++++++++ .../oss/cdc/agent/CommitLogReaderInitializer.java | 15 +++++++++++++++ 4 files changed, 60 insertions(+) diff --git a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java index 6259e1ce..6f269ee9 100644 --- a/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java +++ b/agent-c3/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.agent; public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer { diff --git a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java index af2070c6..dd5a0119 100644 --- a/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java +++ b/agent-c4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.agent; import java.io.File; diff --git a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java index ef1b7c4f..13e94842 100644 --- a/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java +++ b/agent-dse4/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializerImpl.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.agent; import java.io.File; diff --git a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java index 3f5df342..cb22d2a3 100644 --- a/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java +++ b/agent/src/main/java/com/datastax/oss/cdc/agent/CommitLogReaderInitializer.java @@ -1,3 +1,18 @@ +/** + * Copyright DataStax, Inc 2021. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package com.datastax.oss.cdc.agent; public interface CommitLogReaderInitializer {