Skip to content
3 changes: 2 additions & 1 deletion agent-c3/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ static void startCdcAgent(String agentArgs) throws Exception {

PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config);
CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config);
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer);
CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl();
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer);
CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, false);

commitLogReaderService.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer {
@Override
public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception {

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService {
public CommitLogReaderServiceImpl(AgentConfig config,
MutationSender<CFMetaData> mutationSender,
SegmentOffsetWriter segmentOffsetWriter,
CommitLogTransfer commitLogTransfer) {
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer);
CommitLogTransfer commitLogTransfer,
CommitLogReaderInitializer commitLogReaderInitializer) {
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer);
this.tasksExecutor = new JMXEnabledThreadPoolExecutor(
config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors,
config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors,
Expand Down
3 changes: 2 additions & 1 deletion agent-c4/src/main/java/com/datastax/oss/cdc/agent/Agent.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,8 @@ static void startCdcAgent(String agentArgs) throws Exception {

PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config);
CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config);
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer);
CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl();
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer);
CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true);

commitLogReaderService.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

import java.io.File;
import java.io.IOException;


public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer {
@Override
public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception {
File relocationDir = new File(config.cdcWorkingDir);
if (!relocationDir.exists()) {
if (!relocationDir.mkdir()) {
throw new IOException("Failed to create " + config.cdcWorkingDir);
}
}

File archiveDir = new File(relocationDir, CommitLogReaderService.ARCHIVE_FOLDER);
if (!archiveDir.exists()) {
if (!archiveDir.mkdir()) {
throw new IOException("Failed to create " + archiveDir);
}
}
File errorDir = new File(relocationDir, CommitLogReaderService.ERROR_FOLDER);
if (!errorDir.exists()) {
if (!errorDir.mkdir()) {
throw new IOException("Failed to create " + errorDir);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import java.io.File;
import java.util.Optional;
import java.util.concurrent.*;
import java.util.function.IntBinaryOperator;

/**
* Consume a queue of commitlog files to read mutations.
Expand All @@ -37,8 +36,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService {
public CommitLogReaderServiceImpl(AgentConfig config,
MutationSender<TableMetadata> mutationSender,
SegmentOffsetWriter segmentOffsetWriter,
CommitLogTransfer commitLogTransfer) {
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer);
CommitLogTransfer commitLogTransfer,
CommitLogReaderInitializer commitLogReaderInitializer) {
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer);
this.tasksExecutor = new JMXEnabledThreadPoolExecutor(
config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors,
config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,8 @@ static void startCdcAgent(String agentArgs) throws Exception {

PulsarMutationSender pulsarMutationSender = new PulsarMutationSender(config);
CommitLogTransfer commitLogTransfer = new BlackHoleCommitLogTransfer(config);
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer);
CommitLogReaderInitializer commitLogReaderInitializer = new CommitLogReaderInitializerImpl();
CommitLogReaderServiceImpl commitLogReaderService = new CommitLogReaderServiceImpl(config, pulsarMutationSender, segmentOffsetFileWriter, commitLogTransfer, commitLogReaderInitializer);
CommitLogProcessor commitLogProcessor = new CommitLogProcessor(DatabaseDescriptor.getCDCLogLocation().getAbsolutePath(), config, commitLogTransfer, segmentOffsetFileWriter, commitLogReaderService, true);

commitLogReaderService.initialize();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

import java.io.File;
import java.io.IOException;

import static com.datastax.oss.cdc.agent.CommitLogReaderService.ARCHIVE_FOLDER;
import static com.datastax.oss.cdc.agent.CommitLogReaderService.ERROR_FOLDER;

public class CommitLogReaderInitializerImpl implements CommitLogReaderInitializer {
@Override
public void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception {
File relocationDir = new File(config.cdcWorkingDir);
if (!relocationDir.exists()) {
if (!relocationDir.mkdir()) {
throw new IOException("Failed to create " + config.cdcWorkingDir);
}
}

File archiveDir = new File(relocationDir, CommitLogReaderService.ARCHIVE_FOLDER);
if (!archiveDir.exists()) {
if (!archiveDir.mkdir()) {
throw new IOException("Failed to create " + archiveDir);
}
}
File errorDir = new File(relocationDir, CommitLogReaderService.ERROR_FOLDER);
if (!errorDir.exists()) {
if (!errorDir.mkdir()) {
throw new IOException("Failed to create " + errorDir);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,9 @@ public class CommitLogReaderServiceImpl extends CommitLogReaderService {
public CommitLogReaderServiceImpl(AgentConfig config,
MutationSender<TableMetadata> mutationSender,
SegmentOffsetWriter segmentOffsetWriter,
CommitLogTransfer commitLogTransfer) {
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer);
CommitLogTransfer commitLogTransfer,
CommitLogReaderInitializer commitLogReaderInitializer) {
super(config, mutationSender, segmentOffsetWriter, commitLogTransfer, commitLogReaderInitializer);
this.tasksExecutor = JMXEnabledThreadPoolExecutor.createAndPrestart(
config.cdcConcurrentProcessors == -1 ? DatabaseDescriptor.getFlushWriters() : config.cdcConcurrentProcessors,
1, TimeUnit.MINUTES,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
/**
* Copyright DataStax, Inc 2021.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.cdc.agent;

public interface CommitLogReaderInitializer {
void initialize(AgentConfig config, CommitLogReaderService commitLogReaderService) throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.file.Files;
import java.util.List;
Expand All @@ -30,8 +29,6 @@
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.IntBinaryOperator;
import java.util.function.LongBinaryOperator;

@Slf4j
public abstract class CommitLogReaderService implements Runnable, AutoCloseable
Expand Down Expand Up @@ -72,6 +69,7 @@ public abstract class CommitLogReaderService implements Runnable, AutoCloseable
* ordered commitlog file queue.
*/
final PriorityBlockingQueue<File> commitLogQueue;
private final CommitLogReaderInitializer commitLogReaderInitializer;

/**
* Consumes commitlog files in parallel.
Expand All @@ -81,11 +79,13 @@ public abstract class CommitLogReaderService implements Runnable, AutoCloseable
public CommitLogReaderService(AgentConfig config,
MutationSender<?> mutationSender,
SegmentOffsetWriter segmentOffsetWriter,
CommitLogTransfer commitLogTransfer) {
CommitLogTransfer commitLogTransfer,
CommitLogReaderInitializer commitLogReaderInitializer) {
this.config = config;
this.mutationSender = mutationSender;
this.segmentOffsetWriter = segmentOffsetWriter;
this.commitLogTransfer = commitLogTransfer;
this.commitLogReaderInitializer = commitLogReaderInitializer;
this.commitLogQueue = new PriorityBlockingQueue<>(128, CommitLogUtil::compareCommitLogs);
}

Expand Down Expand Up @@ -150,25 +150,7 @@ public void submitCommitLog(File file) {
}

public void initialize() throws Exception {
File relocationDir = new File(config.cdcWorkingDir);
if (!relocationDir.exists()) {
if (!relocationDir.mkdir()) {
throw new IOException("Failed to create " + config.cdcWorkingDir);
}
}

File archiveDir = new File(relocationDir, ARCHIVE_FOLDER);
if (!archiveDir.exists()) {
if (!archiveDir.mkdir()) {
throw new IOException("Failed to create " + archiveDir);
}
}
File errorDir = new File(relocationDir, ERROR_FOLDER);
if (!errorDir.exists()) {
if (!errorDir.mkdir()) {
throw new IOException("Failed to create " + errorDir);
}
}
commitLogReaderInitializer.initialize(config, this);
}

/**
Expand Down
Loading