Skip to content

Commit

Permalink
Merge pull request #16 from SAP/develop
Browse files Browse the repository at this point in the history
Added delayed processing based on memory usage. Fixing other issues
  • Loading branch information
lnowakowski authored Apr 9, 2024
2 parents d2cb6af + 83e7cbe commit 7e86528
Show file tree
Hide file tree
Showing 19 changed files with 348 additions and 9 deletions.
5 changes: 5 additions & 0 deletions commercedbsync/external-dependencies.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,10 @@
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
</project>
29 changes: 29 additions & 0 deletions commercedbsync/project.properties
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,35 @@ migration.log.sql.source.showparameters=true
##
migration.data.filestorage.container.name=migration
migration.data.fulldatabase.enabled=true
##
# Activates enhanced memory usage logging
#
# @values true or false
# @optional false
##
migration.profiling=false
##
# Delays reading until a minimum amount of memory is available
#
# @values any number
# @optional false
##
migration.memory.min=5000000
##
# Number of attempts to wait for free memory
#
# @values any number
# @optional false
##
migration.memory.attempts=300
##
# Number of time to wait for free memory (milliseconds)
#
# @values any number
# @optional false
##
migration.memory.wait=2000


# Enhanced Logging
log4j2.appender.migrationAppender.type=Console
Expand Down
6 changes: 6 additions & 0 deletions commercedbsync/resources/commercedbsync-spring.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,11 @@
<property name="configurationService" ref="configurationService" />
</bean>

<bean id="updateYDeploymentsPostProcessor"
class="com.sap.cx.boosters.commercedbsync.processors.impl.UpdateYDeploymentsPostProcessor">
<property name="databaseCopyTaskRepository" ref="databaseCopyTaskRepository"/>
</bean>

<alias name="defaultCopyCompleteEventListener" alias="copyCompleteEventListener"/>
<bean id="defaultCopyCompleteEventListener"
class="com.sap.cx.boosters.commercedbsync.events.handlers.CopyCompleteEventListener"
Expand All @@ -240,6 +245,7 @@
<ref bean="reportMigrationPostProcessor"/>
<ref bean="jdbcQueriesPostProcessor"/>
<ref bean="adjustActiveTypeSystemPostProcessor"/>
<ref bean="updateYDeploymentsPostProcessor"/>
</util:list>
</property>
</bean>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ public BatchMarkerDataReaderTask(PipeTaskContext pipeTaskContext, int batchId, S

@Override
protected Boolean internalRun() throws Exception {
waitForFreeMemory();
process(batchMarkersPair.getLeft(), batchMarkersPair.getRight());
return Boolean.TRUE;
}
Expand All @@ -56,6 +57,9 @@ private void process(Object lastValue, Object nextValue) throws Exception {
lastValue, nextValue, pageSize);
}
DataSet page = adapter.getBatchOrderedByColumn(ctx.getMigrationContext(), queryDefinition);

profileData(ctx, batchId, table, pageSize, page);

getPipeTaskContext().getRecorder().record(PerformanceUnit.ROWS, pageSize);
getPipeTaskContext().getPipe().put(MaybeFinished.of(page));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,15 @@
import com.sap.cx.boosters.commercedbsync.context.CopyContext;
import com.sap.cx.boosters.commercedbsync.dataset.DataSet;
import com.sap.cx.boosters.commercedbsync.performance.PerformanceUnit;

