Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle fields.with.dots in routing_path #83148

Merged
merged 7 commits into from
Feb 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@
"TLS",
"Task Management",
"Transform",
"TSDB",
"Watcher"
]
},
Expand Down
5 changes: 5 additions & 0 deletions docs/changelog/83148.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 83148
summary: Handle `fields.with.dots` in `routing_path`
area: TSDB
type: feature
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@

package org.elasticsearch.cluster.routing;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
Expand All @@ -23,7 +25,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.function.IntConsumer;
Expand Down Expand Up @@ -224,66 +226,78 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
}
assert Transports.assertNotTransportThread("parsing the _source can get slow");

List<NameAndHash> hashes = new ArrayList<>();
try {
try (XContentParser parser = sourceType.xContent().createParser(parserConfig, source.streamInput())) {
parser.nextToken(); // Move to first token
if (parser.currentToken() == null) {
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
}
int hash = extractObject(parser);
parser.nextToken();
extractObject(hashes, null, parser);
ensureExpectedToken(null, parser.nextToken(), parser);
return hashToShardId(hash);
}
} catch (IOException | ParsingException e) {
throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e);
}
return hashToShardId(hashesToHash(hashes));
}

private static int extractObject(XContentParser source) throws IOException {
ensureExpectedToken(Token.FIELD_NAME, source.nextToken(), source);
String firstFieldName = source.currentName();
source.nextToken();
int firstHash = extractItem(source);
if (source.currentToken() == Token.END_OBJECT) {
// Just one routing key in this object
// Use ^ like Map.Entry's hashcode
return Murmur3HashFunction.hash(firstFieldName) ^ firstHash;
}
List<NameAndHash> hashes = new ArrayList<>();
hashes.add(new NameAndHash(firstFieldName, firstHash));
do {
private static void extractObject(List<NameAndHash> hashes, @Nullable String path, XContentParser source) throws IOException {
while (source.currentToken() != Token.END_OBJECT) {
ensureExpectedToken(Token.FIELD_NAME, source.currentToken(), source);
String fieldName = source.currentName();
String subPath = path == null ? fieldName : path + "." + fieldName;
source.nextToken();
hashes.add(new NameAndHash(fieldName, extractItem(source)));
} while (source.currentToken() != Token.END_OBJECT);
Collections.sort(hashes, Comparator.comparing(nameAndHash -> nameAndHash.name));
/*
* This is the same as Arrays.hash(Map.Entry<fieldName, hash>) but we're
* writing it out so for extra paranoia. Changing this will change how
* documents are routed and we don't want a jdk update that modifies Arrays
* or Map.Entry to sneak up on us.
*/
int hash = 0;
for (NameAndHash nameAndHash : hashes) {
int thisHash = Murmur3HashFunction.hash(nameAndHash.name) ^ nameAndHash.hash;
hash = 31 * hash + thisHash;
extractItem(hashes, subPath, source);
}
return hash;
}

private static int extractItem(XContentParser source) throws IOException {
if (source.currentToken() == Token.START_OBJECT) {
int hash = extractObject(source);
source.nextToken();
return hash;
private static void extractItem(List<NameAndHash> hashes, String path, XContentParser source) throws IOException {
switch (source.currentToken()) {
case START_OBJECT:
source.nextToken();
extractObject(hashes, path, source);
source.nextToken();
break;
case VALUE_STRING:
hashes.add(new NameAndHash(new BytesRef(path), hash(new BytesRef(source.text()))));
source.nextToken();
break;
case VALUE_NULL:
source.nextToken();
break;
default:
throw new ParsingException(
source.getTokenLocation(),
"Routing values must be strings but found [{}]",
source.currentToken()
);
}
if (source.currentToken() == Token.VALUE_STRING) {
int hash = Murmur3HashFunction.hash(source.text());
source.nextToken();
return hash;
}

private static int hash(BytesRef ref) {
return StringHelper.murmurhash3_x86_32(ref, 0);
}

private static int hashesToHash(List<NameAndHash> hashes) {
Collections.sort(hashes);
Iterator<NameAndHash> itr = hashes.iterator();
if (itr.hasNext() == false) {
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
}
NameAndHash prev = itr.next();
int hash = hash(prev.name) ^ prev.hash;
while (itr.hasNext()) {
NameAndHash next = itr.next();
if (prev.name.equals(next.name)) {
throw new IllegalArgumentException("Duplicate routing dimension for [" + next.name + "]");
}
int thisHash = hash(next.name) ^ next.hash;
hash = 31 * hash + thisHash;
prev = next;
}
throw new ParsingException(source.getTokenLocation(), "Routing values must be strings but found [{}]", source.currentToken());
return hash;
}

@Override
Expand Down Expand Up @@ -316,5 +330,10 @@ private String error(String operation) {
}
}

private record NameAndHash(String name, int hash) {}
private static record NameAndHash(BytesRef name, int hash) implements Comparable<NameAndHash> {
@Override
public int compareTo(NameAndHash o) {
return name.compareTo(o.name);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
*/
package org.elasticsearch.cluster.routing;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.StringHelper;
import org.elasticsearch.Version;
import org.elasticsearch.action.RoutingMissingException;
import org.elasticsearch.cluster.metadata.IndexMetadata;
Expand Down Expand Up @@ -513,7 +515,7 @@ public void testRoutingPathOneSub() throws IOException {
assertIndexShard(
routing,
Map.of("foo", Map.of("bar", "cat"), "baz", "dog"),
Math.floorMod(hash(List.of("foo", List.of("bar", "cat"))), shards)
Math.floorMod(hash(List.of("foo.bar", "cat")), shards)
);
}

Expand All @@ -523,10 +525,16 @@ public void testRoutingPathManySubs() throws IOException {
assertIndexShard(
routing,
Map.of("foo", Map.of("a", "cat"), "bar", Map.of("thing", "yay", "this", "too")),
Math.floorMod(hash(List.of("bar", List.of("thing", "yay", "this", "too"), "foo", List.of("a", "cat"))), shards)
Math.floorMod(hash(List.of("bar.thing", "yay", "bar.this", "too", "foo.a", "cat")), shards)
);
}

public void testRoutingPathDotInName() throws IOException {
int shards = between(2, 1000);
IndexRouting routing = indexRoutingForPath(shards, "foo.bar");
assertIndexShard(routing, Map.of("foo.bar", "cat", "baz", "dog"), Math.floorMod(hash(List.of("foo.bar", "cat")), shards));
}

public void testRoutingPathBwc() throws IOException {
Version version = VersionUtils.randomIndexCompatibleVersion(random());
IndexRouting routing = indexRoutingForPath(version, 8, "dim.*,other.*,top");
Expand All @@ -538,12 +546,13 @@ public void testRoutingPathBwc() throws IOException {
* versions of Elasticsearch must continue to route based on the
* version on the index.
*/
assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 0);
assertIndexShard(routing, Map.of("dim", Map.of("a", "a")), 4);
assertIndexShard(routing, Map.of("dim", Map.of("a", "b")), 5);
assertIndexShard(routing, Map.of("dim", Map.of("c", "d")), 4);
assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 5);
assertIndexShard(routing, Map.of("top", "a"), 3);
assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 2);
assertIndexShard(routing, Map.of("other", Map.of("a", "a")), 7);
assertIndexShard(routing, Map.of("top", "a"), 5);
assertIndexShard(routing, Map.of("dim", Map.of("c", "d"), "top", "b"), 0);
assertIndexShard(routing, Map.of("dim.a", "a"), 4);
}

private IndexRouting indexRoutingForPath(int shards, String path) {
Expand All @@ -560,8 +569,8 @@ private IndexRouting indexRoutingForPath(Version createdVersion, int shards, Str
);
}

private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int id) throws IOException {
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(id));
private void assertIndexShard(IndexRouting routing, Map<String, Object> source, int expected) throws IOException {
assertThat(routing.indexShard(randomAlphaOfLength(5), null, XContentType.JSON, source(source)), equalTo(expected));
}

private BytesReference source(Map<String, Object> doc) throws IOException {
Expand All @@ -581,24 +590,14 @@ private BytesReference source(Map<String, Object> doc) throws IOException {
/**
* Build the hash we expect from the extracter.
*/
private int hash(List<?> keysAndValues) {
private int hash(List<String> keysAndValues) {
assertThat(keysAndValues.size() % 2, equalTo(0));
int hash = 0;
for (int i = 0; i < keysAndValues.size(); i += 2) {
int thisHash = Murmur3HashFunction.hash(keysAndValues.get(i).toString()) ^ expectedValueHash(keysAndValues.get(i + 1));
hash = hash * 31 + thisHash;
int keyHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i)), 0);
int valueHash = StringHelper.murmurhash3_x86_32(new BytesRef(keysAndValues.get(i + 1)), 0);
hash = hash * 31 + (keyHash ^ valueHash);
}
return hash;
}

private int expectedValueHash(Object value) {
if (value instanceof List) {
return hash((List<?>) value);
}
if (value instanceof String) {
return Murmur3HashFunction.hash((String) value);
}
throw new IllegalArgumentException("Unsupported value: " + value);
}

}