-
Notifications
You must be signed in to change notification settings - Fork 25.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Big arrays sliced from nettey buffers (int) (#89668)
This teaches `IntArray` about our serialization. The interesting bit here is that reading slices the reference to the underlying buffer rather than copying. That reference can be retained as long as it's needed, holding the underlying buffer open until the `IntArray` is `close`d. This should allow aggregations to send dense representations between nodes with one fewer copy operation.
- Loading branch information
Showing
10 changed files
with
252 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
79 changes: 79 additions & 0 deletions
79
server/src/main/java/org/elasticsearch/common/util/ReleasableIntArray.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.common.util; | ||
|
||
import org.apache.lucene.util.RamUsageEstimator; | ||
import org.elasticsearch.common.bytes.ReleasableBytesReference; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.io.stream.StreamOutput; | ||
|
||
import java.io.IOException; | ||
|
||
class ReleasableIntArray implements IntArray { | ||
private static final long SHALLOW_SIZE = RamUsageEstimator.shallowSizeOfInstance(ReleasableIntArray.class); | ||
|
||
private final ReleasableBytesReference ref; | ||
|
||
ReleasableIntArray(StreamInput in) throws IOException { | ||
ref = in.readReleasableBytesReference(); | ||
} | ||
|
||
@Override | ||
public void writeTo(StreamOutput out) throws IOException { | ||
out.writeBytesReference(ref); | ||
} | ||
|
||
@Override | ||
public long size() { | ||
return ref.length() / 4; | ||
} | ||
|
||
@Override | ||
public int get(long index) { | ||
if (index > Integer.MAX_VALUE / 4) { | ||
// We can't serialize messages longer than 2gb anyway | ||
throw new ArrayIndexOutOfBoundsException(); | ||
} | ||
return ref.getIntLE((int) index * 4); | ||
} | ||
|
||
@Override | ||
public int set(long index, int value) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public int increment(long index, int inc) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public void fill(long fromIndex, long toIndex, int value) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public void set(long index, byte[] buf, int offset, int len) { | ||
throw new UnsupportedOperationException(); | ||
} | ||
|
||
@Override | ||
public long ramBytesUsed() { | ||
/* | ||
* If we return the size of the buffer that we've sliced | ||
* we're likely to double count things. | ||
*/ | ||
return SHALLOW_SIZE; | ||
} | ||
|
||
@Override | ||
public void close() { | ||
ref.decRef(); | ||
} | ||
} |
83 changes: 83 additions & 0 deletions
83
server/src/test/java/org/elasticsearch/common/bytes/ReleasableBytesReferenceStreamTests.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,83 @@ | ||
/* | ||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one | ||
* or more contributor license agreements. Licensed under the Elastic License | ||
* 2.0 and the Server Side Public License, v 1; you may not use this file except | ||
* in compliance with, at your election, the Elastic License 2.0 or the Server | ||
* Side Public License, v 1. | ||
*/ | ||
|
||
package org.elasticsearch.common.bytes; | ||
|
||
import org.elasticsearch.common.io.stream.AbstractStreamTests; | ||
import org.elasticsearch.common.io.stream.BytesStreamOutput; | ||
import org.elasticsearch.common.io.stream.StreamInput; | ||
import org.elasticsearch.common.util.BigArrays; | ||
import org.elasticsearch.common.util.IntArray; | ||
import org.junit.After; | ||
|
||
import java.io.IOException; | ||
import java.util.ArrayList; | ||
import java.util.Collections; | ||
import java.util.IdentityHashMap; | ||
import java.util.Iterator; | ||
import java.util.List; | ||
import java.util.Set; | ||
|
||
import static org.hamcrest.Matchers.equalTo; | ||
|
||
public class ReleasableBytesReferenceStreamTests extends AbstractStreamTests { | ||
private final List<ReleasableBytesReference> opened = new ArrayList<>(); | ||
private final Set<Exception> openTraces = Collections.newSetFromMap(new IdentityHashMap<>()); | ||
|
||
@After | ||
public void checkAllClosed() throws Exception { | ||
// Decrement one time to simulate closing the netty buffer after the stream is captured | ||
for (ReleasableBytesReference r : opened) { | ||
r.decRef(); | ||
} | ||
// Now that we've decremented, we expect all streams will have been closed | ||
Iterator<Exception> iter = openTraces.iterator(); | ||
if (iter.hasNext()) { | ||
throw new Exception("didn't close iterator - cause is opening location", iter.next()); | ||
} | ||
for (ReleasableBytesReference r : opened) { | ||
assertThat(r.hasReferences(), equalTo(false)); | ||
} | ||
} | ||
|
||
@Override | ||
protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException { | ||
// Grab an exception at the opening location, so we can throw it if we don't close | ||
Exception trace = new Exception(); | ||
openTraces.add(trace); | ||
|
||
ReleasableBytesReference counted = new ReleasableBytesReference(bytesReference, () -> openTraces.remove(trace)); | ||
|
||
/* | ||
* Grab a reference to the bytes ref we're using, so we can close it after the | ||
* test to simulate the underlying netter butter closing after the test. | ||
*/ | ||
opened.add(counted); | ||
return counted.streamInput(); | ||
} | ||
|
||
public void testBigIntArrayLivesAfterReleasableIsDecremented() throws IOException { | ||
IntArray testData = BigArrays.NON_RECYCLING_INSTANCE.newIntArray(1, false); | ||
testData.set(0, 1); | ||
|
||
BytesStreamOutput out = new BytesStreamOutput(); | ||
testData.writeTo(out); | ||
|
||
ReleasableBytesReference ref = ReleasableBytesReference.wrap(out.bytes()); | ||
|
||
try (IntArray in = IntArray.readFrom(ref.streamInput())) { | ||
ref.decRef(); | ||
assertThat(ref.hasReferences(), equalTo(true)); | ||
|
||
assertThat(in.size(), equalTo(testData.size())); | ||
assertThat(in.get(0), equalTo(1)); | ||
} | ||
assertThat(ref.hasReferences(), equalTo(false)); | ||
} | ||
|
||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters