Skip to content
This repository has been archived by the owner on May 3, 2022. It is now read-only.

Commit

Permalink
Revert to log4j logging style
Browse files Browse the repository at this point in the history
  • Loading branch information
krakowski committed Oct 4, 2018
1 parent 116a9be commit 5d5f18c
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 80 deletions.
154 changes: 83 additions & 71 deletions src/main/java/de/hhu/bsinfo/dxram/migration/MigrationManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@


import de.hhu.bsinfo.dxmem.DXMem;
import de.hhu.bsinfo.dxmem.data.ChunkID;
import de.hhu.bsinfo.dxnet.MessageReceiver;
import de.hhu.bsinfo.dxnet.core.Message;
import de.hhu.bsinfo.dxnet.core.NetworkException;
Expand All @@ -27,37 +26,36 @@
import de.hhu.bsinfo.dxram.boot.AbstractBootComponent;
import de.hhu.bsinfo.dxram.chunk.ChunkComponent;
import de.hhu.bsinfo.dxram.chunk.ChunkMigrationComponent;
import de.hhu.bsinfo.dxram.chunk.ChunkService;
import de.hhu.bsinfo.dxram.engine.DXRAMComponentAccessor;
import de.hhu.bsinfo.dxram.migration.data.MigrationPayload;
import de.hhu.bsinfo.dxram.log.messages.RemoveMessage;
import de.hhu.bsinfo.dxram.migration.data.MigrationIdentifier;
import de.hhu.bsinfo.dxram.migration.data.MigrationPayload;
import de.hhu.bsinfo.dxram.migration.messages.MigrationFinish;
import de.hhu.bsinfo.dxram.migration.messages.MigrationMessages;
import de.hhu.bsinfo.dxram.migration.messages.MigrationPush;
import de.hhu.bsinfo.dxram.migration.progress.MigrationProgress;
import de.hhu.bsinfo.dxram.migration.progress.MigrationProgressTracker;
import de.hhu.bsinfo.dxram.net.NetworkComponent;
import de.hhu.bsinfo.dxutils.ArrayListLong;
import de.hhu.bsinfo.dxutils.NodeID;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.text.DecimalFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

