storage: reclocking with better asymptotic performance #25720
Conversation
Force-pushed 727cb7b to e733235
Nice!
Force-pushed 4e9b2f6 to 61041c9
Is it possible to easily test this from outside of cargo-test? I'd like to have a testdrive-based reproducer too, and am willing to write one for this PR if you can tell me an easy way to reproduce the issue.
What kind of test do you have in mind? Load a lot of bindings and then put an upper bound on the execution time?
I marked several places where the code would benefit from clearer exposition. I have some further thoughts:
- I can't tell what the goals of the code are. My understanding is that we have historically had some sub-optimal asymptotics and this bites us. What properties does this implementation have? Are there asymptotics that we don't care about here? I marked a few places where there are sub-optimal asymptotics that we might discover next and could get a head start on now.
- The
Replayer
state machine confuses me. It may be doing a smart thing, but it doesn't line up with how I understand the remapping task advancing forward as it receives new information. For example, it seems to not maintain MVCC remappings, and reports on one time at a time; this seems limiting in that the data frontier not advancing holds back the release of definitively reclocked updates, which seems like it could be a problem at snapshot time (vs shipping the reclocked updates and letting the downstream operators deal with consolidating the results).
If this needs to go out promptly to fix a current fire don't let me stand in the way, but if we want to get to a high-confidence implementation that doesn't surprise us, I think there is more work we can do.
src/timely-util/src/reclock.rs
Outdated
let (mut output, reclocked) = builder.new_output();

builder.build(move |caps| {
    let [cap]: [_; 1] = caps.try_into().expect("one output");
No clue what this is doing, and the syntax seems unnecessarily abstruse.
This is a habit of mine that allows you to extract the capabilities in output order and assert that you didn't forget any in one statement. If you had 2 outputs you'd have to write something like:
let cap_output2 = caps.pop().unwrap(); // <- notice opposite order of outputs
let cap_output1 = caps.pop().unwrap(); // <- notice opposite order of outputs
assert!(caps.is_empty());
This syntax allows you to write:
let [cap_output1, cap_output2]: [_; 2] = caps.try_into().expect("two outputs");
I switched to caps.into_element() for this particular case.
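The pattern above is plain std Rust and can be sketched in isolation (the `split_two` helper and string elements here are illustrative stand-ins, not part of the PR): converting a `Vec` into a fixed-size array both destructures in declaration order and asserts the expected length in one statement.

```rust
// Sketch of the extraction pattern: `TryFrom<Vec<T>> for [T; N]` fails unless
// the Vec has exactly N elements, so the conversion doubles as a length assert.
fn split_two(items: Vec<&'static str>) -> (&'static str, &'static str) {
    // Destructures in order, panicking with the message if the length is not 2.
    let [first, second]: [_; 2] = items.try_into().expect("two outputs");
    (first, second)
}

fn main() {
    let (a, b) = split_two(vec!["cap_output1", "cap_output2"]);
    println!("{a} {b}");
}
```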
src/timely-util/src/reclock.rs
Outdated
let channel_id = scope.new_identifier();
let (pusher, mut puller) =
    scope.pipeline::<Event<FromTime, (D1, FromTime, R)>>(channel_id, &info.address);
These look very suspicious, and I can't guess what they are for. A great opportunity to either comment, or to delete them.
I've read further, and it seems like you are using this in place of a more vanilla MPSC queue. This needs more explanation, as it is not something I've seen done nor done myself (pretty sure), and involving timely in (seemingly) random queues .. yeah, please write more here or just use an MPSC queue.
I added more explanation. This registers the channel with the operator's address, which activates the operator on arrival of new data and manages all the details. This is very similar to getting an InputHandle and supplying data into a dataflow from outside the worker. Is it a problem to allocate a channel for the operator's address?
src/timely-util/src/reclock.rs
Outdated
builder.build(move |caps| {
    let [cap]: [_; 1] = caps.try_into().expect("one output");

    // Remap updates beyond the upper
I think by `upper` you meant "input frontier"? If that's right, I recommend that term instead!
Is there a reason not to call it `pending_remap_updates` or something, rather than `accepted_bindings`, which seems (at this point in reading) to have not so much to do with the concepts / goals introduced so far?
src/timely-util/src/reclock.rs
Outdated
});
// If we won't receive any more data for this binding we can go to the next one
if PartialOrder::less_equal(&frontier, &source_upper.frontier()) {
    replayer.step();
We should find a better name than `step`! :D Is this the moment at which some sort of compaction is allowed to happen in the bindings? Surely that must communicate something about the frontier to `replayer`?
src/timely-util/src/reclock.rs
Outdated
/// The upper frontier of bindings received so far. An `Option<Capability>` is sufficient to
/// describe a frontier because `IntoTime` is required to be totally ordered.
I'm not sure what "upper frontier" means here, and my most literal reading is that it is the maximum antichain of update times received so far: like, if we receive some weird update at 2077, it would be 2077 until we learn about an even greater time. But I bet it is not actually this.
src/timely-util/src/reclock.rs
Outdated
/// An option representing whether an active binding exists. The option carries the capability
/// that should be used to send all the source data corresponding to its time.
No idea what an "active binding" is at this point.
src/timely-util/src/reclock.rs
Outdated
self.bindings.extend(data.into_iter());
if self.active_binding.is_none() {
    self.active_binding = self.upper_capability.clone();
    self.step();
}
match (self.upper_capability.as_mut(), upper.as_option()) {
    (Some(cap), Some(time)) => cap.downgrade(time),
    (_, None) => self.upper_capability = None,
    (None, Some(_)) => unreachable!(),
}
No idea what any of this is doing.
src/timely-util/src/reclock.rs
Outdated
    }
}

/// Reveals the currently active binding and its accosiated `FromTime` frontier.
Typo "accosiated" but also what is an "active binding"?
Force-pushed c4533fa to a72016a
Force-pushed ce461c9 to fb064cb
Force-pushed 5820675 to 529af20
I think at this point you can go ahead and merge it. The main ask I have is that you record somewhere the potential concern that each scheduling can take time proportional to the number of elements in `remap_trace`, because one must visit them linearly to determine how to remap any element. The alternate design we discussed, where one maintains a `Vec<(Antichain<FromTime>, IntoTime)>`, avoids this problem at the expense of recording `Antichain<FromTime>` elements whose size concerned you enough to prefer the linear time algo to a logarithmic time algo.
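For totally ordered `FromTime`, the alternate design reduces to something like the following sketch. Everything here is hypothetical and uses `u64` stand-ins for both timestamp types (the real code works with antichains and generic timestamps); it only illustrates how keeping bindings sorted enables a logarithmic lookup via `partition_point` instead of a linear scan.

```rust
/// Hypothetical remap trace: each entry `(from_upper, into_time)` says that
/// source times below `from_upper` are covered once we reach `into_time`.
/// Entries are sorted by `into_time`, with non-decreasing `from_upper`.
struct RemapTrace {
    bindings: Vec<(u64 /* from_upper */, u64 /* into_time */)>,
}

impl RemapTrace {
    /// Find the earliest `into_time` whose binding's frontier is beyond
    /// `from_time`, in O(log n) rather than O(n).
    fn reclock(&self, from_time: u64) -> Option<u64> {
        // Index of the first binding with `from_upper > from_time`.
        let idx = self
            .bindings
            .partition_point(|&(from_upper, _)| from_upper <= from_time);
        self.bindings.get(idx).map(|&(_, into_time)| into_time)
    }
}

fn main() {
    let trace = RemapTrace { bindings: vec![(0, 0), (10, 1), (25, 2)] };
    println!("{:?}", trace.reclock(5));
}
```

A `None` result means the source time is not yet covered by any binding and the update must be deferred.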
src/timely-util/src/reclock.rs
Outdated
//! contains all updates `u ∈ S` that are not beyond some `FromTime` frontier fₜ. The collection
//! `R` that prescribes `fₜ` for every `t` is called the remap collection.
Nit, but if we are doing notation might as well be consistent. The `fₜ` notation has not been introduced, and probably can just be written `R[t]`?
src/timely-util/src/reclock.rs
Outdated
let channel_id = scope.new_identifier();
// Here we create a channel that can be used to send data from a foreign scope into this
// operator. The channel is associated with this operator's address so that it is activated
// every time events are availalbe for consumption. This mechanism is similar to Timely's input
typo: "availalbe"
src/timely-util/src/reclock.rs
Outdated
let mut capset = CapabilitySet::from_elem(caps.into_element());
capset.downgrade(&as_of.borrow());

// Received updates that may yet be greater or equal to the `remap` input frontier.
Specifically, remap updates, I think? There are several updates, frontiers, etc. Perhaps instead "Received `remap` updates at times `into_time` greater or equal to `remap_input`'s input frontier".
let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
let mut remap_since = as_of;
let mut remap_trace = Vec::new();
These names make me think that perhaps you want an actual trace here. That's probably bouncing the code around more than it needs, but it's perhaps worth thinking about whether a trace would be a helpful way to record the multi-versioned remapping (and if not, perhaps leave a note about why not).
src/timely-util/src/reclock.rs
Outdated
let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
let mut source_frontier = MutableAntichain::new_bottom(FromTime::minimum());
Some comments or names would be great here!
src/timely-util/src/reclock.rs
Outdated
}

// STEP 6. Tidy up deferred updates
// Deferred updates are represented as a list chain batches where each batch
Probably meant to be "list of chain batches"?
src/timely-util/src/reclock.rs
Outdated
if !new_source_updates.is_empty() {
    deferred_source_updates.push(new_source_updates);
}
loop {
See above about `loop {` and my opinions on it. :D
src/timely-util/src/reclock.rs
Outdated
}

/// A batch of differential updates that vary over some partial order. This type maintains the data
/// as a set of disjoint chains that allows for efficient extraction of batches given a frontier.
I'm not sure what "disjoint" implies here. There can be overlap in the chains, like the same update could be in multiple chains. Perhaps just remove the word?
src/timely-util/src/reclock.rs
Outdated
{
    r1.plus_equals(&r);
}
Some((d1, t1, r1))
If you like, you could also check here to see if `r1.is_zero()` and drop the result if so. I'm not sure if this applies to any of our source types though (though similarly, I don't know when we'd have two updates at the same `FromTime` with our existing sources).
while let Some((_, _, r)) =
    updates1.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
This construction suggests that chains are not necessarily consolidated. Should that be made to be the case? Fine if not, of course, but the more structure that exists the better!
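The merge-and-drop behavior being discussed can be sketched with a self-contained consolidation pass. This is an illustration in plain std Rust, not the PR's code: `consolidate`, the `&str` data, and the `u64`/`i64` time and diff types are all stand-ins for the generic `(D1, FromTime, R)` updates.

```rust
/// Illustrative consolidation of (data, time, diff) updates: equal
/// (data, time) pairs are merged by summing diffs (the `plus_equals` step),
/// and updates whose diff sums to zero are dropped (the `is_zero` check).
fn consolidate(mut updates: Vec<(&'static str, u64, i64)>) -> Vec<(&'static str, u64, i64)> {
    // Sort so that equal (data, time) pairs become adjacent.
    updates.sort();
    let mut out: Vec<(&'static str, u64, i64)> = Vec::new();
    for (d, t, r) in updates {
        match out.last_mut() {
            // Same (data, time) as the previous update: accumulate the diff.
            Some(last) if last.0 == d && last.1 == t => last.2 += r,
            _ => out.push((d, t, r)),
        }
    }
    // Drop fully cancelled updates.
    out.retain(|&(_, _, r)| r != 0);
    out
}

fn main() {
    let consolidated = consolidate(vec![("a", 1, 1), ("b", 2, 1), ("a", 1, 1), ("b", 2, -1)]);
    println!("{consolidated:?}");
}
```

Whether a chain is kept in this consolidated form is exactly the structural question raised above; the extraction loop only needs adjacency of equal `(data, time)` pairs, which sortedness already provides.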
Implement a new reclock operator which has asymptotically better performance than the existing one by taking advantage of a new technique and a new assumption.

The technique exploited by this implementation is maintaining the pile of data waiting to be reclocked in runs of updates that are mutually comparable in their partial order. Maintaining those runs (called chains[1]) allows for very efficient extraction of new batches of data, as we can leverage binary search within each chain. Furthermore, for all the concrete timestamp types used for reclocking, the linear extension order implemented by `Ord` leads to an optimal number of chains for a given subset.

The new assumption exploited by this implementation is that the timestamp we're reclocking onto is totally ordered. This is by no means a requirement but mostly a result of the time pressure to release this to fix #22128. The implementation of reclocking onto a `Lattice` would result in much more code that would have to be reviewed on short notice, so it is left as future work.

[1] https://en.wikipedia.org/wiki/Total_order#Chains

Signed-off-by: Petros Angelatos <petrosagg@gmail.com>

Force-pushed 529af20 to db7a352
@frankmcsherry thanks for the review! I fixed all your comments and replaced both
Motivation
Implement a new reclock operator which has asymptotically better performance than the existing one by taking advantage of a new technique and a new assumption.

The technique exploited by this implementation is maintaining the pile of data waiting to be reclocked in runs of updates that are mutually comparable in their partial order. Maintaining those runs (called chains[1]) allows for very efficient extraction of new batches of data, as we can leverage binary search within each chain. Furthermore, for all the concrete timestamp types used for reclocking, the linear extension order implemented by `Ord` leads to an optimal number of chains for a given subset.

The new assumption exploited by this implementation is that the timestamp we're reclocking onto is totally ordered. This is by no means a requirement but mostly a result of the time pressure to release this to fix MaterializeInc/incidents-and-escalations#55. The implementation of reclocking onto a `Lattice` would result in much more code that would have to be reviewed on short notice, so it is left as future work.

[1] https://en.wikipedia.org/wiki/Total_order#Chains
Tips for reviewer
Checklist
`$T ⇔ Proto$T` mapping (possibly in a backwards-incompatible way), then it is tagged with a `T-proto` label.