Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Close streams fetched from data-providers #18552

Merged
merged 6 commits into from
Jan 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -610,8 +610,10 @@ public T getItem(int index) {
* because the backend can have the item on that index (we simply
* not yet fetched this item during the scrolling).
*/
return (T) getDataProvider().fetch(buildQuery(index, 1)).findFirst()
.orElse(null);
try (Stream<T> stream = getDataProvider()
.fetch(buildQuery(index, 1))) {
return stream.findFirst().orElse(null);
}
}
}

Expand Down Expand Up @@ -1010,18 +1012,20 @@ protected Stream<T> fetchFromProvider(int offset, int limit) {
int page = 0;
do {
final int newOffset = offset + page * pageSize;
Stream<T> dataProviderStream = doFetchFromDataProvider(
newOffset, pageSize);
// Stream.Builder is not thread safe, so for parallel stream
// we need to first collect items before adding them
if (dataProviderStream.isParallel()) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
dataProviderStream.collect(Collectors.toList())
.forEach(addItemAndCheckConsumer);
} else {
dataProviderStream.forEach(addItemAndCheckConsumer);
try (Stream<T> dataProviderStream = doFetchFromDataProvider(
newOffset, pageSize)) {
// Stream.Builder is not thread safe, so for parallel
// stream we need to first collect items before adding
// them
if (dataProviderStream.isParallel()) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
dataProviderStream.collect(Collectors.toList())
.forEach(addItemAndCheckConsumer);
} else {
dataProviderStream.forEach(addItemAndCheckConsumer);
}
}
page++;
} while (page < pages
Expand All @@ -1040,8 +1044,10 @@ protected Stream<T> fetchFromProvider(int offset, int limit) {
getLogger().debug(
"Data provider {} has returned parallel stream on 'fetch' call",
getDataProvider().getClass());
stream = stream.collect(Collectors.toList()).stream();
assert !stream.isParallel();
try (Stream<T> parallelStream = stream) {
stream = parallelStream.collect(Collectors.toList()).stream();
assert !stream.isParallel();
}
}

SizeVerifier verifier = new SizeVerifier<>(limit);
Expand Down Expand Up @@ -1476,17 +1482,20 @@ private Activation activate(Range range) {

// XXX Explicitly refresh anything that is updated
List<String> activeKeys = new ArrayList<>(range.length());
fetchFromProvider(range.getStart(), range.length()).forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().stream()
.forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
try (Stream<T> stream = fetchFromProvider(range.getStart(),
range.length())) {
stream.forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().stream()
.forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
}
boolean needsSizeRecheck = activeKeys.size() < range.length();
return new Activation(activeKeys, needsSizeRecheck);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,22 @@ public interface DataView<T> extends Serializable {
/**
* Get the full data available to the component. Data is filtered and sorted
* the same way as in the component.
* <p>
* Consumers of the returned stream are responsible for closing it when all
* the stream operations are done to ensure that any resources feeding the
* stream are properly released. Failure to close the stream might lead to
* resource leaks.
* <p>
* It is strongly recommended to use a try-with-resources block to
* automatically close the stream after its terminal operation has been
* executed. Below is an example of how to properly use and close the
* stream:
*
* <pre>{@code
* try (Stream<T> stream = dataView.getItems()) {
* stream.forEach(System.out::println); // Example terminal operation
* }
* }</pre>
*
* @return filtered and sorted data set
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,18 @@ private List<String> activate(Range range) {
// XXX Explicitly refresh anything that is updated
List<String> activeKeys = new ArrayList<>(range.length());

fetchItems.apply(parentKey, range).forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
try (Stream<T> stream = fetchItems.apply(parentKey, range)) {
stream.forEach(bean -> {
boolean mapperHasKey = keyMapper.has(bean);
String key = keyMapper.key(bean);
if (mapperHasKey) {
// Ensure latest instance from provider is used
keyMapper.refresh(bean);
passivatedByUpdate.values().forEach(set -> set.remove(key));
}
activeKeys.add(key);
});
}
return activeKeys;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -534,8 +534,9 @@ private Stream<T> getFlatChildrenStream(T parent) {
private Stream<T> getFlatChildrenStream(T parent, boolean includeParent) {
List<T> childList = Collections.emptyList();
if (isExpanded(parent)) {
childList = doFetchDirectChildren(parent)
.collect(Collectors.toList());
try (Stream<T> stream = doFetchDirectChildren(parent)) {
childList = stream.collect(Collectors.toList());
}
if (childList.isEmpty()) {
removeChildren(parent == null ? null
: getDataProvider().getId(parent));
Expand Down Expand Up @@ -563,8 +564,9 @@ private Stream<T> getChildrenStream(T parent, Range range,
boolean includeParent) {
List<T> childList = Collections.emptyList();
if (isExpanded(parent)) {
childList = doFetchDirectChildren(parent, range)
.collect(Collectors.toList());
try (Stream<T> stream = doFetchDirectChildren(parent, range)) {
childList = stream.collect(Collectors.toList());
}
if (childList.isEmpty()) {
removeChildren(parent == null ? null
: getDataProvider().getId(parent));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,18 @@ public void getItem_withUndefinedSizeAndSorting() {
dataCommunicator.getItem(2));
}

@Test
public void getItem_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
dataCommunicator.setDataProvider(createDataProvider(streamIsClosed),
null);

fakeClientCommunication();
dataCommunicator.getItem(0);

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void itemCountEstimateAndStep_defaults() {
Assert.assertEquals(dataCommunicator.getItemCountEstimate(),
Expand Down Expand Up @@ -1353,6 +1365,18 @@ public void fetchFromProvider_itemCountLessThanTwoPages_correctItemsReturned() {

}

@Test
public void fetchFromProvider_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
dataCommunicator.setDataProvider(createDataProvider(streamIsClosed),
null);
dataCommunicator.setRequestedRange(0, 50);

fakeClientCommunication();

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void fetchEnabled_getItemCount_stillReturnsItemsCount() {
dataCommunicator.setFetchEnabled(false);
Expand Down Expand Up @@ -1737,6 +1761,11 @@ public Stream<Item> fetch(Query<Item, Object> query) {
}

private AbstractDataProvider<Item, Object> createDataProvider() {
return createDataProvider(new AtomicBoolean());
}

private AbstractDataProvider<Item, Object> createDataProvider(
AtomicBoolean streamIsClosed) {
return new AbstractDataProvider<Item, Object>() {
@Override
public boolean isInMemory() {
Expand All @@ -1752,7 +1781,8 @@ public int size(Query<Item, Object> query) {
public Stream<Item> fetch(Query<Item, Object> query) {
return asParallelIfRequired(IntStream.range(query.getOffset(),
query.getLimit() + query.getOffset()))
.mapToObj(Item::new);
.mapToObj(Item::new)
.onClose(() -> streamIsClosed.set(true));
}
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.Stream;

import org.junit.Assert;
import org.junit.Before;
Expand Down Expand Up @@ -221,6 +223,33 @@ public void uniqueKeyProviderIsNotSet_keysGeneratedByKeyMapper() {
communicator.getKeyMapper().get(i)));
}

@Test
public void expandRoot_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();

dataProvider = new TreeDataProvider<>(treeData) {

@Override
public Stream<Item> fetchChildren(
HierarchicalQuery<Item, SerializablePredicate<Item>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
};

communicator.setDataProvider(dataProvider, null);

communicator.expand(ROOT);
fakeClientCommunication();

communicator.setParentRequestedRange(0, 50, ROOT);
fakeClientCommunication();

communicator.reset();

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void expandRoot_filterOutAllChildren_clearCalled() {
parentClearCalled = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand Down Expand Up @@ -288,6 +289,42 @@ public void getExpandedItems_tryToAddItemsToCollection_shouldThrowException() {
expandedItems.add(new TreeNode("third-1"));
}

@Test
public void fetchHierarchyItems_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) {
@Override
public Stream<Node> fetchChildren(
HierarchicalQuery<Node, SerializablePredicate<Node>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
});
Node rootNode = testData.get(0);
mapper.expand(rootNode);
mapper.fetchHierarchyItems(rootNode, Range.between(0, 10)).count();

Assert.assertTrue(streamIsClosed.get());
}

@Test
public void fetchChildItems_streamIsClosed() {
AtomicBoolean streamIsClosed = new AtomicBoolean();
mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) {
@Override
public Stream<Node> fetchChildren(
HierarchicalQuery<Node, SerializablePredicate<Node>> query) {
return super.fetchChildren(query)
.onClose(() -> streamIsClosed.set(true));
}
});
Node rootNode = testData.get(0);
mapper.expand(rootNode);
mapper.fetchChildItems(rootNode, Range.between(0, 10));

Assert.assertTrue(streamIsClosed.get());
}

private void expand(Node node) {
insertRows(mapper.expand(node, mapper.getIndexOf(node).orElse(null)));
}
Expand Down
Loading