@SuppressWarnings("WeakerAccess")
public class MigrationManager implements MessageReceiver, ChunkMigrator {

public static final ThreadFactory THREAD_FACTORY = new MigrationThreadFactory();

private static final Logger log = LoggerFactory.getLogger(MigrationManager.class);

private static final AtomicLong MIGRATION_COUNTER = new AtomicLong(0);
private static final Logger log = LogManager.getFormatterLogger(MigrationManager.class);

private final ExecutorService m_executor;

Expand Down Expand Up @@ -88,7 +86,7 @@ public MigrationManager(int p_workerCount, final DXRAMComponentAccessor p_compon
* @param p_range The chunk range.
* @return A ticket containing information associated with the created migration.
*/
public MigrationTicket<MigrationStatus> migrateRange(final short p_target, final LongRange p_range) {
public MigrationTicket migrateRange(final short p_target, final LongRange p_range) {
MigrationIdentifier identifier = new MigrationIdentifier(m_boot.getNodeId(), p_target);
List<MigrationTask> tasks = createMigrationTasks(identifier, p_range);

Expand All @@ -97,31 +95,9 @@ public MigrationTicket<MigrationStatus> migrateRange(final short p_target, final

tasks.forEach(m_executor::execute);

return new MigrationTicket<>(future, identifier);
return new MigrationTicket(future, identifier);
}

// public CompletableFuture<Void> migrateRanges(final short p_target, final List<LongRange> p_ranges) {
// MigrationIdentifier rootIdentifier;
// MigrationIdentifier taskIdentifier;
// MigrationTask task;
// MigrationProgress progress;
// short source = m_boot.getNodeId();
//
// for (LongRange range : p_ranges) {
// rootIdentifier = new MigrationIdentifier(source, p_target, range.getFrom(), range.getTo());
// task = new MigrationTask(this, new MigrationIdentifier(source, p_target, range.getFrom(), range.getTo(), 0), range.getFrom(), range.getTo());
// progress = new MigrationProgress(1);
// m_progressMap.put(rootIdentifier, progress);
// }
//
// @SuppressWarnings("unchecked")
// CompletableFuture<MigrationStatus>[] futureList = (CompletableFuture<MigrationStatus>[]) IntStream.range(0, p_ranges.length - 1)
// .mapToObj(i -> migrateRange(p_target, p_ranges[i], p_ranges[i + 1]))
// .toArray();
//
// return CompletableFuture.allOf(futureList);
// }

/**
* Creates multiple migration tasks using the specified migration identifier.
*
Expand All @@ -133,8 +109,6 @@ public List<MigrationTask> createMigrationTasks(MigrationIdentifier p_identifier

long[] partitions = partition(p_range.getFrom(), p_range.getTo(), m_workerCount);

log.debug("{} {}", String.format("[%X,%X]", partitions[0], partitions[1]), p_range);

List<LongRange> chunkRange;
for (int i = 0, j = 0; i < partitions.length - 1; i += 2, j++) {
chunkRange = Collections.singletonList(new LongRange(partitions[i], partitions[i + 1]));
Expand All @@ -147,7 +121,7 @@ public List<MigrationTask> createMigrationTasks(MigrationIdentifier p_identifier
// TODO(krakowski)
// Move this method to dxutils
public static long[] partition(long p_start, long p_end, int p_count) {
log.info("Creating {} partitions for chunks [{} , {}]", p_count, ChunkID.toHexString(p_start), ChunkID.toHexString(p_end));
log.info("Creating %d partitions for chunks [%X , %X]", p_count, p_start, p_end);
int elementCount = (int) (p_end - p_start);

if (p_count > elementCount) {
Expand Down Expand Up @@ -184,7 +158,7 @@ public void onIncomingMessage(final Message p_message) {
}

if (p_message.getType() != DXRAMMessageTypes.MIGRATION_MESSAGES_TYPE) {
log.warn("Received wrong message type {}", p_message.getType());
log.warn("Received wrong message type %d", p_message.getType());
}

m_executor.execute(() -> {
Expand All @@ -201,8 +175,7 @@ public void onIncomingMessage(final Message p_message) {

@Override
public void onStatus(MigrationIdentifier p_identifier, long p_startId, long p_endId, Status p_result) {
log.debug("Received Result {} for chunk range [{}, {}]", p_result, ChunkID.toHexString(p_startId),
ChunkID.toHexString(p_endId));
log.debug("Received Result %s for chunk range [%X, %X]", p_result, p_startId, p_endId);
}

@Override
Expand All @@ -216,24 +189,24 @@ public Status migrate(MigrationIdentifier p_identifier, List<LongRange> p_ranges

byte[][] data = new byte[chunkCount][];

log.debug("Collecting {} chunks from memory", chunkCount);
log.debug("Collecting %d chunks from memory", chunkCount);

int index = 0;
for (LongRange range : p_ranges) {
for (long chunkId = range.getFrom(); chunkId < range.getTo(); chunkId++) {

if ((data[index] = m_memory.get().get(chunkId).getData()) == null) {
log.warn("Chunk {} does not exist", ChunkID.toHexString(chunkId));
log.warn("Chunk %X does not exist", chunkId);
throw new IllegalArgumentException("Can't migrate non-existent chunks");
}

index++;
}
}

log.debug("Creating chunk ranges {} and migration push message for node {}",
log.debug("Creating chunk ranges %s and migration push message for node %X",
LongRange.collectionToString(p_ranges),
NodeID.toHexString(p_identifier.getTarget()));
p_identifier.getTarget());

MigrationPayload migrationPayload = new MigrationPayload(p_ranges, data);

Expand All @@ -244,9 +217,9 @@ public Status migrate(MigrationIdentifier p_identifier, List<LongRange> p_ranges
.reduce(0, (a, b) -> a + b);

try {
log.debug("Sending chunk ranges {} to {} containing {}",
log.debug("Sending chunk ranges %s to %X containing %s",
LongRange.collectionToString(p_ranges),
NodeID.toHexString(migrationPush.getDestination()),
migrationPush.getDestination(),
readableFileSize(size));

m_network.sendMessage(migrationPush);
Expand Down Expand Up @@ -274,9 +247,9 @@ private void handle(final MigrationPush p_migrationPush) {

List<LongRange> ranges = payload.getLongRanges();

log.debug("Received chunk range {} from {} containing {}",
log.debug("Received chunk range %s from %X containing %s",
LongRange.collectionToString(ranges),
NodeID.toHexString(p_migrationPush.getDestination()),
p_migrationPush.getDestination(),
readableFileSize(size));

final long[] chunkIds = ranges.stream().flatMapToLong(range -> LongStream.range(range.getFrom(), range.getTo())).toArray();
Expand All @@ -285,12 +258,12 @@ private void handle(final MigrationPush p_migrationPush) {

boolean status = m_chunkMigration.putMigratedChunks(chunkIds, payload.getData());

log.debug("Storing migrated chunks {}", status ? "succeeded" : "failed");
log.debug("Storing migrated chunks %s", status ? "succeeded" : "failed");

final MigrationFinish migrationFinish = new MigrationFinish(p_migrationPush.getIdentifier(), ranges, status);

try {
log.debug("Sending response to {}", NodeID.toHexString(migrationFinish.getDestination()));
log.debug("Sending response to %X", migrationFinish.getDestination());
m_network.sendMessage(migrationFinish);
} catch (NetworkException e) {
log.error("Couldn't send migration finish message", e);
Expand All @@ -299,44 +272,48 @@ private void handle(final MigrationPush p_migrationPush) {

private void handle(final MigrationFinish p_migrationFinish) {
if (!p_migrationFinish.isFinished()) {
log.warn("Migration was not successful on node {}", NodeID.toHexString(p_migrationFinish.getSource()));
log.warn("Migration was not successful on node %X", p_migrationFinish.getSource());
}

MigrationIdentifier identifier = p_migrationFinish.getIdentifier();

log.debug("ProgressMap[{}] = {}", identifier, m_progressTracker.isRunning(identifier));
log.debug("ProgressMap[%s] = %b", identifier.toString(), m_progressTracker.isRunning(identifier));

Collection<LongRange> ranges = p_migrationFinish.getLongRanges();

log.debug("Migration {} successfully migrated chunk ranges {}", p_migrationFinish.getIdentifier(), LongRange.collectionToString(ranges));
log.debug("Migration %s successfully migrated chunk ranges %s", p_migrationFinish.getIdentifier().toString(),
LongRange.collectionToString(ranges));

log.debug("Removing migrated chunks from local memory");

// Remove chunks from local storage
for (LongRange range : ranges) {
for (long cid = range.getFrom(); cid < range.getTo(); cid++) {
int chunkSize = m_memory.remove().remove(cid, true);
m_backup.deregisterChunk(cid, chunkSize);
}
}

// if (m_backup.isActive()) {
// for (long cid = startId; cid <= endId; cid++) {
// short[] backupPeers;
// backupPeers = m_backup.getArrayOfBackupPeersForLocalChunks(cid);
// if (backupPeers != null) {
// for (int j = 0; j < backupPeers.length; j++) {
// if (backupPeers[j] != m_boot.getNodeId() && backupPeers[j] != NodeID.INVALID_ID) {
// try {
// m_network.sendMessage(new RemoveMessage(backupPeers[j], new ArrayListLong(cid)));
// } catch (final NetworkException ignored) {
// log.warn("Sending RemoveMessage to {} failed", NodeID.toHexStringShort(backupPeers[j]));
// }
// }
// }
// }
// }
// }

// Remove chunks on remote backup peers
if (m_backup.isActive()) {
for (LongRange range : ranges) {
for (long cid = range.getFrom(); cid < range.getTo(); cid++) {
short[] backupPeers;
backupPeers = m_backup.getArrayOfBackupPeersForLocalChunks(cid);
if (backupPeers != null) {
for (int j = 0; j < backupPeers.length; j++) {
if (backupPeers[j] != m_boot.getNodeId() && backupPeers[j] != NodeID.INVALID_ID) {
try {
m_network.sendMessage(new RemoveMessage(backupPeers[j], new ArrayListLong(cid)));
} catch (final NetworkException ignored) {
log.warn("Sending RemoveMessage to %X failed", backupPeers[j]);
}
}
}
}
}
}
}


m_progressTracker.setFinished(identifier, ranges);
Expand All @@ -362,7 +339,42 @@ public int getWorkerCount() {
return m_workerCount;
}

public void registerMessages() {
/**
 * Waits until the corresponding migration finishes.
 *
 * <p>If the calling thread is interrupted while waiting, the thread's
 * interrupt status is restored before returning so that callers can
 * still observe the interruption.
 *
 * @param p_ticket The ticket associated with the migration.
 * @return The migration's status or null if an exception occurred.
 */
@Nullable
public MigrationStatus await(final @NotNull MigrationTicket p_ticket) {
    try {
        return p_ticket.getFuture().get();
    } catch (InterruptedException p_e) {
        // Never swallow an interrupt: re-assert the flag for the caller.
        Thread.currentThread().interrupt();
        log.warn("Waiting on migration failed", p_e);
        return null;
    } catch (ExecutionException p_e) {
        log.warn("Waiting on migration failed", p_e);
        return null;
    }
}

/**
 * Waits until the corresponding migration finishes or the specified timeout is reached.
 *
 * <p>If the calling thread is interrupted while waiting, the thread's
 * interrupt status is restored before returning so that callers can
 * still observe the interruption.
 *
 * @param p_timeout The maximum time to wait.
 * @param p_timeUnit The unit of the timeout argument.
 * @param p_ticket The ticket associated with the migration.
 * @return The migration's status or null if an exception occurred or the timeout was reached.
 */
@Nullable
public MigrationStatus await(final long p_timeout, final @NotNull TimeUnit p_timeUnit, final @NotNull MigrationTicket p_ticket) {
    try {
        return p_ticket.getFuture().get(p_timeout, p_timeUnit);
    } catch (InterruptedException p_e) {
        // Never swallow an interrupt: re-assert the flag for the caller.
        Thread.currentThread().interrupt();
        log.warn("Waiting on migration failed", p_e);
        return null;
    } catch (ExecutionException | TimeoutException p_e) {
        log.warn("Waiting on migration failed", p_e);
        return null;
    }
}

/**
* Registers all message types within the network component.
*/
void registerMessages() {

m_network.registerMessageType(DXRAMMessageTypes.MIGRATION_MESSAGES_TYPE,
MigrationMessages.SUBTYPE_MIGRATION_PUSH, MigrationPush.class);
Expand Down
31 changes: 29 additions & 2 deletions src/main/java/de/hhu/bsinfo/dxram/migration/MigrationService.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
package de.hhu.bsinfo.dxram.migration;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

Expand Down Expand Up @@ -45,6 +48,8 @@
import de.hhu.bsinfo.dxram.net.NetworkComponent;
import de.hhu.bsinfo.dxutils.ArrayListLong;
import de.hhu.bsinfo.dxutils.NodeID;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

/**
* Migration service providing migration of chunks.
Expand Down Expand Up @@ -168,10 +173,32 @@ public void targetMigrate(final long p_chunkID, final short p_target) {
* @param p_endChunkID The last chunk id
* @param p_target The target node
*/
public MigrationTicket<MigrationStatus> migrateRange(final long p_startChunkID, final long p_endChunkID, final short p_target) {
public MigrationTicket migrateRange(final long p_startChunkID, final long p_endChunkID, final short p_target) {
return m_migrationManager.migrateRange(p_target, new LongRange(p_startChunkID, p_endChunkID));
}

/**
 * Blocks the calling thread until the migration identified by the given
 * ticket has completed, delegating to the migration manager.
 *
 * @param p_ticket The ticket associated with the migration.
 * @return The migration's status or null if an exception occurred.
 */
@Nullable
public MigrationStatus await(final @NotNull MigrationTicket p_ticket) {
    final MigrationStatus result = m_migrationManager.await(p_ticket);
    return result;
}

/**
 * Blocks the calling thread until the migration identified by the given
 * ticket has completed or the timeout elapses, delegating to the
 * migration manager.
 *
 * @param p_timeout The maximum time to wait.
 * @param p_timeUnit The unit of the timeout argument.
 * @param p_ticket The ticket associated with the migration.
 * @return The migration's status or null if an exception occurred or the timeout was reached.
 */
@Nullable
public MigrationStatus await(final long p_timeout, final @NotNull TimeUnit p_timeUnit, final @NotNull MigrationTicket p_ticket) {
    final MigrationStatus result = m_migrationManager.await(p_timeout, p_timeUnit, p_ticket);
    return result;
}

/**
* Migrates all chunks to another node.
*
Expand Down Expand Up @@ -261,7 +288,7 @@ protected boolean supportsPeer() {

@Override
protected void resolveComponentDependencies(final DXRAMComponentAccessor p_componentAccessor) {
m_migrationManager = new MigrationManager(1, p_componentAccessor);
m_migrationManager = new MigrationManager(16, p_componentAccessor);
m_boot = p_componentAccessor.getComponent(AbstractBootComponent.class);
m_backup = p_componentAccessor.getComponent(BackupComponent.class);
m_chunk = p_componentAccessor.getComponent(ChunkComponent.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@

import java.util.concurrent.CompletableFuture;

public class MigrationTicket<T> {
public class MigrationTicket {

private final CompletableFuture<T> m_future;
private final CompletableFuture<MigrationStatus> m_future;

private final MigrationIdentifier m_identifier;

public MigrationTicket(CompletableFuture<T> p_future, MigrationIdentifier p_identifier) {
public MigrationTicket(CompletableFuture<MigrationStatus> p_future, MigrationIdentifier p_identifier) {
m_future = p_future;
m_identifier = p_identifier;
}

public CompletableFuture<T> getFuture() {
CompletableFuture<MigrationStatus> getFuture() {
return m_future;
}

Expand Down
Loading

0 comments on commit 5d5f18c

Please sign in to comment.