Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce clustermap change listener to propagate remote replica addition/removal events #1368

Merged
merged 8 commits into from
Feb 10, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,12 @@ public interface ClusterMap extends AutoCloseable {
*/
ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeId);

/**
* Register a listener of cluster map for any changes.
* @param clusterMapChangeListener the {@link ClusterMapChangeListener} to add.
*/
void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener);

/**
* Close the cluster map. Any cleanups should be done in this call.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/**
* Copyright 2020 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package com.github.ambry.clustermap;

import java.util.List;


/**
* A {@link ClusterMap} listener that takes actions on local node when cluster map is changed.
*/
public interface ClusterMapChangeListener {
/**
* Take actions when replicas are added or removed on local node.
* @param addedReplicas a list of {@link ReplicaId}(s) that have been added.
* @param removedReplicas a list of {@link ReplicaId}(s) that have been removed.
*/
void onReplicaAddedOrRemoved(List<ReplicaId> addedReplicas, List<ReplicaId> removedReplicas);
}
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,14 @@ public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeI
return helixClusterManager.getBootstrapReplica(partitionIdStr, dataNodeId);
}

@Override
public void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener) {
staticClusterManager.registerClusterMapListener(clusterMapChangeListener);
if (helixClusterManager != null) {
helixClusterManager.registerClusterMapListener(clusterMapChangeListener);
}
}