import java.util.Set;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class BatchOffsetDataReaderTask extends DataReaderTask {

private static final Logger LOG = LoggerFactory.getLogger(BatchOffsetDataReaderTask.class);

private final long offset;
private final Set<String> batchColumns;
private final int batchId;
Expand All @@ -32,6 +35,7 @@ public BatchOffsetDataReaderTask(PipeTaskContext pipeTaskContext, int batchId, l

@Override
protected Boolean internalRun() throws Exception {
waitForFreeMemory();
process();
return Boolean.TRUE;
}
Expand All @@ -50,7 +54,9 @@ private void process() throws Exception {
queryDefinition.setDeletionEnabled(context.getMigrationContext().isDeletionEnabled());
queryDefinition.setLpTableEnabled(context.getMigrationContext().isLpTableMigrationEnabled());
DataSet result = adapter.getBatchWithoutIdentifier(context.getMigrationContext(), queryDefinition);
profileData(context, batchId, table, pageSize, result);
getPipeTaskContext().getRecorder().record(PerformanceUnit.ROWS, result.getAllResults().size());
getPipeTaskContext().getPipe().put(MaybeFinished.of(result));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,19 @@

package com.sap.cx.boosters.commercedbsync.concurrent.impl.task;

import com.sap.cx.boosters.commercedbsync.concurrent.PipeAbortedException;
import com.sap.cx.boosters.commercedbsync.constants.CommercedbsyncConstants;
import com.sap.cx.boosters.commercedbsync.context.CopyContext;
import com.sap.cx.boosters.commercedbsync.dataset.DataSet;
import de.hybris.platform.core.MasterTenant;
import org.openjdk.jol.info.GraphLayout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class DataReaderTask extends RetriableTask {

private static final Logger LOG = LoggerFactory.getLogger(DataReaderTask.class);

private final PipeTaskContext pipeTaskContext;

protected DataReaderTask(PipeTaskContext pipeTaskContext) {
Expand All @@ -18,4 +29,34 @@ protected DataReaderTask(PipeTaskContext pipeTaskContext) {
public PipeTaskContext getPipeTaskContext() {
return pipeTaskContext;
}

protected void waitForFreeMemory() throws Exception {
CopyContext context = getPipeTaskContext().getContext();
final long minMem = context.getMigrationContext().getMemoryMin();
long freeMem = Runtime.getRuntime().freeMemory();

int cnt = 0;
while (freeMem < minMem) {
LOG.trace("Waiting for freeMem {} / {} Attempts={}", freeMem, minMem, cnt);
Thread.sleep(context.getMigrationContext().getMemoryWait());
cnt++;
if (cnt >= context.getMigrationContext().getMemoryMaxAttempts()) {
throw new PipeAbortedException("Maximum wait time exceeded. See property "
+ CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_ATTEMPTS + " for more details.");
}
freeMem = Runtime.getRuntime().freeMemory();
}
}

protected void profileData(final CopyContext context, final int batchId, final String table, final long pageSize,
final DataSet result) {
if (context.getMigrationContext().isProfiling() && result != null) {
final long objSize = GraphLayout.parseInstance(result.getAllResults()).totalSize();
final long freeMem = Runtime.getRuntime().freeMemory();
final int clusterID = MasterTenant.getInstance().getClusterID();
LOG.trace(
"Memory usage: [{}], Table = {}, BatchId = {}, Page Size = {}, Batch Memory Size = {}, Free System Memory = {}",
clusterID, table, batchId, pageSize, objSize, freeMem);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ public DefaultDataReaderTask(PipeTaskContext pipeTaskContext) {

@Override
protected Boolean internalRun() throws Exception {
waitForFreeMemory();
process();
return Boolean.TRUE;
}
Expand All @@ -27,6 +28,8 @@ private void process() throws Exception {
MigrationContext migrationContext = getPipeTaskContext().getContext().getMigrationContext();
DataSet all = getPipeTaskContext().getDataRepositoryAdapter().getAll(migrationContext,
getPipeTaskContext().getTable());
profileData(getPipeTaskContext().getContext(), -1, getPipeTaskContext().getTable(),
getPipeTaskContext().getPageSize(), all);
getPipeTaskContext().getRecorder().record(PerformanceUnit.ROWS, all.getAllResults().size());
getPipeTaskContext().getPipe().put(MaybeFinished.of(all));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,10 @@ public final class CommercedbsyncConstants extends GeneratedCommercedbsyncConsta
public static final String MIGRATION_FILE_STORAGE_CONTAINER_NAME = "migration.data.filestorage.container.name";
public static final String MIGRATION_INPUT_PROFILES = "migration.input.profiles";
public static final String MIGRATION_OUTPUT_PROFILES = "migration.output.profiles";
public static final String MIGRATION_PROFILING = "migration.profiling";
public static final String MIGRATION_PROFILING_MEMORY_MIN = "migration.memory.min";
public static final String MIGRATION_PROFILING_MEMORY_ATTEMPTS = "migration.memory.attempts";
public static final String MIGRATION_PROFILING_MEMORY_WAIT = "migration.memory.wait";

public static final String MIGRATION_DATA_READTASK_KEEPALIVE_SECONDS = "migration.data.readtask.keepaliveseconds";
public static final String MIGRATION_DATA_READTASK_QUEUE_CAPACITY = "migration.data.readtask.queuecapacity";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ public interface MigrationContext {

void setFullDatabaseMigrationEnabled(boolean enabled);

boolean isProfiling();

long getMemoryMin();

int getMemoryMaxAttempts();

int getMemoryWait();

void refreshSelf();

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,26 @@ public Set<String> getOutputProfiles() {
return getListProperty(CommercedbsyncConstants.MIGRATION_OUTPUT_PROFILES);
}

@Override
public boolean isProfiling() {
return getBooleanProperty(CommercedbsyncConstants.MIGRATION_PROFILING);
}

@Override
public long getMemoryMin() {
return getLongProperty(CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_MIN);
}

@Override
public int getMemoryMaxAttempts() {
return getNumericProperty(CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_ATTEMPTS);
}

@Override
public int getMemoryWait() {
return getNumericProperty(CommercedbsyncConstants.MIGRATION_PROFILING_MEMORY_WAIT);
}

@Override
public boolean isDeletionEnabled() {
return this.deletionEnabled;
Expand Down Expand Up @@ -322,6 +342,10 @@ protected int getNumericProperty(final String key) {
return configuration.getInt(key);
}

protected long getLongProperty(final String key) {
return configuration.getLong(key);
}

protected String getStringProperty(final String key) {
return configuration.getString(key);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.sap.cx.boosters.commercedbsync.context.LaunchOptions;
import com.sap.cx.boosters.commercedbsync.context.MigrationContext;
import com.sap.cx.boosters.commercedbsync.model.cron.MigrationCronJobModel;
import com.sap.cx.boosters.commercedbsync.provider.CopyItemProvider;
import org.apache.commons.lang3.BooleanUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -48,8 +49,6 @@ public abstract class AbstractMigrationJobPerformable extends AbstractJobPerform

private static final String[] RUNNING_MIGRATION = new String[]{MigrationProgress.RUNNING.toString(),
MigrationProgress.PROCESSED.toString(), MigrationProgress.POSTPROCESSING.toString()};
private static final String[] TYPE_SYSTEM_RELATED_TYPES = new String[]{"atomictypes", "attributeDescriptors",
"collectiontypes", "composedtypes", "enumerationvalues", "maptypes"};

private static final String MIGRATION_UPDATE_TYPE_SYSTEM = "migration.ds.update.typesystem.table";
private static final String SOURCE_TYPESYSTEMNAME = "migration.ds.source.db.typesystemname";
Expand Down Expand Up @@ -152,7 +151,7 @@ protected void updateTypesystemTable(Set<String> migrationItems) throws Exceptio
}
DataRepository sourceRepository = migrationContext.getDataSourceRepository();
for (final String tableName : migrationItems) {
if (Arrays.stream(TYPE_SYSTEM_RELATED_TYPES)
if (Arrays.stream(CopyItemProvider.TYPE_SYSTEM_RELATED_TYPES)
.anyMatch(t -> StringUtils.startsWithIgnoreCase(tableName, t))) {
try (Connection connection = sourceRepository.getConnection();
Statement stmt = connection.createStatement();
Expand Down
Loading

0 comments on commit 7e86528

Please sign in to comment.