Skip to content

Commit

Permalink
Simplify diff for routing table
Browse files Browse the repository at this point in the history
Signed-off-by: Bukhtawar Khan <bukhtawa@amazon.com>
  • Loading branch information
Bukhtawar committed Aug 4, 2024
1 parent b773ade commit 3fadc9f
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 122 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.cluster.AbstractDiffable;
import org.opensearch.cluster.Diff;
import org.opensearch.cluster.node.DiscoveryNode;
import org.opensearch.cluster.node.DiscoveryNodes;
import org.opensearch.common.Nullable;
Expand Down Expand Up @@ -75,7 +77,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class IndexShardRoutingTable implements Iterable<ShardRouting> {
public class IndexShardRoutingTable extends AbstractDiffable<IndexShardRoutingTable> implements Iterable<ShardRouting> {

final ShardShuffler shuffler;
// Shuffler for weighted round-robin shard routing. This uses rotation to permute shards.
Expand Down Expand Up @@ -527,6 +529,12 @@ private static List<ShardRouting> rankShardsAndUpdateStats(
return sortedShards;
}

@Override
public void writeTo(StreamOutput out) throws IOException {
this.shardId().getIndex().writeTo(out);
Builder.writeToThin(this, out);
}

private static class NodeRankComparator implements Comparator<ShardRouting> {
private final Map<String, Double> nodeRanks;

Expand Down Expand Up @@ -1049,6 +1057,14 @@ private void populateInitializingShardWeightsMap(WeightedRouting weightedRouting
}
}

public static IndexShardRoutingTable readFrom(StreamInput in) throws IOException {
return IndexShardRoutingTable.Builder.readFrom(in);
}

public static Diff<IndexShardRoutingTable> readDiffFrom(StreamInput in) throws IOException {
return readDiffFrom(IndexShardRoutingTable::readFrom, in);
}

/**
* Builder of an index shard routing table.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,160 +9,109 @@
package org.opensearch.cluster.routing;

import org.opensearch.cluster.Diff;
import org.opensearch.cluster.DiffableUtils;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.index.Index;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Represents a difference between {@link RoutingTable} objects that can be serialized and deserialized.
*/
public class RoutingTableIncrementalDiff implements Diff<RoutingTable> {

private final Map<String, Diff<IndexRoutingTable>> diffs;
private DiffableUtils.MapDiff<String, IndexRoutingTable, Map<String, IndexRoutingTable>> indicesRouting;

/**
* Constructs a new RoutingTableIncrementalDiff with the given differences.
*
* @param diffs a map containing the differences of {@link IndexRoutingTable}.
*/
public RoutingTableIncrementalDiff(Map<String, Diff<IndexRoutingTable>> diffs) {
this.diffs = diffs;
private final long version;

private static final DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER =
new DiffableUtils.DiffableValueSerializer<>() {

@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
}

@Override
public Diff<IndexRoutingTable> readDiff(StreamInput in, String key) throws IOException {
return new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(in);
}

@Override
public Diff<IndexRoutingTable> diff(IndexRoutingTable currentState, IndexRoutingTable previousState) {
return new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(currentState.getIndex(), currentState, previousState);
}
};

public RoutingTableIncrementalDiff(RoutingTable before, RoutingTable after) {
version = after.version();
indicesRouting = DiffableUtils.diff(
before.getIndicesRouting(),
after.getIndicesRouting(),
DiffableUtils.getStringKeySerializer(),
CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER
);
}

/**
* Gets the map of differences of {@link IndexRoutingTable}.
*
* @return a map containing the differences.
*/
public Map<String, Diff<IndexRoutingTable>> getDiffs() {
return diffs;
public RoutingTableIncrementalDiff(StreamInput in) throws IOException {
version = in.readLong();
indicesRouting = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getStringKeySerializer(), CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER);
}

/**
* Reads a {@link RoutingTableIncrementalDiff} from the given {@link StreamInput}.
*
* @param in the input stream to read from.
* @return the deserialized RoutingTableIncrementalDiff.
* @throws IOException if an I/O exception occurs while reading from the stream.
*/
public static RoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
Map<String, Diff<IndexRoutingTable>> diffs = new HashMap<>();

for (int i = 0; i < size; i++) {
String key = in.readString();
Diff<IndexRoutingTable> diff = IndexRoutingTableIncrementalDiff.readFrom(in);
diffs.put(key, diff);
}
return new RoutingTableIncrementalDiff(diffs);
return new RoutingTableIncrementalDiff(in);
}

/**
* Applies the differences to the provided {@link RoutingTable}.
*
* @param part the original RoutingTable to which the differences will be applied.
* @return the updated RoutingTable with the applied differences.
*/
@Override
public RoutingTable apply(RoutingTable part) {
RoutingTable.Builder builder = new RoutingTable.Builder();
for (IndexRoutingTable indexRoutingTable : part) {
builder.add(indexRoutingTable); // Add existing index routing tables to builder
}

// Apply the diffs
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
builder.add(entry.getValue().apply(part.index(entry.getKey())));
}

return builder.build();
return new RoutingTable(version, indicesRouting.apply(part.getIndicesRouting()));
}

/**
* Writes the differences to the given {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing to the stream.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(diffs.size());
for (Map.Entry<String, Diff<IndexRoutingTable>> entry : diffs.entrySet()) {
out.writeString(entry.getKey());
entry.getValue().writeTo(out);
}
out.writeLong(version);
indicesRouting.writeTo(out);
}

/**
* Represents a difference between {@link IndexShardRoutingTable} objects that can be serialized and deserialized.
*/
public static class IndexRoutingTableIncrementalDiff implements Diff<IndexRoutingTable> {

private final List<IndexShardRoutingTable> indexShardRoutingTables;
private final Diff<Map<Integer, IndexShardRoutingTable>> indexShardRoutingTables ;

/**
* Constructs a new IndexShardRoutingTableDiff with the given shard routing tables.
*
* @param indexShardRoutingTables a list of IndexShardRoutingTable representing the differences.
*/
public IndexRoutingTableIncrementalDiff(List<IndexShardRoutingTable> indexShardRoutingTables) {
this.indexShardRoutingTables = indexShardRoutingTables;
private final Index index;

public IndexRoutingTableIncrementalDiff(Index index, IndexRoutingTable before, IndexRoutingTable after) {
this.index = index;
this.indexShardRoutingTables = DiffableUtils.diff(before.getShards(), after.getShards(), DiffableUtils.getIntKeySerializer());
}

private static final DiffableUtils.DiffableValueReader<Integer, IndexShardRoutingTable> DIFF_VALUE_READER =
new DiffableUtils.DiffableValueReader<>(IndexShardRoutingTable::readFrom, IndexShardRoutingTable::readDiffFrom);


public IndexRoutingTableIncrementalDiff(StreamInput in) throws IOException {
this.index = new Index(in);
this.indexShardRoutingTables = DiffableUtils.readJdkMapDiff(in, DiffableUtils.getIntKeySerializer(), DIFF_VALUE_READER);
}

/**
* Applies the differences to the provided {@link IndexRoutingTable}.
*
* @param part the original IndexRoutingTable to which the differences will be applied.
* @return the updated IndexRoutingTable with the applied differences.
*/
@Override
public IndexRoutingTable apply(IndexRoutingTable part) {
IndexRoutingTable.Builder builder = new IndexRoutingTable.Builder(part.getIndex());
for (IndexShardRoutingTable shardRoutingTable : part) {
builder.addIndexShard(shardRoutingTable); // Add existing shards to builder
}

// Apply the diff: update or add the new shard routing tables
for (IndexShardRoutingTable diffShard : indexShardRoutingTables) {
builder.addIndexShard(diffShard);
}
return builder.build();
return new IndexRoutingTable(index, indexShardRoutingTables.apply(part.getShards()));
}

/**
* Writes the differences to the given {@link StreamOutput}.
*
* @param out the output stream to write to.
* @throws IOException if an I/O exception occurs while writing to the stream.
*/
@Override
public void writeTo(StreamOutput out) throws IOException {
out.writeVInt(indexShardRoutingTables.size());
for (IndexShardRoutingTable shardRoutingTable : indexShardRoutingTables) {
IndexShardRoutingTable.Builder.writeTo(shardRoutingTable, out);
}
index.writeTo(out);
indexShardRoutingTables.writeTo(out);
}

/**
* Reads a {@link IndexRoutingTableIncrementalDiff} from the given {@link StreamInput}.
*
* @param in the input stream to read from.
* @return the deserialized IndexShardRoutingTableDiff.
* @throws IOException if an I/O exception occurs while reading from the stream.
*/
public static IndexRoutingTableIncrementalDiff readFrom(StreamInput in) throws IOException {
int size = in.readVInt();
List<IndexShardRoutingTable> indexShardRoutingTables = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
IndexShardRoutingTable shardRoutingTable = IndexShardRoutingTable.Builder.readFrom(in);
indexShardRoutingTables.add(shardRoutingTable);
}
return new IndexRoutingTableIncrementalDiff(indexShardRoutingTables);
return new IndexRoutingTableIncrementalDiff(in);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@
*/
public interface RemoteRoutingTableService extends LifecycleComponent {

public static final DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER =
new DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable>() {
DiffableUtils.DiffableValueSerializer<String, IndexRoutingTable> CUSTOM_ROUTING_TABLE_DIFFABLE_VALUE_SERIALIZER =
new DiffableUtils.DiffableValueSerializer<>() {
@Override
public IndexRoutingTable read(StreamInput in, String key) throws IOException {
return IndexRoutingTable.readFrom(in);
Expand All @@ -51,16 +51,7 @@ public Diff<IndexRoutingTable> readDiff(StreamInput in, String key) throws IOExc

@Override
public Diff<IndexRoutingTable> diff(IndexRoutingTable currentState, IndexRoutingTable previousState) {
List<IndexShardRoutingTable> diffs = new ArrayList<>();
for (Map.Entry<Integer, IndexShardRoutingTable> entry : currentState.getShards().entrySet()) {
Integer index = entry.getKey();
IndexShardRoutingTable currentShardRoutingTable = entry.getValue();
IndexShardRoutingTable previousShardRoutingTable = previousState.shard(index);
if (previousShardRoutingTable == null || !previousShardRoutingTable.equals(currentShardRoutingTable)) {
diffs.add(currentShardRoutingTable);
}
}
return new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(diffs);
return new RoutingTableIncrementalDiff.IndexRoutingTableIncrementalDiff(currentState.getIndex(), currentState, previousState);
}
};

Expand Down

0 comments on commit 3fadc9f

Please sign in to comment.