-
Notifications
You must be signed in to change notification settings - Fork 513
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix file batch operation #5519
Fix file batch operation #5519
Conversation
Iterator<OutputT> i1 = outputs.iterator(); | ||
Iterator<Element> i2 = batch.iterator(); | ||
while (i1.hasNext() && i2.hasNext()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This parts assumes that List<Output>
is 1:1 mapping of List<URI>
@@ -104,7 +106,12 @@ public Path download(URI src) { | |||
*/ | |||
public List<Path> download(List<URI> srcs) { | |||
try { | |||
return paths.getAll(srcs).values().asList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
getAll(srcs)
returns a Map<URI, Path>
. Using values
give no guarantee that returned Paths
are ordered.
if (batch.isEmpty()) { | ||
return; | ||
} | ||
LOG.info("Processing batch of {}", batch.size()); | ||
List<URI> uris = batch.stream().map(e -> e.uri).collect(Collectors.toList()); | ||
remoteFileUtil.download(uris).stream().map(fn::apply).forEach(outputReceiver::output); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
original element metadata are not passed back
dd49ef0
to
15a8539
Compare
val paths = p.map { case ((_, f), _) => f }.distinct | ||
|
||
val expected = (1L to 100L).map(i => (i.toString, i)) | ||
contentAndTimestamp should containInAnyOrder(expected) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ran on the original code, got non expected item (1, 10)
-> (1st element was flushed while processing the 10th)
RemoteFileUtil did not preserve element ordering wich was assumed in the FileDownloadDoFn Pane information was not propagated
15a8539
to
fac16a0
Compare
Codecov ReportAll modified and coverable lines are covered by tests ✅
Additional details and impacted files@@ Coverage Diff @@
## main #5519 +/- ##
==========================================
- Coverage 61.43% 61.42% -0.01%
==========================================
Files 312 312
Lines 11103 11103
Branches 762 762
==========================================
- Hits 6821 6820 -1
- Misses 4282 4283 +1 ☔ View full report in Codecov by Sentry. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nice catch!
RemoteFileUtil did not preserve element ordering wich was assumed in the FileDownloadDoFn
Pane information was not propagated