
storage: reclocking with better asymptotic performance #25720

Merged: 1 commit into MaterializeInc:main from the faster-reclocking branch, May 10, 2024

Conversation

@petrosagg (Contributor) commented Mar 4, 2024

Motivation

Implement a new reclock operator which has asymptotically better performance than the existing one by taking advantage of a new technique and a new assumption.

The technique exploited by this implementation is maintaining the pile of data waiting to be reclocked in runs of updates that are mutually comparable in their partial order. Maintaining those runs (called chains[1]) allows for very efficient extraction of new batches of data, as we can leverage binary search within each chain. Furthermore, for all the concrete timestamp types used for reclocking, the linear extension order implemented by Ord leads to an optimal number of chains for a given subset.

The new assumption exploited by this implementation is that the timestamp we're reclocking onto is totally ordered. This is by no means a requirement but mostly a result of the time pressure to release this to fix MaterializeInc/incidents-and-escalations#55. The implementation of reclocking onto a Lattice would result in much more code that would have to be reviewed on short notice, so it is left as future work.

[1] https://en.wikipedia.org/wiki/Total_order#Chains
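
As an editorial illustration of the chain technique described above (a minimal sketch with made-up names, not the PR's actual `ChainBatch` implementation):

/// A run of updates whose timestamps are mutually comparable, kept in
/// ascending order, so the chain behaves like a sorted vector.
struct Chain<D, T, R> {
    updates: Vec<(D, T, R)>,
}

impl<D, T: PartialOrd, R> Chain<D, T, R> {
    /// Extracts the prefix of updates that are not beyond `frontier`, i.e.
    /// those with no frontier element less than or equal to their time.
    /// The predicate is monotone along a sorted chain, so the boundary is
    /// found by binary search (`partition_point`) rather than a linear scan.
    fn extract(&mut self, frontier: &[T]) -> Vec<(D, T, R)> {
        let end = self
            .updates
            .partition_point(|(_, t, _)| !frontier.iter().any(|f| f <= t));
        self.updates.drain(..end).collect()
    }
}

/// Appends an update to the first chain whose last timestamp is less than
/// or equal to it, or opens a new chain. Feeding updates in the `Ord`
/// linear-extension order keeps the number of chains small for the
/// concrete timestamp types used in practice.
fn push<D, T: PartialOrd, R>(chains: &mut Vec<Chain<D, T, R>>, update: (D, T, R)) {
    for chain in chains.iter_mut() {
        if matches!(chain.updates.last(), Some((_, last, _)) if *last <= update.1) {
            chain.updates.push(update);
            return;
        }
    }
    chains.push(Chain { updates: vec![update] });
}

Extracting a batch for a given frontier then amounts to calling `extract` on every chain and concatenating the results, which is where the per-chain binary search pays off.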

Tips for reviewer

Checklist

  • This PR has adequate test coverage / QA involvement has been duly considered.
  • This PR has an associated up-to-date design doc, is a design doc (template), or is sufficiently small to not require a design.
  • If this PR evolves an existing $T ⇔ Proto$T mapping (possibly in a backwards-incompatible way), then it is tagged with a T-proto label.
  • If this PR will require changes to cloud orchestration or tests, there is a companion cloud PR to account for those changes that is tagged with the release-blocker label (example).
  • This PR includes the following user-facing behavior changes:

@petrosagg force-pushed the faster-reclocking branch 7 times, most recently from 727cb7b to e733235 on March 5, 2024 16:08
@petrosagg marked this pull request as ready for review March 5, 2024 16:08
@petrosagg requested review from a team and frankmcsherry March 5, 2024 16:08
@antiguru (Member) left a comment:

Nice!

src/timely-util/src/reclock.rs (two review threads, outdated, resolved)
@def- (Contributor) left a comment:

Is it possible to easily test this from outside of cargo-test? I'd like to have a testdrive-based reproducer too, and am willing to write one for this PR if you can tell me an easy way to reproduce the issue.

@petrosagg (Contributor, PR author) replied:

> Is it possible to easily test this from outside of cargo-test?

What kind of test do you have in mind? Load a lot of bindings and then put an upper bound on the execution time?

@frankmcsherry (Contributor) left a comment:

I marked several places where the code would benefit from clearer exposition. I have some further thoughts:

  1. I can't tell what the goals of the code are. My understanding is that we have historically had some sub-optimal asymptotics and this bites us. What properties does this implementation have? Are there asymptotics that we don't care about here? I marked a few places where there are sub-optimal asymptotics that we might discover next and could get a head start on now.
  2. The Replayer state machine confuses me. It may be doing a smart thing, but it doesn't line up with how I understand the remapping task advancing forward as it receives new information. For example, it seems to not maintain MVCC remappings, and reports on one time at a time; this seems limiting in that the data frontier not advancing holds back the release of definitively reclocked updates, which seems like it could be a problem at snapshot time (vs shipping the reclocked updates and letting the downstream operators deal with consolidating the results).

If this needs to go out promptly to fix a current fire don't let me stand in the way, but if we want to get to a high-confidence implementation that doesn't surprise us, I think there is more work we can do.

src/timely-util/src/reclock.rs (two review threads, one outdated, resolved)
let (mut output, reclocked) = builder.new_output();

builder.build(move |caps| {
let [cap]: [_; 1] = caps.try_into().expect("one output");
Reviewer (Contributor):

No clue what this is doing, and the syntax seems unnecessarily abstruse.

@petrosagg (Contributor, PR author) replied:

This is a habit of mine that allows you to extract the capabilities in output order and assert that you didn't forget any, all in one statement. If you had 2 outputs you'd have to write something like:

let cap_output2 = caps.pop().unwrap(); // <- notice opposite order of outputs
let cap_output1 = caps.pop().unwrap(); // <- notice opposite order of outputs
assert!(caps.is_empty());

This syntax allows you to write:

let [cap_output1, cap_output2]: [_; 2] = caps.try_into().expect("two outputs");

I switched to caps.into_element() for this particular case.

Comment on lines 81 to 83
let channel_id = scope.new_identifier();
let (pusher, mut puller) =
scope.pipeline::<Event<FromTime, (D1, FromTime, R)>>(channel_id, &info.address);
Reviewer (Contributor):

These look very suspicious, and I can't guess what they are for. A great opportunity to either comment, or to delete them.

Reviewer (Contributor):

I've read further, and it seems like you are using this in place of a more vanilla MPSC queue. This needs more explanation, as it is not something I've seen done nor done myself (pretty sure), and involving timely in (seemingly) random queues... yeah, please write more here or just use an MPSC queue.

@petrosagg (Contributor, PR author) replied:

I added more explanation. This registers the channel with the operator address which activates the operator on arrival of new data and manages all the details. This is very similar to getting an InputHandle and supplying data into a dataflow from outside the worker. Is it a problem to allocate a channel for the operator's address?

builder.build(move |caps| {
let [cap]: [_; 1] = caps.try_into().expect("one output");

// Remap updates beyond the upper
Reviewer (Contributor):

I think by upper you meant "input frontier"? If that's right, I recommend that term instead!

Reviewer (Contributor):

Is there a reason not to call it pending_remap_updates or something, rather than accepted_bindings which seems (at this point in reading) to have not so much to do with the concepts / goals introduced so far?

});
// If we won't receive any more data for this binding we can go to the next one
if PartialOrder::less_equal(&frontier, &source_upper.frontier()) {
replayer.step();
Reviewer (Contributor):

We should find a better name than step! :D Is this the moment at which some sort of compaction is allowed to happen in the bindings? Surely that must communicate something about the frontier to replayer?

Comment on lines 215 to 216
/// The upper frontier of bindings received so far. An `Option<Capability>` is sufficient to
/// describe a frontier because `IntoTime` is required to be totally ordered.
Reviewer (Contributor):

I'm not sure what "upper frontier" means here, and my most literal reading is that it is the maximum antichain of update times received so far: like, if we receive some weird update at 2077, it would be 2077 until we learn about an even greater time. But I bet it is not actually this.

Comment on lines 220 to 221
/// An option representing whether an active binding exists. The option carries the capability
/// that should be used to send all the source data corresponding to its time.
Reviewer (Contributor):

No idea what an "active binding" is at this point.

Comment on lines 271 to 280
self.bindings.extend(data.into_iter());
if self.active_binding.is_none() {
self.active_binding = self.upper_capability.clone();
self.step();
}
match (self.upper_capability.as_mut(), upper.as_option()) {
(Some(cap), Some(time)) => cap.downgrade(time),
(_, None) => self.upper_capability = None,
(None, Some(_)) => unreachable!(),
}
Reviewer (Contributor):

No idea what any of this is doing.

}
}

/// Reveals the currently active binding and its accosiated `FromTime` frontier.
Reviewer (Contributor):

Typo "accosiated" but also what is an "active binding"?

@guswynn added the release-blocker label (Critical issue that should block *any* release if not fixed) Mar 7, 2024
@petrosagg removed the release-blocker label Mar 14, 2024
@petrosagg force-pushed the faster-reclocking branch 2 times, most recently from c4533fa to a72016a on March 14, 2024 17:35
src/timely-util/src/reclock.rs (four review threads, resolved)
@frankmcsherry (Contributor) left a comment:

I think at this point you can go ahead and merge it. The main ask I have is that you record somewhere the potential concern that each scheduling can take time proportional to the number of elements in remap_trace, because one must visit them linearly to determine how to remap any element. The alternate design we discussed, where one maintains a Vec<(Antichain<FromTime>, IntoTime)>, avoids this problem at the expense of recording Antichain<FromTime> elements whose size concerned you enough to prefer the linear-time algo to the logarithmic-time algo.
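
For concreteness, a sketch of that alternate design (editorial; `remap_time` is a hypothetical helper, and a plain `Vec<FromTime>` stands in for `Antichain<FromTime>`):

/// Finds the earliest `IntoTime` at which an update at `from_time` is
/// reclocked, given bindings sorted by `IntoTime` whose frontiers only
/// advance. The update stops being "beyond" the frontier exactly once, so
/// the predicate is monotone and a binary search gives O(log n) per lookup
/// instead of a linear pass over the whole remap trace.
fn remap_time<FromTime: PartialOrd, IntoTime: Clone>(
    remap: &[(Vec<FromTime>, IntoTime)],
    from_time: &FromTime,
) -> Option<IntoTime> {
    let idx = remap.partition_point(|(frontier, _)| {
        // Still beyond this binding's frontier: some element is <= from_time.
        frontier.iter().any(|f| f <= from_time)
    });
    // `None` means no known binding determines this update yet; it must
    // wait for more remap bindings to arrive.
    remap.get(idx).map(|(_, into_time)| into_time.clone())
}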

Comment on lines 29 to 30
//! contains all updates `u ∈ S` that are not beyond some `FromTime` frontier fₜ. The collection
//! `R` that prescribes `fₜ` for every `t` is called the remap collection.
Reviewer (Contributor):

Nit, but if we are doing notation might as well be consistent. The fₜ notation has not been introduced, and probably can just be written R[t]?
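
A toy example of the definition quoted above (editorial, with made-up numbers; the times here are totally ordered, so every frontier is a singleton):

fn main() {
    // Source updates as (data, FromTime, diff).
    let source = [("a", 2u64, 1i64), ("b", 7, 1)];
    // Remap collection R, with R[10] = {5} and R[20] = {9}.
    let remap = [(10u64, 5u64), (20, 9)];
    for (into_time, frontier) in remap {
        // The reclocked collection at `into_time` holds every source update
        // whose FromTime is not beyond the frontier; with a total order that
        // is simply `from_time < frontier`.
        let visible: Vec<_> = source.iter().filter(|(_, t, _)| *t < frontier).collect();
        println!("at IntoTime {into_time}: {visible:?}");
        // at IntoTime 10: [("a", 2, 1)]
        // at IntoTime 20: [("a", 2, 1), ("b", 7, 1)]
    }
}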

let channel_id = scope.new_identifier();
// Here we create a channel that can be used to send data from a foreign scope into this
// operator. The channel is associated with this operator's address so that it is activated
// every time events are availalbe for consumption. This mechanism is similar to Timely's input
Reviewer (Contributor):

typo: "availalbe"

let mut capset = CapabilitySet::from_elem(caps.into_element());
capset.downgrade(&as_of.borrow());

// Received updates that may yet be greater or equal to the `remap` input frontier.
Reviewer (Contributor):

Specifically, remap updates, I think? There are several updates, frontiers, etc. Perhaps instead "Received remap updates at times into_time greater or equal to remap_input's input frontier".

Comment on lines +235 to +242
let mut remap_upper = Antichain::from_elem(IntoTime::minimum());
let mut remap_since = as_of;
let mut remap_trace = Vec::new();
Reviewer (Contributor):

These names make me think that perhaps you want an actual trace here. That's probably bouncing the code around more than it needs, but it's perhaps worth thinking about whether a trace would be a helpful way to record the multi-versioned remapping (and if not, perhaps leave a note about why not).

Comment on lines 239 to 247
let mut deferred_source_updates: Vec<ChainBatch<_, _, _>> = Vec::new();
let mut source_frontier = MutableAntichain::new_bottom(FromTime::minimum());
Reviewer (Contributor):

Some comments or names would be great here!

}

// STEP 6. Tidy up deferred updates
// Deferred updates are represented as a list chain batches where each batch
Reviewer (Contributor):

Probably meant to be "list of chain batches"?

if !new_source_updates.is_empty() {
deferred_source_updates.push(new_source_updates);
}
loop {
Reviewer (Contributor):

See above about loop { and my opinions on it. :D

}

/// A batch of differential updates that vary over some partial order. This type maintains the data
/// as a set of disjoint chains that allows for efficient extraction of batches given a frontier.
Reviewer (Contributor):

I'm not sure what "disjoint" implies here. There can be overlap in the chains, like the same update could be in multiple chains. Perhaps just remove the word?

{
r1.plus_equals(&r);
}
Some((d1, t1, r1))
Reviewer (Contributor):

If you like, you could also check here to see if r1.is_zero() and drop the result if so. I'm not sure if this applies to any of our source types though (though similarly, I don't know when we'd have two updates at the same FromTime with our existing sources).
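
A minimal sketch of that tweak (editorial; plain `i64` diffs stand in for the generic difference type, and `merge_update` is a hypothetical helper, not the PR's code):

/// Accumulates every diff that shares one `(data, time)` pair, mirroring
/// the `r1.plus_equals(&r)` loop in the snippet above, and drops the
/// update entirely when the diffs cancel (the suggested `is_zero` check).
fn merge_update<D, T>(d1: D, t1: T, mut r1: i64, more_diffs: &[i64]) -> Option<(D, T, i64)> {
    for r in more_diffs {
        r1 += r;
    }
    if r1 == 0 {
        None // diffs cancelled out; emit nothing
    } else {
        Some((d1, t1, r1))
    }
}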

Comment on lines +457 to +479
while let Some((_, _, r)) =
updates1.next_if(|(d, t, _)| (d, t) == (&d1, &t1))
Reviewer (Contributor):

This construction suggests that chains are not necessarily consolidated. Should that be made to be the case? Fine if not, of course, but the more structure that exists the better!

@petrosagg (Contributor, PR author) commented:

@frankmcsherry thanks for the review! I addressed all your comments and replaced both loop constructs with a while loop and an appropriate condition.

@petrosagg enabled auto-merge May 10, 2024 09:21
@petrosagg merged commit 49ae9f5 into MaterializeInc:main May 10, 2024
73 checks passed
@petrosagg deleted the faster-reclocking branch May 10, 2024 10:13
@bosconi added the A-storage (Area: storage) label Jun 29, 2024