diff --git a/flow-data/src/main/java/com/vaadin/flow/data/provider/DataCommunicator.java b/flow-data/src/main/java/com/vaadin/flow/data/provider/DataCommunicator.java index e6d78a9231c..b29d3b13fb4 100644 --- a/flow-data/src/main/java/com/vaadin/flow/data/provider/DataCommunicator.java +++ b/flow-data/src/main/java/com/vaadin/flow/data/provider/DataCommunicator.java @@ -610,8 +610,10 @@ public T getItem(int index) { * because the backend can have the item on that index (we simply * not yet fetched this item during the scrolling). */ - return (T) getDataProvider().fetch(buildQuery(index, 1)).findFirst() - .orElse(null); + try (Stream stream = getDataProvider() + .fetch(buildQuery(index, 1))) { + return stream.findFirst().orElse(null); + } } } @@ -1010,18 +1012,20 @@ protected Stream fetchFromProvider(int offset, int limit) { int page = 0; do { final int newOffset = offset + page * pageSize; - Stream dataProviderStream = doFetchFromDataProvider( - newOffset, pageSize); - // Stream.Builder is not thread safe, so for parallel stream - // we need to first collect items before adding them - if (dataProviderStream.isParallel()) { - getLogger().debug( - "Data provider {} has returned parallel stream on 'fetch' call", - getDataProvider().getClass()); - dataProviderStream.collect(Collectors.toList()) - .forEach(addItemAndCheckConsumer); - } else { - dataProviderStream.forEach(addItemAndCheckConsumer); + try (Stream dataProviderStream = doFetchFromDataProvider( + newOffset, pageSize)) { + // Stream.Builder is not thread safe, so for parallel + // stream we need to first collect items before adding + // them + if (dataProviderStream.isParallel()) { + getLogger().debug( + "Data provider {} has returned parallel stream on 'fetch' call", + getDataProvider().getClass()); + dataProviderStream.collect(Collectors.toList()) + .forEach(addItemAndCheckConsumer); + } else { + dataProviderStream.forEach(addItemAndCheckConsumer); + } } page++; } while (page < pages @@ -1040,8 +1044,10 @@ protected Stream fetchFromProvider(int offset, int limit) { getLogger().debug( "Data provider {} has returned parallel stream on 'fetch' call", getDataProvider().getClass()); - stream = stream.collect(Collectors.toList()).stream(); - assert !stream.isParallel(); + try (Stream parallelStream = stream) { + stream = parallelStream.collect(Collectors.toList()).stream(); + assert !stream.isParallel(); + } } SizeVerifier verifier = new SizeVerifier<>(limit); @@ -1476,17 +1482,20 @@ private Activation activate(Range range) { // XXX Explicitly refresh anything that is updated List activeKeys = new ArrayList<>(range.length()); - fetchFromProvider(range.getStart(), range.length()).forEach(bean -> { - boolean mapperHasKey = keyMapper.has(bean); - String key = keyMapper.key(bean); - if (mapperHasKey) { - // Ensure latest instance from provider is used - keyMapper.refresh(bean); - passivatedByUpdate.values().stream() - .forEach(set -> set.remove(key)); - } - activeKeys.add(key); - }); + try (Stream stream = fetchFromProvider(range.getStart(), + range.length())) { + stream.forEach(bean -> { + boolean mapperHasKey = keyMapper.has(bean); + String key = keyMapper.key(bean); + if (mapperHasKey) { + // Ensure latest instance from provider is used + keyMapper.refresh(bean); + passivatedByUpdate.values().stream() + .forEach(set -> set.remove(key)); + } + activeKeys.add(key); + }); + } boolean needsSizeRecheck = activeKeys.size() < range.length(); return new Activation(activeKeys, needsSizeRecheck); } diff --git a/flow-data/src/main/java/com/vaadin/flow/data/provider/DataView.java b/flow-data/src/main/java/com/vaadin/flow/data/provider/DataView.java index a2bf6dfea90..b3a4cc9e04d 100644 --- a/flow-data/src/main/java/com/vaadin/flow/data/provider/DataView.java +++ b/flow-data/src/main/java/com/vaadin/flow/data/provider/DataView.java @@ -59,6 +59,22 @@ public interface DataView extends Serializable { /** * Get the full data available to the component. Data is filtered and sorted * the same way as in the component. + *

+ * Consumers of the returned stream are responsible for closing it when all + * the stream operations are done to ensure that any resources feeding the + * stream are properly released. Failure to close the stream might lead to + * resource leaks. + *

+ * It is strongly recommended to use a try-with-resources block to + * automatically close the stream after its terminal operation has been + * executed. Below is an example of how to properly use and close the + * stream: + * + *

{@code
+     * try (Stream stream = dataView.getItems()) {
+     *     stream.forEach(System.out::println); // Example terminal operation
+     * }
+     * }
* * @return filtered and sorted data set */ diff --git a/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicationController.java b/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicationController.java index 5e111a671f8..416cb98bb37 100644 --- a/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicationController.java +++ b/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicationController.java @@ -255,16 +255,18 @@ private List activate(Range range) { // XXX Explicitly refresh anything that is updated List activeKeys = new ArrayList<>(range.length()); - fetchItems.apply(parentKey, range).forEach(bean -> { - boolean mapperHasKey = keyMapper.has(bean); - String key = keyMapper.key(bean); - if (mapperHasKey) { - // Ensure latest instance from provider is used - keyMapper.refresh(bean); - passivatedByUpdate.values().forEach(set -> set.remove(key)); - } - activeKeys.add(key); - }); + try (Stream stream = fetchItems.apply(parentKey, range)) { + stream.forEach(bean -> { + boolean mapperHasKey = keyMapper.has(bean); + String key = keyMapper.key(bean); + if (mapperHasKey) { + // Ensure latest instance from provider is used + keyMapper.refresh(bean); + passivatedByUpdate.values().forEach(set -> set.remove(key)); + } + activeKeys.add(key); + }); + } return activeKeys; } diff --git a/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapper.java b/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapper.java index b47bd6fd9b8..ae8cae9a57a 100644 --- a/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapper.java +++ b/flow-data/src/main/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapper.java @@ -534,8 +534,9 @@ private Stream getFlatChildrenStream(T parent) { private Stream getFlatChildrenStream(T parent, boolean includeParent) { List childList = Collections.emptyList(); if (isExpanded(parent)) { - childList = doFetchDirectChildren(parent) - .collect(Collectors.toList()); + try (Stream stream = doFetchDirectChildren(parent)) { + childList = stream.collect(Collectors.toList()); + } if (childList.isEmpty()) { removeChildren(parent == null ? null : getDataProvider().getId(parent)); @@ -563,8 +564,9 @@ private Stream getChildrenStream(T parent, Range range, boolean includeParent) { List childList = Collections.emptyList(); if (isExpanded(parent)) { - childList = doFetchDirectChildren(parent, range) - .collect(Collectors.toList()); + try (Stream stream = doFetchDirectChildren(parent, range)) { + childList = stream.collect(Collectors.toList()); + } if (childList.isEmpty()) { removeChildren(parent == null ? null : getDataProvider().getId(parent)); diff --git a/flow-data/src/test/java/com/vaadin/flow/data/provider/DataCommunicatorTest.java b/flow-data/src/test/java/com/vaadin/flow/data/provider/DataCommunicatorTest.java index c3a92b323f3..f6e575dc49e 100644 --- a/flow-data/src/test/java/com/vaadin/flow/data/provider/DataCommunicatorTest.java +++ b/flow-data/src/test/java/com/vaadin/flow/data/provider/DataCommunicatorTest.java @@ -996,6 +996,18 @@ public void getItem_withUndefinedSizeAndSorting() { dataCommunicator.getItem(2)); } + @Test + public void getItem_streamIsClosed() { + AtomicBoolean streamIsClosed = new AtomicBoolean(); + dataCommunicator.setDataProvider(createDataProvider(streamIsClosed), + null); + + fakeClientCommunication(); + dataCommunicator.getItem(0); + + Assert.assertTrue(streamIsClosed.get()); + } + @Test public void itemCountEstimateAndStep_defaults() { Assert.assertEquals(dataCommunicator.getItemCountEstimate(), @@ -1353,6 +1365,18 @@ public void fetchFromProvider_itemCountLessThanTwoPages_correctItemsReturned() { } + @Test + public void fetchFromProvider_streamIsClosed() { + AtomicBoolean streamIsClosed = new AtomicBoolean(); + dataCommunicator.setDataProvider(createDataProvider(streamIsClosed), + null); + dataCommunicator.setRequestedRange(0, 50); + + fakeClientCommunication(); + + Assert.assertTrue(streamIsClosed.get()); + } + @Test public void fetchEnabled_getItemCount_stillReturnsItemsCount() { dataCommunicator.setFetchEnabled(false); @@ -1737,6 +1761,11 @@ public Stream fetch(Query query) { } private AbstractDataProvider createDataProvider() { + return createDataProvider(new AtomicBoolean()); + } + + private AbstractDataProvider createDataProvider( + AtomicBoolean streamIsClosed) { return new AbstractDataProvider() { @Override public boolean isInMemory() { @@ -1752,7 +1781,8 @@ public int size(Query query) { public Stream fetch(Query query) { return asParallelIfRequired(IntStream.range(query.getOffset(), query.getLimit() + query.getOffset())) - .mapToObj(Item::new); + .mapToObj(Item::new) + .onClose(() -> streamIsClosed.set(true)); } }; } diff --git a/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicatorDataTest.java b/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicatorDataTest.java index 74142f79714..4274e2d2207 100644 --- a/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicatorDataTest.java +++ b/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchicalCommunicatorDataTest.java @@ -18,7 +18,9 @@ import java.io.Serializable; import java.util.Arrays; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.IntStream; +import java.util.stream.Stream; import org.junit.Assert; import org.junit.Before; @@ -221,6 +223,33 @@ public void uniqueKeyProviderIsNotSet_keysGeneratedByKeyMapper() { communicator.getKeyMapper().get(i))); } + @Test + public void expandRoot_streamIsClosed() { + AtomicBoolean streamIsClosed = new AtomicBoolean(); + + dataProvider = new TreeDataProvider<>(treeData) { + + @Override + public Stream fetchChildren( + HierarchicalQuery> query) { + return super.fetchChildren(query) + .onClose(() -> streamIsClosed.set(true)); + } + }; + + communicator.setDataProvider(dataProvider, null); + + communicator.expand(ROOT); + fakeClientCommunication(); + + communicator.setParentRequestedRange(0, 50, ROOT); + fakeClientCommunication(); + + communicator.reset(); + + Assert.assertTrue(streamIsClosed.get()); + } + @Test public void expandRoot_filterOutAllChildren_clearCalled() { parentClearCalled = false; diff --git a/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapperWithDataTest.java b/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapperWithDataTest.java index 6da7a8e211e..fe31e5ee0db 100644 --- a/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapperWithDataTest.java +++ b/flow-data/src/test/java/com/vaadin/flow/data/provider/hierarchy/HierarchyMapperWithDataTest.java @@ -21,6 +21,7 @@ import java.util.Comparator; import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -288,6 +289,42 @@ public void getExpandedItems_tryToAddItemsToCollection_shouldThrowException() { expandedItems.add(new TreeNode("third-1")); } + @Test + public void fetchHierarchyItems_streamIsClosed() { + AtomicBoolean streamIsClosed = new AtomicBoolean(); + mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) { + @Override + public Stream fetchChildren( + HierarchicalQuery> query) { + return super.fetchChildren(query) + .onClose(() -> streamIsClosed.set(true)); + } + }); + Node rootNode = testData.get(0); + mapper.expand(rootNode); + mapper.fetchHierarchyItems(rootNode, Range.between(0, 10)).count(); + + Assert.assertTrue(streamIsClosed.get()); + } + + @Test + public void fetchChildItems_streamIsClosed() { + AtomicBoolean streamIsClosed = new AtomicBoolean(); + mapper = new HierarchyMapper<>(new TreeDataProvider<>(data) { + @Override + public Stream fetchChildren( + HierarchicalQuery> query) { + return super.fetchChildren(query) + .onClose(() -> streamIsClosed.set(true)); + } + }); + Node rootNode = testData.get(0); + mapper.expand(rootNode); + mapper.fetchChildItems(rootNode, Range.between(0, 10)); + + Assert.assertTrue(streamIsClosed.get()); + } + private void expand(Node node) { insertRows(mapper.expand(node, mapper.getIndexOf(node).orElse(null))); }