cdc: separate resolved region outliers (#11991) (#12000)
ref pingcap/tiflow#4146, close pingcap/tiflow#4311, close pingcap/tiflow#4516, ref #11991, close #11993

Separate broadcasting outlier regions and normal regions,
so 1) downstreams know where they should send resolve lock requests,
and 2) resolved ts of normal regions does not fall back.
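
As an illustration of the idea (a sketch, not the code in this commit): plain u64 timestamps and std collections stand in for TiKV's TimeStamp and hasher types, and pop/rest below play the roles of ResolvedRegionHeap::pop and to_hash_set in the diff further down. The min-heap makes popping the few slowest regions cheap, and because each group reports only its own minimum resolved ts, a handful of stuck regions no longer drags back the ts broadcast for the healthy ones.

// Illustrative sketch only: separate the slowest "outlier" regions from the rest.
use std::cmp::Reverse;
use std::collections::{BinaryHeap, HashSet};

struct ResolvedRegionHeap {
    // BinaryHeap is a max heap; wrapping entries in Reverse turns it into a min heap,
    // so the regions with the smallest (slowest) resolved ts surface first.
    heap: BinaryHeap<Reverse<(u64, u64)>>, // (resolved_ts, region_id)
}

impl ResolvedRegionHeap {
    fn push(&mut self, region_id: u64, resolved_ts: u64) {
        self.heap.push(Reverse((resolved_ts, region_id)));
    }

    // Pop the `count` slowest regions and the minimum resolved ts among them.
    fn pop(&mut self, count: usize) -> (u64, HashSet<u64>) {
        let mut min_ts = u64::MAX;
        let mut outliers = HashSet::with_capacity(count);
        for _ in 0..count {
            match self.heap.pop() {
                Some(Reverse((ts, region_id))) => {
                    min_ts = min_ts.min(ts);
                    outliers.insert(region_id);
                }
                None => break,
            }
        }
        (min_ts, outliers)
    }

    // The remaining (normal) regions and the minimum resolved ts among them.
    fn rest(&self) -> (u64, HashSet<u64>) {
        let mut min_ts = u64::MAX;
        let mut regions = HashSet::with_capacity(self.heap.len());
        for Reverse((ts, region_id)) in &self.heap {
            min_ts = min_ts.min(*ts);
            regions.insert(*region_id);
        }
        (min_ts, regions)
    }
}

fn main() {
    let mut heap = ResolvedRegionHeap { heap: BinaryHeap::new() };
    for &(region_id, ts) in &[(1u64, 100u64), (2, 5), (3, 101), (4, 7), (5, 102)] {
        heap.push(region_id, ts);
    }
    // Regions 2 and 4 lag far behind; broadcasting them separately keeps the
    // resolved ts advertised for regions 1, 3 and 5 from being dragged back.
    let (outlier_ts, outliers) = heap.pop(2);
    let (normal_ts, normals) = heap.rest();
    println!("outliers {:?} at min resolved ts {}", outliers, outlier_ts); // {2, 4} at 5
    println!("normal   {:?} at min resolved ts {}", normals, normal_ts);   // {1, 3, 5} at 100
}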

Signed-off-by: Neil Shen <overvenus@gmail.com>

Co-authored-by: Neil Shen <overvenus@gmail.com>
ti-srebot and overvenus authored Feb 21, 2022
1 parent 2f807ea commit 9a5cf19
Showing 7 changed files with 322 additions and 49 deletions.
247 changes: 227 additions & 20 deletions components/cdc/src/endpoint.rs
@@ -1,5 +1,8 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::cmp::Reverse;
use std::cmp::{Ord, Ordering as CmpOrdering, PartialOrd};
use std::collections::BinaryHeap;
use std::f64::INFINITY;
use std::fmt;
use std::marker::PhantomData;
@@ -54,6 +57,12 @@ use crate::service::{Conn, ConnID, FeatureGate};
use crate::{CdcObserver, Error, Result};

const FEATURE_RESOLVED_TS_STORE: Feature = Feature::require(5, 0, 0);
const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s
// 10 minutes, the default GC lifetime of TiDB,
// which is long enough for most transactions.
const WARN_RESOLVED_TS_LAG_THRESHOLD: Duration = Duration::from_secs(600);
// Suppress repeated resolved ts lag warnings.
const WARN_RESOLVED_TS_COUNT_THRESHOLD: usize = 10;

pub enum Deregister {
Downstream {
@@ -218,7 +227,75 @@ impl fmt::Debug for Task {
}
}

const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s
#[derive(PartialEq, Eq)]
struct ResolvedRegion {
region_id: u64,
resolved_ts: TimeStamp,
}

impl PartialOrd for ResolvedRegion {
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}

impl Ord for ResolvedRegion {
fn cmp(&self, other: &Self) -> CmpOrdering {
self.resolved_ts.cmp(&other.resolved_ts)
}
}

struct ResolvedRegionHeap {
// BinaryHeap is a max heap, so we reverse the order to get a min heap.
heap: BinaryHeap<Reverse<ResolvedRegion>>,
}

impl ResolvedRegionHeap {
fn push(&mut self, region_id: u64, resolved_ts: TimeStamp) {
self.heap.push(Reverse(ResolvedRegion {
region_id,
resolved_ts,
}))
}

// Pop slow regions and the minimum resolved ts among them.
fn pop(&mut self, count: usize) -> (TimeStamp, HashSet<u64>) {
let mut min_resolved_ts = TimeStamp::max();
let mut outliers = HashSet::with_capacity_and_hasher(count, Default::default());
for _ in 0..count {
if let Some(resolved_region) = self.heap.pop() {
outliers.insert(resolved_region.0.region_id);
if min_resolved_ts > resolved_region.0.resolved_ts {
min_resolved_ts = resolved_region.0.resolved_ts;
}
} else {
break;
}
}
(min_resolved_ts, outliers)
}

fn to_hash_set(&self) -> (TimeStamp, HashSet<u64>) {
let mut min_resolved_ts = TimeStamp::max();
let mut regions = HashSet::with_capacity_and_hasher(self.heap.len(), Default::default());
for resolved_region in &self.heap {
regions.insert(resolved_region.0.region_id);
if min_resolved_ts > resolved_region.0.resolved_ts {
min_resolved_ts = resolved_region.0.resolved_ts;
}
}
(min_resolved_ts, regions)
}

fn clear(&mut self) {
self.heap.clear();
}

fn reset_and_shrink_to(&mut self, min_capacity: usize) {
self.clear();
self.heap.shrink_to(min_capacity);
}
}

pub struct Endpoint<T, E> {
cluster_id: u64,
@@ -239,28 +316,31 @@ pub struct Endpoint<T, E> {
concurrency_manager: ConcurrencyManager,

config: CdcConfig,

// Incremental scan
workers: Runtime,
scan_concurrency_semaphore: Arc<Semaphore>,

scan_speed_limiter: Limiter,
max_scan_batch_bytes: usize,
max_scan_batch_size: usize,
sink_memory_quota: MemoryQuota,

min_resolved_ts: TimeStamp,
min_ts_region_id: u64,
old_value_cache: OldValueCache,
resolved_region_heap: ResolvedRegionHeap,

// stats
resolved_region_count: usize,
unresolved_region_count: usize,

sink_memory_quota: MemoryQuota,

// Check leader
// store_id -> client
tikv_clients: Arc<Mutex<HashMap<u64, TikvClient>>>,
env: Arc<Environment>,
security_mgr: Arc<SecurityManager>,
region_read_progress: RegionReadProgressRegistry,

// Metrics and logging.
min_resolved_ts: TimeStamp,
min_ts_region_id: u64,
resolved_region_count: usize,
unresolved_region_count: usize,
warn_resolved_ts_repeat_count: usize,
}

impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
@@ -329,12 +409,17 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
concurrency_manager,
min_resolved_ts: TimeStamp::max(),
min_ts_region_id: 0,
resolved_region_heap: ResolvedRegionHeap {
heap: BinaryHeap::new(),
},
old_value_cache,
resolved_region_count: 0,
unresolved_region_count: 0,
sink_memory_quota,
tikv_clients: Arc::new(Mutex::new(HashMap::default())),
region_read_progress,
// Log the first resolved ts warning.
warn_resolved_ts_repeat_count: WARN_RESOLVED_TS_COUNT_THRESHOLD,
};
ep.register_min_ts_event();
ep
@@ -692,30 +777,77 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
}

fn on_min_ts(&mut self, regions: Vec<u64>, min_ts: TimeStamp) {
// Reset resolved_regions to empty.
let resolved_regions = &mut self.resolved_region_heap;
resolved_regions.clear();

let total_region_count = regions.len();
// TODO: figure out how to avoid creating a hashset every time,
// saving some CPU.
let mut resolved_regions =
HashSet::with_capacity_and_hasher(regions.len(), Default::default());
self.min_resolved_ts = TimeStamp::max();
let mut advance_ok = 0;
let mut advance_failed_none = 0;
let mut advance_failed_same = 0;
let mut advance_failed_stale = 0;
for region_id in regions {
if let Some(delegate) = self.capture_regions.get_mut(&region_id) {
let old_resolved_ts = delegate
.resolver
.as_ref()
.map_or(TimeStamp::zero(), |r| r.resolved_ts());
if old_resolved_ts > min_ts {
advance_failed_stale += 1;
}
if let Some(resolved_ts) = delegate.on_min_ts(min_ts) {
if resolved_ts < self.min_resolved_ts {
self.min_resolved_ts = resolved_ts;
self.min_ts_region_id = region_id;
}
resolved_regions.insert(region_id);
resolved_regions.push(region_id, resolved_ts);

if resolved_ts == old_resolved_ts {
advance_failed_same += 1;
} else {
advance_ok += 1;
}
} else {
advance_failed_none += 1;
}
}
}
self.resolved_region_count = resolved_regions.len();
let lag_millis = min_ts
.physical()
.saturating_sub(self.min_resolved_ts.physical());
if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD {
self.warn_resolved_ts_repeat_count += 1;
if self.warn_resolved_ts_repeat_count >= WARN_RESOLVED_TS_COUNT_THRESHOLD {
self.warn_resolved_ts_repeat_count = 0;
warn!("cdc resolved ts lag too large";
"min_resolved_ts" => self.min_resolved_ts,
"min_ts_region_id" => self.min_ts_region_id,
"min_ts" => min_ts,
"ok" => advance_ok,
"none" => advance_failed_none,
"stale" => advance_failed_stale,
"same" => advance_failed_same);
}
}
self.resolved_region_count = resolved_regions.heap.len();
self.unresolved_region_count = total_region_count - self.resolved_region_count;
self.broadcast_resolved_ts(resolved_regions);

// Separate broadcasting outlier regions and normal regions,
// so 1) downstreams know where they should send resolve lock requests,
// and 2) resolved ts of normal regions does not fall back.
//
// Max number of outliers; in most cases, only a few regions are outliers.
// TODO: figure out how to avoid creating a hashset every time, saving some CPU.
let max_outlier_count = 32;
let (outlier_min_resolved_ts, outlier_regions) = resolved_regions.pop(max_outlier_count);
let (normal_min_resolved_ts, normal_regions) = resolved_regions.to_hash_set();
self.broadcast_resolved_ts(outlier_min_resolved_ts, outlier_regions);
self.broadcast_resolved_ts(normal_min_resolved_ts, normal_regions);
}

fn broadcast_resolved_ts(&self, regions: HashSet<u64>) {
let min_resolved_ts = self.min_resolved_ts.into_inner();
fn broadcast_resolved_ts(&self, min_resolved_ts: TimeStamp, regions: HashSet<u64>) {
let min_resolved_ts = min_resolved_ts.into_inner();
let send_cdc_event = |regions: &HashSet<u64>, min_resolved_ts: u64, conn: &Conn| {
let downstream_regions = conn.get_downstreams();
let mut resolved_ts = ResolvedTs::default();
@@ -830,7 +962,9 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
let fut = async move {
let _ = timeout.compat().await;
// Ignore get tso errors since we will retry every `min_ts_interval`.
let mut min_ts = pd_client.get_tso().await.unwrap_or_default();
let min_ts_pd = pd_client.get_tso().await.unwrap_or_default();
let mut min_ts = min_ts_pd;
let mut min_ts_min_lock = min_ts_pd;

// Sync with concurrency manager so that it can work correctly when optimizations
// like async commit are enabled.
@@ -841,6 +975,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
if min_mem_lock_ts < min_ts {
min_ts = min_mem_lock_ts;
}
min_ts_min_lock = min_mem_lock_ts;
}

match scheduler.schedule(Task::RegisterMinTsEvent) {
@@ -883,6 +1018,13 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
Err(err) => panic!("failed to schedule min ts event, error: {:?}", err),
}
}
let lag_millis = min_ts_pd.physical().saturating_sub(min_ts.physical());
if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD {
// TODO: Suppress repeated logs by using WARN_RESOLVED_TS_COUNT_THRESHOLD.
info!("cdc min_ts lag too large";
"min_ts" => min_ts, "min_ts_pd" => min_ts_pd,
"min_ts_min_lock" => min_ts_min_lock);
}
};
self.tso_worker.spawn(fut);
}
@@ -1356,6 +1498,10 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Runnable for Endpoint<T, E> {

impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> RunnableWithTimer for Endpoint<T, E> {
fn on_timeout(&mut self) {
// Reclaim resolved_region_heap memory.
self.resolved_region_heap
.reset_and_shrink_to(self.capture_regions.len());

CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64);
CDC_REGION_RESOLVE_STATUS_GAUGE_VEC
.with_label_values(&["unresolved"])
@@ -2506,4 +2652,65 @@ mod tests {
event
);
}

#[test]
fn test_resolved_region_heap() {
let mut heap = ResolvedRegionHeap {
heap: BinaryHeap::new(),
};
heap.push(5, 5.into());
heap.push(4, 4.into());
heap.push(6, 6.into());
heap.push(3, 3.into());

let (ts, regions) = heap.pop(0);
assert_eq!(ts, TimeStamp::max());
assert!(regions.is_empty());

let (ts, regions) = heap.pop(2);
assert_eq!(ts, 3.into());
assert_eq!(regions.len(), 2);
assert!(regions.contains(&3));
assert!(regions.contains(&4));

// Pop more outliers than the heap has.
let (ts, regions) = heap.pop(3);
assert_eq!(ts, 5.into());
assert_eq!(regions.len(), 2);
assert!(regions.contains(&5));
assert!(regions.contains(&6));

// Empty regions
let (ts, regions) = heap.to_hash_set();
assert_eq!(ts, TimeStamp::max());
assert!(regions.is_empty());

let mut heap1 = ResolvedRegionHeap {
heap: BinaryHeap::new(),
};
heap1.push(5, 5.into());
heap1.push(4, 4.into());
heap1.push(6, 6.into());
heap1.push(3, 3.into());

let (ts, regions) = heap1.pop(1);
assert_eq!(ts, 3.into());
assert_eq!(regions.len(), 1);
assert!(regions.contains(&3));

let (ts, regions) = heap1.to_hash_set();
assert_eq!(ts, 4.into());
assert_eq!(regions.len(), 3);
assert!(regions.contains(&4));
assert!(regions.contains(&5));
assert!(regions.contains(&6));

heap1.reset_and_shrink_to(3);
assert_eq!(3, heap1.heap.capacity());
assert!(heap1.heap.is_empty());

heap1.push(1, 1.into());
heap1.clear();
assert!(heap1.heap.is_empty());
}
}
1 change: 1 addition & 0 deletions components/cdc/src/lib.rs
@@ -2,6 +2,7 @@

#![feature(box_patterns)]
#![feature(assert_matches)]
#![feature(shrink_to)]

mod channel;
mod config;
3 changes: 0 additions & 3 deletions components/raftstore/src/store/local_metrics.rs
@@ -386,7 +386,6 @@ pub struct RaftMetrics {
pub propose: RaftProposeMetrics,
pub process_ready: LocalHistogram,
pub commit_log: LocalHistogram,
pub check_leader: LocalHistogram,
pub leader_missing: Arc<Mutex<HashSet<u64>>>,
pub invalid_proposal: RaftInvalidProposeMetrics,
pub write_block_wait: LocalHistogram,
@@ -410,7 +409,6 @@ impl RaftMetrics {
.with_label_values(&["ready"])
.local(),
commit_log: PEER_COMMIT_LOG_HISTOGRAM.local(),
check_leader: CHECK_LEADER_DURATION_HISTOGRAM.local(),
leader_missing: Arc::default(),
invalid_proposal: Default::default(),
write_block_wait: STORE_WRITE_MSG_BLOCK_WAIT_DURATION_HISTOGRAM.local(),
@@ -431,7 +429,6 @@ impl RaftMetrics {
self.propose.flush();
self.process_ready.flush();
self.commit_log.flush();
self.check_leader.flush();
self.message_dropped.flush();
self.invalid_proposal.flush();
self.write_block_wait.flush();
6 changes: 0 additions & 6 deletions components/raftstore/src/store/metrics.rs
@@ -345,12 +345,6 @@ lazy_static! {
pub static ref PEER_ADMIN_CMD_COUNTER: AdminCmdVec =
auto_flush_from!(PEER_ADMIN_CMD_COUNTER_VEC, AdminCmdVec);

pub static ref CHECK_LEADER_DURATION_HISTOGRAM: Histogram =
register_histogram!(
"tikv_resolved_ts_check_leader_duration_seconds",
"Bucketed histogram of handling check leader request duration",
exponential_buckets(0.005, 2.0, 20).unwrap()
).unwrap();
pub static ref PEER_COMMIT_LOG_HISTOGRAM: Histogram =
register_histogram!(
"tikv_raftstore_commit_log_duration_seconds",