@Override
public void close() {
staticClusterManager.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ public class HelixClusterManager implements ClusterMap {
// manager to dynamically incorporate newer changes in the cluster. This variable is atomic so that the gauge metric
// reflects the current value.
private final AtomicLong currentXid;
private final List<ClusterMapChangeListener> clusterMapChangeListeners = new ArrayList<>();
final HelixClusterManagerMetrics helixClusterManagerMetrics;

/**
Expand Down Expand Up @@ -489,6 +490,11 @@ public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeI
return bootstrapReplica;
}

@Override
public void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener) {
clusterMapChangeListeners.add(clusterMapChangeListener);
}

/**
* Disconnect from the HelixManagers associated with each and every datacenter.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ public MetricRegistry getMetricRegistry() {
return metricRegistry;
}

@Override
public void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener) {
throw new UnsupportedOperationException("Registering clustermap listener is not supported in static clustermap");
}

// Administrative API
// -----------------------

Expand Down Expand Up @@ -535,8 +540,7 @@ public JSONObject getSnapshot() {

@Override
public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeId) {
throw new UnsupportedOperationException(
"Adding new replica is currently not supported in static cluster manager.");
throw new UnsupportedOperationException("Adding new replica is currently not supported in static cluster manager.");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class MockClusterMap implements ClusterMap {
private String localDatacenterName;

private final MockPartitionId specialPartition;
private ClusterMapChangeListener clusterMapChangeListener = null;

private RuntimeException exceptionOnSnapshot = null;

Expand Down Expand Up @@ -249,7 +250,7 @@ protected ArrayList<Port> getListOfPorts(int port) {
return ports;
}

protected ArrayList<Port> getListOfPorts(int port, int sslPort) {
public static ArrayList<Port> getListOfPorts(int port, int sslPort) {
ArrayList<Port> ports = new ArrayList<Port>();
ports.add(new Port(port, PortType.PLAINTEXT));
ports.add(new Port(sslPort, PortType.SSL));
Expand Down Expand Up @@ -537,11 +538,23 @@ public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeI
return newReplica;
}

@Override
public void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener) {
this.clusterMapChangeListener = clusterMapChangeListener;
}

@Override
public void close() {
// No-op.
}

/**
* @return {@link ClusterMapChangeListener} registered to this cluster map.
*/
public ClusterMapChangeListener getClusterMapChangeListener() {
return clusterMapChangeListener;
}

/**
* Sets the local datacenter name and changes the views of the partition classes. Not thread safe.
* @param localDatacenterName the name of the local datacenter
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterMapChangeListener;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.MockReplicaId;
import com.github.ambry.clustermap.PartitionId;
Expand Down Expand Up @@ -126,6 +127,10 @@ public ReplicaId getBootstrapReplica(String partitionIdStr, DataNodeId dataNodeI
return null;
}

@Override
public void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener) {
}

@Override
public void close() {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

import com.codahale.metrics.MetricRegistry;
import com.github.ambry.clustermap.ClusterMap;
import com.github.ambry.clustermap.ClusterMapChangeListener;
import com.github.ambry.clustermap.ClusterMapUtils;
import com.github.ambry.clustermap.DataNodeId;
import com.github.ambry.clustermap.MockClusterMap;
Expand Down Expand Up @@ -384,8 +385,11 @@ public JSONObject getSnapshot() {
}

@Override
public void close() {
public void registerClusterMapListener(ClusterMapChangeListener clusterMapChangeListener) {
}

@Override
public void close() {
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
import com.github.ambry.clustermap.ReplicaId;
import com.github.ambry.store.Store;
import java.util.List;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;


public class PartitionInfo {
Expand All @@ -25,6 +28,7 @@ public class PartitionInfo {
private final PartitionId partitionId;
private final Store store;
private final ReplicaId localReplicaId;
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

public PartitionInfo(List<RemoteReplicaInfo> remoteReplicas, PartitionId partitionId, Store store,
ReplicaId localReplicaId) {
Expand All @@ -39,7 +43,12 @@ public PartitionId getPartitionId() {
}

public List<RemoteReplicaInfo> getRemoteReplicaInfos() {
return remoteReplicas;
rwLock.readLock().lock();
try {
return remoteReplicas;
} finally {
rwLock.readLock().unlock();
}
}

public Store getStore() {
Expand All @@ -50,6 +59,52 @@ public ReplicaId getLocalReplicaId() {
return this.localReplicaId;
}

/**
* Add {@link RemoteReplicaInfo} to this {@link PartitionInfo} if it is previously absent.
* @param remoteReplicaInfo the {@link RemoteReplicaInfo} to add.
* @return {@code true} if remote replica info is added. {@code false} if it is already present
*/
boolean addReplicaInfoIfAbsent(RemoteReplicaInfo remoteReplicaInfo) {
rwLock.writeLock().lock();
boolean isAdded = false;
try {
List<RemoteReplicaInfo> foundSameReplica = remoteReplicas.stream()
.filter(info -> info.getReplicaId() == remoteReplicaInfo.getReplicaId())
.collect(Collectors.toList());
if (foundSameReplica.isEmpty()) {
remoteReplicas.add(remoteReplicaInfo);
isAdded = true;
}
} finally {
rwLock.writeLock().unlock();
}
return isAdded;
}

/**
* Remove {@link RemoteReplicaInfo} of given replica from this {@link PartitionInfo} if it is present.
* @param remoteReplica the {@link ReplicaId} whose info should be removed.
* @return {@link RemoteReplicaInfo} that is removed, can be null if it is not present.
*/
RemoteReplicaInfo removeReplicaInfoIfPresent(ReplicaId remoteReplica) {
rwLock.writeLock().lock();
RemoteReplicaInfo replicaInfoToRemove = null;
try {
for (RemoteReplicaInfo remoteReplicaInfo : remoteReplicas) {
if (remoteReplicaInfo.getReplicaId().getDataNodeId() == remoteReplica.getDataNodeId()) {
replicaInfoToRemove = remoteReplicaInfo;
break;
}
}
if (replicaInfoToRemove != null) {
remoteReplicas.remove(replicaInfoToRemove);
}
} finally {
rwLock.writeLock().unlock();
}
return replicaInfoToRemove;
}

@Override
public String toString() {
return partitionId.toString() + " " + remoteReplicas.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,7 @@ public void shutdown() throws ReplicationException {
protected void removeRemoteReplicaInfoFromReplicaThread(List<RemoteReplicaInfo> remoteReplicaInfos) {
for (RemoteReplicaInfo remoteReplicaInfo : remoteReplicaInfos) {
// Thread safe with addRemoteReplicaInfoToReplicaThread.
// For ReplicationManger, for same thread, removeRemoteReplicaInfo() ensures lock is held by only one thread at any time.
// For ReplicationManger, removeRemoteReplicaInfo() ensures lock is held by only one thread at any time.
// For CloudBackUpManager with HelixVcrCluster, Helix requires acknowledgement before next message for the same
// resource, which means methods in HelixVcrStateModel will be executed sequentially for same partition.
// So do listener actions in addPartition() and removePartition().
Expand Down
Loading