Skip to content

Commit

Permalink
cdc: fix incorrect resolved ts timeout (tikv#8573)
Browse files Browse the repository at this point in the history
Signed-off-by: 5kbpers <tangminghua@pingcap.com>
  • Loading branch information
5kbpers committed Sep 3, 2020
1 parent f39927a commit 5608c5d
Showing 1 changed file with 7 additions and 6 deletions.
13 changes: 7 additions & 6 deletions components/cdc/src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -585,18 +585,20 @@ impl<T: 'static + RaftStoreRouter> Endpoint<T> {

fn register_min_ts_event(&self) {
let timeout = self.timer.delay(self.min_ts_interval);
let tso = self.pd_client.get_tso();
let pd_client = self.pd_client.clone();
let scheduler = self.scheduler.clone();
let raft_router = self.raft_router.clone();
let regions: Vec<(u64, ObserveID)> = self
.capture_regions
.iter()
.map(|(region_id, delegate)| (*region_id, delegate.id))
.collect();
let fut = tso.join(timeout.map_err(|_| unreachable!())).then(
move |tso: pd_client::Result<(TimeStamp, ())>| {
let fut = timeout
.map_err(|_| unreachable!())
.then(move |_| pd_client.get_tso())
.then(move |tso: pd_client::Result<TimeStamp>| {
// Ignore get tso errors since we will retry every `min_ts_interval`.
let (min_ts, _) = tso.unwrap_or((TimeStamp::default(), ()));
let min_ts = tso.unwrap_or_default();
// TODO: send a message to raftstore would consume too much cpu time,
// try to handle it outside raftstore.
for (region_id, observe_id) in regions {
Expand Down Expand Up @@ -640,8 +642,7 @@ impl<T: 'static + RaftStoreRouter> Endpoint<T> {
),
}
Ok(())
},
);
});
self.tso_worker.spawn(fut);
}

Expand Down

0 comments on commit 5608c5d

Please sign in to comment.