Misc fixes in cluster manager, replication and compaction manager (#1237)

1. Set DEBUG and TRACE log levels in the Histogram dumper
2. Fixed a race condition in the compaction manager test
3. Printed out the remote DC exception during Helix cluster manager initialization (previously it was swallowed and hard to debug)
4. Moved the partition lag update to the exchange-metadata method in the replication manager
5. Paused replication against any store that is not started; replication resumes once the store is restarted (see the sketch below)
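
To illustrate item 5, here is a minimal, self-contained sketch (not Ambry code; `ReplicaRecord` and its fields are illustrative stand-ins) of a replication pass that skips replicas whose local store is not started:

```java
import java.util.Arrays;
import java.util.List;

public class ReplicationSkipSketch {
  // Simplified stand-in for a remote replica plus the state of the local store it replicates into.
  static class ReplicaRecord {
    final String partition;
    final boolean replicationDisabled;
    final boolean remoteReplicaDown;
    final boolean localStoreStarted;

    ReplicaRecord(String partition, boolean replicationDisabled, boolean remoteReplicaDown,
        boolean localStoreStarted) {
      this.partition = partition;
      this.replicationDisabled = replicationDisabled;
      this.remoteReplicaDown = remoteReplicaDown;
      this.localStoreStarted = localStoreStarted;
    }
  }

  public static void main(String[] args) {
    List<ReplicaRecord> replicas = Arrays.asList(
        new ReplicaRecord("p0", false, false, true),   // replicated
        new ReplicaRecord("p1", false, false, false)); // skipped: local store shut down
    for (ReplicaRecord replica : replicas) {
      // Simplified version of the skip condition in ReplicaThread.replicate(): a replica is skipped
      // when its partition is disabled, the remote replica is down, it is in backoff, or (new in
      // this commit) the local store is not started. The backoff check is omitted here for brevity.
      if (replica.replicationDisabled || replica.remoteReplicaDown || !replica.localStoreStarted) {
        System.out.println("Skipping " + replica.partition);
        continue;
      }
      System.out.println("Replicating " + replica.partition);
    }
  }
}
```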
jsjtzyy authored and zzmao committed Aug 15, 2019
1 parent f12ef9e commit e5d6e7e
Showing 13 changed files with 124 additions and 31 deletions.
5 changes: 5 additions & 0 deletions ambry-api/src/main/java/com.github.ambry/store/Store.java
@@ -103,6 +103,11 @@ public interface Store {
*/
boolean isEmpty();

/**
* @return true if the store is started
*/
boolean isStarted();

/**
* Shuts down the store
*/
@@ -354,6 +354,11 @@ public void shutdown() {
started = false;
}

@Override
public boolean isStarted() {
return started;
}

private void checkStarted() throws StoreException {
if (!started) {
throw new StoreException("Store not started", StoreErrorCodes.Store_Not_Started);
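
For context, a runnable sketch of the started-flag lifecycle that the store implementation above now exposes through isStarted() (class and field names here are illustrative, not the real store class):

```java
public class StoreLifecycleSketch {
  // Illustrative store: start() and shutdown() toggle the flag that isStarted() reports.
  static class MiniStore {
    private boolean started;

    void start() {
      started = true;
    }

    void shutdown() {
      started = false;
    }

    boolean isStarted() {
      return started;
    }
  }

  public static void main(String[] args) {
    MiniStore store = new MiniStore();
    store.start();
    System.out.println(store.isStarted());  // true -> replication proceeds
    store.shutdown();
    System.out.println(store.isStarted());  // false -> replication against this store pauses
    store.start();
    System.out.println(store.isStarted());  // true -> replication resumes
  }
}
```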
@@ -454,6 +454,7 @@ public void onInstanceConfigChange(List<InstanceConfig> configs, NotificationCon
try {
initializeInstances(configs);
} catch (Exception e) {
logger.error("Exception occurred when initializing instances in {}: ", dcName, e);
initializationFailureMap.putIfAbsent(dcName, e);
}
instanceConfigInitialized.set(true);
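
A small sketch of the log-then-record pattern added above, assuming SLF4J is on the classpath (class and map names below are illustrative); passing the exception as the last argument makes SLF4J print the stack trace, so the failure is visible instead of being silently stored:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DcInitFailureSketch {
  private static final Logger logger = LoggerFactory.getLogger(DcInitFailureSketch.class);
  private static final Map<String, Exception> initializationFailureMap = new ConcurrentHashMap<>();

  static void initializeDc(String dcName) {
    try {
      throw new IllegalStateException("simulated remote DC failure");
    } catch (Exception e) {
      // Log with the exception as the last argument so the stack trace is emitted,
      // then record it so callers can still inspect the first failure per DC.
      logger.error("Exception occurred when initializing instances in {}: ", dcName, e);
      initializationFailureMap.putIfAbsent(dcName, e);
    }
  }

  public static void main(String[] args) {
    initializeDc("remote-dc-1");
  }
}
```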
@@ -322,7 +322,8 @@ public void replicate() {
for (RemoteReplicaInfo remoteReplicaInfo : replicasToReplicatePerNode) {
ReplicaId replicaId = remoteReplicaInfo.getReplicaId();
boolean inBackoff = time.milliseconds() < remoteReplicaInfo.getReEnableReplicationTime();
if (replicationDisabledPartitions.contains(replicaId.getPartitionId()) || replicaId.isDown() || inBackoff) {
if (replicationDisabledPartitions.contains(replicaId.getPartitionId()) || replicaId.isDown() || inBackoff
|| !remoteReplicaInfo.getLocalStore().isStarted()) {
continue;
}
activeReplicasPerNode.add(remoteReplicaInfo);
@@ -438,6 +439,8 @@ List<ExchangeMetadataResponse> exchangeMetadata(ConnectedChannel connectedChanne
new ExchangeMetadataResponse(missingStoreKeys, replicaMetadataResponseInfo.getFindToken(),
replicaMetadataResponseInfo.getRemoteReplicaLagInBytes());
exchangeMetadataResponseList.add(exchangeMetadataResponse);
replicationMetrics.updateLagMetricForRemoteReplica(remoteReplicaInfo,
exchangeMetadataResponse.localLagFromRemoteInBytes);
} catch (Exception e) {
logger.error(
"Remote node: " + remoteNode + " Thread name: " + threadName + " Remote replica: " + remoteReplicaInfo
@@ -879,8 +882,6 @@ private void writeMessagesToLocalStoreAndAdvanceTokens(List<ExchangeMetadataResp
totalBlobsFixed += messageInfoList.size();
remoteReplicaInfo.setToken(exchangeMetadataResponse.remoteToken);
remoteReplicaInfo.setLocalLagFromRemoteInBytes(exchangeMetadataResponse.localLagFromRemoteInBytes);
replicationMetrics.updateLagMetricForRemoteReplica(remoteReplicaInfo,
exchangeMetadataResponse.localLagFromRemoteInBytes);
logger.trace("Remote node: {} Thread name: {} Remote replica: {} Token after speaking to remote node: {}",
remoteNode, threadName, remoteReplicaInfo.getReplicaId(), exchangeMetadataResponse.remoteToken);
} catch (StoreException e) {
@@ -95,7 +95,7 @@ public void start() throws ReplicationException {
retrieveReplicaTokensAndPersistIfNecessary(mountPath);
}
if (replicaThreadPoolByDc.size() == 0) {
logger.warn("Number of Datacenters to replicate from is 0, not starting any replica threads");
logger.warn("Number of data centers to replicate from is 0, not starting any replica threads");
return;
}
// valid for replication manager.
@@ -148,6 +148,7 @@ private void storeBuf(ByteBuffer buffer) {
private final DummyLog log;
final List<MessageInfo> messageInfos;
final PartitionId id;
private boolean started;

InMemoryStore(PartitionId id, List<MessageInfo> messageInfos, List<ByteBuffer> buffers,
ReplicationTest.StoreEventListener listener) {
@@ -158,10 +159,12 @@ private void storeBuf(ByteBuffer buffer) {
log = new DummyLog(buffers);
this.listener = listener;
this.id = id;
started = true;
}

@Override
public void start() throws StoreException {
started = true;
}

@Override
@@ -309,5 +312,11 @@ public boolean isEmpty() {

@Override
public void shutdown() throws StoreException {
started = false;
}

@Override
public boolean isStarted() {
return started;
}
}
@@ -37,8 +37,8 @@
* Representation of a host. Contains all the data for all partitions.
*/
public class MockHost {
final Map<PartitionId, InMemoryStore> storesByPartition = new HashMap<>();
private final ClusterMap clusterMap;
private final Map<PartitionId, InMemoryStore> storesByPartition = new HashMap<>();

public final DataNodeId dataNodeId;
final Map<PartitionId, List<MessageInfo>> infosByPartition = new HashMap<>();
@@ -73,11 +73,11 @@ List<RemoteReplicaInfo> getRemoteReplicaInfos(MockHost remoteHost, ReplicationTe
for (ReplicaId peerReplicaId : replicaId.getPeerReplicaIds()) {
if (peerReplicaId.getDataNodeId().equals(remoteHost.dataNodeId)) {
PartitionId partitionId = replicaId.getPartitionId();
InMemoryStore store = storesByPartition.computeIfAbsent(partitionId, partitionId1 -> new InMemoryStore(partitionId,
infosByPartition.computeIfAbsent(partitionId1,
InMemoryStore store = storesByPartition.computeIfAbsent(partitionId,
partitionId1 -> new InMemoryStore(partitionId, infosByPartition.computeIfAbsent(partitionId1,
(Function<PartitionId, List<MessageInfo>>) partitionId2 -> new ArrayList<>()),
buffersByPartition.computeIfAbsent(partitionId1,
(Function<PartitionId, List<ByteBuffer>>) partitionId22 -> new ArrayList<>()), listener));
buffersByPartition.computeIfAbsent(partitionId1,
(Function<PartitionId, List<ByteBuffer>>) partitionId22 -> new ArrayList<>()), listener));
RemoteReplicaInfo remoteReplicaInfo =
new RemoteReplicaInfo(peerReplicaId, replicaId, store, new MockFindToken(0, 0), Long.MAX_VALUE,
SystemTime.getInstance(), new Port(peerReplicaId.getDataNodeId().getPort(), PortType.PLAINTEXT));
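
The reformatted lambda chain above relies on Map.computeIfAbsent to lazily create per-partition state; a tiny standalone illustration (the keys and values below are placeholders):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ComputeIfAbsentSketch {
  public static void main(String[] args) {
    Map<String, List<Integer>> infosByPartition = new HashMap<>();
    // The first access creates the list; later accesses reuse it, so each partition's
    // backing collection is built exactly once (the MockHost pattern above).
    infosByPartition.computeIfAbsent("partition-0", k -> new ArrayList<>()).add(1);
    infosByPartition.computeIfAbsent("partition-0", k -> new ArrayList<>()).add(2);
    System.out.println(infosByPartition); // {partition-0=[1, 2]}
  }
}
```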
@@ -239,7 +239,8 @@ public void replicationAllPauseTest() throws Exception {
}

/**
* Tests pausing replication for all and individual partitions.
* Tests pausing replication for all and individual partitions. Also tests that replication will pause on a store that
* is not started and resume when the store is restarted.
* @throws Exception
*/
@Test
@@ -269,10 +270,12 @@ public void replicationPauseTest() throws Exception {
ReplicaThread replicaThread = replicasAndThread.getSecond();

Map<PartitionId, Integer> progressTracker = new HashMap<>();
PartitionId idToLeaveOut = clusterMap.getAllPartitionIds(null).get(0);
PartitionId partitionToResumeFirst = clusterMap.getAllPartitionIds(null).get(0);
PartitionId partitionToShutdownLocally = clusterMap.getAllPartitionIds(null).get(1);
boolean allStopped = false;
boolean onlyOneResumed = false;
boolean allReenabled = false;
boolean shutdownStoreRestarted = false;
Set<PartitionId> expectedPaused = new HashSet<>();
assertEquals("There should be no disabled partitions", expectedPaused,
replicaThread.getReplicationDisabledPartitions());
@@ -285,36 +288,42 @@ public void replicationPauseTest() throws Exception {
int lastProgress = progressTracker.computeIfAbsent(id, id1 -> 0);
int currentProgress = token.getIndex();
boolean partDone = currentProgress + 1 == remoteHost.infosByPartition.get(id).size();
if (allStopped || (onlyOneResumed && !id.equals(idToLeaveOut))) {
if (allStopped || (onlyOneResumed && !id.equals(partitionToResumeFirst)) || (allReenabled
&& !shutdownStoreRestarted && id.equals(partitionToShutdownLocally))) {
assertEquals("There should have been no progress", lastProgress, currentProgress);
} else if (!partDone) {
assertTrue("There has been no progress", currentProgress > lastProgress);
progressTracker.put(id, currentProgress);
}
replicationDone = replicationDone && partDone;
}
if (!allStopped && !onlyOneResumed && !allReenabled) {
if (!allStopped && !onlyOneResumed && !allReenabled && !shutdownStoreRestarted) {
replicaThread.controlReplicationForPartitions(clusterMap.getAllPartitionIds(null), false);
expectedPaused.addAll(clusterMap.getAllPartitionIds(null));
assertEquals("Disabled partitions sets do not match", expectedPaused,
replicaThread.getReplicationDisabledPartitions());
allStopped = true;
} else if (!onlyOneResumed && !allReenabled) {
} else if (!onlyOneResumed && !allReenabled && !shutdownStoreRestarted) {
// resume replication for first partition
replicaThread.controlReplicationForPartitions(Collections.singletonList(partitionIds.get(0)), true);
expectedPaused.remove(partitionIds.get(0));
assertEquals("Disabled partitions sets do not match", expectedPaused,
replicaThread.getReplicationDisabledPartitions());
allStopped = false;
onlyOneResumed = true;
} else if (!allReenabled) {
} else if (!allReenabled && !shutdownStoreRestarted) {
// not removing the first partition
replicaThread.controlReplicationForPartitions(clusterMap.getAllPartitionIds(null), true);
// shutdown one local store to pause replication against that store
localHost.storesByPartition.get(partitionToShutdownLocally).shutdown();
onlyOneResumed = false;
allReenabled = true;
expectedPaused.clear();
assertEquals("Disabled partitions sets do not match", expectedPaused,
replicaThread.getReplicationDisabledPartitions());
} else if (!shutdownStoreRestarted) {
localHost.storesByPartition.get(partitionToShutdownLocally).start();
shutdownStoreRestarted = true;
}
if (replicationDone) {
break;
@@ -1242,8 +1251,8 @@ public void replicationLagMetricTest() throws Exception {
replicaThread.fixMissingStoreKeys(new MockConnectionPool.MockConnection(remoteHost, batchSize),
replicasToReplicate.get(remoteHost.dataNodeId), response);
for (PartitionId partitionId : partitionIds) {
assertTrue("Replication lag should equal to 0",
replicaThread.getReplicationMetrics().getMaxLagForPartition(partitionId) == 0);
assertEquals("Replication lag should equal to 0", 0,
replicaThread.getReplicationMetrics().getMaxLagForPartition(partitionId));
}
}

@@ -932,25 +932,25 @@ public void run() {
for (Map.Entry<Resource, Histogram> resourceToHistogram : getBlobLocalDcResourceToLatency.entrySet()) {
Resource resource = resourceToHistogram.getKey();
Histogram histogram = resourceToHistogram.getValue();
logger.info("{} GetBlob local DC latency histogram {}th percentile in ms: {}", resource.toString(),
logger.debug("{} GetBlob local DC latency histogram {}th percentile in ms: {}", resource.toString(),
quantile * 100, histogram.getSnapshot().getValue(quantile));
}
for (Map.Entry<Resource, Histogram> resourceToHistogram : getBlobCrossDcResourceToLatency.entrySet()) {
Resource resource = resourceToHistogram.getKey();
Histogram histogram = resourceToHistogram.getValue();
logger.info("{} GetBlob cross DC latency histogram {}th percentile in ms: {}", resource.toString(),
logger.trace("{} GetBlob cross DC latency histogram {}th percentile in ms: {}", resource.toString(),
quantile * 100, histogram.getSnapshot().getValue(quantile));
}
for (Map.Entry<Resource, Histogram> resourceToHistogram : getBlobInfoLocalDcResourceToLatency.entrySet()) {
Resource resource = resourceToHistogram.getKey();
Histogram histogram = resourceToHistogram.getValue();
logger.info("{} GetBlobInfo local DC latency histogram {}th percentile in ms: {}", resource.toString(),
logger.debug("{} GetBlobInfo local DC latency histogram {}th percentile in ms: {}", resource.toString(),
quantile * 100, histogram.getSnapshot().getValue(quantile));
}
for (Map.Entry<Resource, Histogram> resourceToHistogram : getBlobInfoCrossDcResourceToLatency.entrySet()) {
Resource resource = resourceToHistogram.getKey();
Histogram histogram = resourceToHistogram.getValue();
logger.info("{} GetBlobInfo cross DC latency histogram {}th percentile in ms: {}", resource.toString(),
logger.trace("{} GetBlobInfo cross DC latency histogram {}th percentile in ms: {}", resource.toString(),
quantile * 100, histogram.getSnapshot().getValue(quantile));
}
}
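
Item 1 of the commit message drops these histogram dumps from INFO to DEBUG (local DC) and TRACE (cross DC). A hedged, self-contained sketch of reading a percentile from a Codahale Histogram and logging it at DEBUG, assuming metrics-core and SLF4J on the classpath (the latency samples below are made up):

```java
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Histogram;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HistogramDumpSketch {
  private static final Logger logger = LoggerFactory.getLogger(HistogramDumpSketch.class);

  public static void main(String[] args) {
    Histogram histogram = new Histogram(new ExponentiallyDecayingReservoir());
    for (int latencyMs = 1; latencyMs <= 100; latencyMs++) {
      histogram.update(latencyMs); // made-up latency samples
    }
    double quantile = 0.95;
    // DEBUG keeps periodic percentile dumps out of production INFO logs; enable the level
    // (or TRACE for the noisier cross-DC histograms) only when investigating latency.
    logger.debug("GetBlob local DC latency histogram {}th percentile in ms: {}", quantile * 100,
        histogram.getSnapshot().getValue(quantile));
  }
}
```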
@@ -1245,10 +1245,12 @@ private static class MockStorageManager extends StorageManager {
* An empty {@link Store} implementation.
*/
private Store store = new Store() {
boolean started;

@Override
public void start() throws StoreException {
throwExceptionIfRequired();
started = true;
}

@Override
@@ -1317,7 +1319,7 @@ public FindInfo findEntriesSince(FindToken token, long maxTotalSizeOfEntries) th
tokenReceived = token;
maxTotalSizeOfEntriesReceived = maxTotalSizeOfEntries;
throwExceptionIfRequired();
return new FindInfo(Collections.EMPTY_LIST, FIND_TOKEN_FACTORY.getNewFindToken());
return new FindInfo(Collections.emptyList(), FIND_TOKEN_FACTORY.getNewFindToken());
}

@Override
@@ -1345,8 +1347,14 @@ public boolean isEmpty() {
return false;
}

@Override
public boolean isStarted() {
return started;
}

public void shutdown() throws StoreException {
throwExceptionIfRequired();
started = false;
}

/**
@@ -634,6 +634,11 @@ public boolean isEmpty() {
public void shutdown() throws StoreException {
throw new IllegalStateException("Not implemented");
}

@Override
public boolean isStarted() {
throw new IllegalStateException("Not implemented");
}
}

/**
@@ -761,7 +761,8 @@ DiskSpaceRequirements getDiskSpaceRequirements() throws StoreException {
/**
* @return {@code true} if this store has been started successfully.
*/
boolean isStarted() {
@Override
public boolean isStarted() {
return started;
}
