cdc: separate resolved region outliers (tikv#11991)
Separate broadcasting outlier regions and normal regions,
so 1) downstreams know where they should send resolve lock requests,
and 2) resolved ts of normal regions does not fall back.

close pingcap/tiflow#4516
close pingcap/tiflow#4311
ref pingcap/tiflow#4146

Signed-off-by: Neil Shen <overvenus@gmail.com>
overvenus committed Apr 20, 2022
1 parent b526229 commit 45b39dc
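
For orientation, the following is a minimal, standalone sketch of the idea using only std types; the region IDs, timestamps, and outlier count are illustrative and not part of this commit:

```rust
use std::{cmp::Reverse, collections::BinaryHeap};

fn main() {
    // (resolved_ts, region_id) pairs; `Reverse` turns std's max-heap
    // into a min-heap, so the slowest region pops first.
    let mut heap: BinaryHeap<Reverse<(u64, u64)>> = BinaryHeap::new();
    for (ts, region) in [(3, 3), (4, 4), (5, 5), (6, 6)] {
        heap.push(Reverse((ts, region)));
    }

    // Pop the two slowest regions as outliers. In the real endpoint
    // these are broadcast separately, so downstreams know where to
    // send resolve lock requests.
    let mut outliers = Vec::new();
    for _ in 0..2 {
        if let Some(Reverse((ts, region))) = heap.pop() {
            outliers.push((ts, region));
        }
    }
    assert_eq!(outliers, vec![(3, 3), (4, 4)]);

    // The remaining "normal" regions keep a higher watermark, so their
    // resolved ts is not dragged back by the outliers.
    let normal_min_ts = heap.iter().map(|r| r.0 .0).min().unwrap();
    assert_eq!(normal_min_ts, 5);
}
```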
Showing 4 changed files with 244 additions and 24 deletions.
247 changes: 227 additions & 20 deletions components/cdc/src/endpoint.rs
@@ -1,5 +1,8 @@
// Copyright 2020 TiKV Project Authors. Licensed under Apache-2.0.

use std::cmp::Reverse;
use std::cmp::{Ord, Ordering as CmpOrdering, PartialOrd};
use std::collections::BinaryHeap;
use std::fmt;
use std::sync::{Arc, Mutex as StdMutex};
use std::time::Duration;
@@ -60,6 +63,12 @@ use crate::service::{Conn, ConnID, FeatureGate};
use crate::{CdcObserver, Error, Result};

const FEATURE_RESOLVED_TS_STORE: Feature = Feature::require(5, 0, 0);
+const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s
// 10 minutes: the default GC life time of TiDB,
// and long enough for most transactions.
const WARN_RESOLVED_TS_LAG_THRESHOLD: Duration = Duration::from_secs(600);
// Suppress repeated resolved ts lag warnings.
const WARN_RESOLVED_TS_COUNT_THRESHOLD: usize = 10;

pub enum Deregister {
Downstream {
@@ -224,7 +233,75 @@ impl fmt::Debug for Task {
}
}

-const METRICS_FLUSH_INTERVAL: u64 = 10_000; // 10s
#[derive(PartialEq, Eq)]
struct ResolvedRegion {
region_id: u64,
resolved_ts: TimeStamp,
}

impl PartialOrd for ResolvedRegion {
fn partial_cmp(&self, other: &Self) -> Option<CmpOrdering> {
Some(self.cmp(other))
}
}

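// Note: this ordering considers resolved_ts only, while the derived
// PartialEq/Eq above also compare region_id.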
impl Ord for ResolvedRegion {
fn cmp(&self, other: &Self) -> CmpOrdering {
self.resolved_ts.cmp(&other.resolved_ts)
}
}

struct ResolvedRegionHeap {
// BinaryHeap is max heap, so we reverse order to get a min heap.
heap: BinaryHeap<Reverse<ResolvedRegion>>,
}

impl ResolvedRegionHeap {
fn push(&mut self, region_id: u64, resolved_ts: TimeStamp) {
self.heap.push(Reverse(ResolvedRegion {
region_id,
resolved_ts,
}))
}

// Pop up to `count` slow regions, returning them together with
// the minimum resolved ts among them.
fn pop(&mut self, count: usize) -> (TimeStamp, HashSet<u64>) {
let mut min_resolved_ts = TimeStamp::max();
let mut outliers = HashSet::with_capacity_and_hasher(count, Default::default());
for _ in 0..count {
if let Some(resolved_region) = self.heap.pop() {
outliers.insert(resolved_region.0.region_id);
if min_resolved_ts > resolved_region.0.resolved_ts {
min_resolved_ts = resolved_region.0.resolved_ts;
}
} else {
break;
}
}
(min_resolved_ts, outliers)
}

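// Snapshot the remaining (normal) regions and the minimum resolved ts
// among them, without draining the heap.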
fn to_hash_set(&self) -> (TimeStamp, HashSet<u64>) {
let mut min_resolved_ts = TimeStamp::max();
let mut regions = HashSet::with_capacity_and_hasher(self.heap.len(), Default::default());
for resolved_region in &self.heap {
regions.insert(resolved_region.0.region_id);
if min_resolved_ts > resolved_region.0.resolved_ts {
min_resolved_ts = resolved_region.0.resolved_ts;
}
}
(min_resolved_ts, regions)
}

fn clear(&mut self) {
self.heap.clear();
}

fn reset_and_shrink_to(&mut self, min_capacity: usize) {
self.clear();
self.heap.shrink_to(min_capacity);
}
}

pub struct Endpoint<T, E> {
cluster_id: u64,
@@ -245,28 +322,31 @@ pub struct Endpoint<T, E> {
concurrency_manager: ConcurrencyManager,

config: CdcConfig,

// Incremental scan
workers: Runtime,
scan_concurrency_semaphore: Arc<Semaphore>,

scan_speed_limiter: Limiter,
max_scan_batch_bytes: usize,
max_scan_batch_size: usize,
+sink_memory_quota: MemoryQuota,

-min_resolved_ts: TimeStamp,
-min_ts_region_id: u64,
old_value_cache: OldValueCache,
resolved_region_heap: ResolvedRegionHeap,

-// stats
-resolved_region_count: usize,
-unresolved_region_count: usize,

-sink_memory_quota: MemoryQuota,

// Check leader
// store_id -> client
tikv_clients: Arc<Mutex<HashMap<u64, TikvClient>>>,
env: Arc<Environment>,
security_mgr: Arc<SecurityManager>,
region_read_progress: RegionReadProgressRegistry,

+// Metrics and logging.
+min_resolved_ts: TimeStamp,
+min_ts_region_id: u64,
+resolved_region_count: usize,
+unresolved_region_count: usize,
+warn_resolved_ts_repeat_count: usize,
}

impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
@@ -336,12 +416,17 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
concurrency_manager,
min_resolved_ts: TimeStamp::max(),
min_ts_region_id: 0,
resolved_region_heap: ResolvedRegionHeap {
heap: BinaryHeap::new(),
},
old_value_cache,
resolved_region_count: 0,
unresolved_region_count: 0,
sink_memory_quota,
tikv_clients: Arc::new(Mutex::new(HashMap::default())),
region_read_progress,
// Log the first resolved ts warning immediately.
warn_resolved_ts_repeat_count: WARN_RESOLVED_TS_COUNT_THRESHOLD,
};
ep.register_min_ts_event();
ep
@@ -709,30 +794,77 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
}

fn on_min_ts(&mut self, regions: Vec<u64>, min_ts: TimeStamp) {
// Reset resolved_regions to empty.
let resolved_regions = &mut self.resolved_region_heap;
resolved_regions.clear();

let total_region_count = regions.len();
-// TODO: figure out how to avoid create a hashset every time,
-// saving some CPU.
-let mut resolved_regions =
-    HashSet::with_capacity_and_hasher(regions.len(), Default::default());
self.min_resolved_ts = TimeStamp::max();
let mut advance_ok = 0;
let mut advance_failed_none = 0;
let mut advance_failed_same = 0;
let mut advance_failed_stale = 0;
for region_id in regions {
if let Some(delegate) = self.capture_regions.get_mut(&region_id) {
let old_resolved_ts = delegate
.resolver
.as_ref()
.map_or(TimeStamp::zero(), |r| r.resolved_ts());
if old_resolved_ts > min_ts {
advance_failed_stale += 1;
}
if let Some(resolved_ts) = delegate.on_min_ts(min_ts) {
if resolved_ts < self.min_resolved_ts {
self.min_resolved_ts = resolved_ts;
self.min_ts_region_id = region_id;
}
-resolved_regions.insert(region_id);
+resolved_regions.push(region_id, resolved_ts);

if resolved_ts == old_resolved_ts {
advance_failed_same += 1;
} else {
advance_ok += 1;
}
} else {
advance_failed_none += 1;
}
}
}
-self.resolved_region_count = resolved_regions.len();
let lag_millis = min_ts
.physical()
.saturating_sub(self.min_resolved_ts.physical());
if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD {
self.warn_resolved_ts_repeat_count += 1;
if self.warn_resolved_ts_repeat_count >= WARN_RESOLVED_TS_COUNT_THRESHOLD {
self.warn_resolved_ts_repeat_count = 0;
warn!("cdc resolved ts lag too large";
"min_resolved_ts" => self.min_resolved_ts,
"min_ts_region_id" => self.min_ts_region_id,
"min_ts" => min_ts,
"ok" => advance_ok,
"none" => advance_failed_none,
"stale" => advance_failed_stale,
"same" => advance_failed_same);
}
}
+self.resolved_region_count = resolved_regions.heap.len();
self.unresolved_region_count = total_region_count - self.resolved_region_count;
-self.broadcast_resolved_ts(resolved_regions);

// Separate broadcasting outlier regions and normal regions,
// so 1) downstreams know where they should send resolve lock requests,
// and 2) resolved ts of normal regions does not fall back.
//
// Max number of outliers; in most cases, only a few regions are outliers.
// TODO: figure out how to avoid creating a hashset every time, to save some CPU.
let max_outlier_count = 32;
let (outlier_min_resolved_ts, outlier_regions) = resolved_regions.pop(max_outlier_count);
let (normal_min_resolved_ts, normal_regions) = resolved_regions.to_hash_set();
self.broadcast_resolved_ts(outlier_min_resolved_ts, outlier_regions);
self.broadcast_resolved_ts(normal_min_resolved_ts, normal_regions);
}

-fn broadcast_resolved_ts(&self, regions: HashSet<u64>) {
-let min_resolved_ts = self.min_resolved_ts.into_inner();
+fn broadcast_resolved_ts(&self, min_resolved_ts: TimeStamp, regions: HashSet<u64>) {
+let min_resolved_ts = min_resolved_ts.into_inner();
let send_cdc_event = |regions: &HashSet<u64>, min_resolved_ts: u64, conn: &Conn| {
let downstream_regions = conn.get_downstreams();
let mut resolved_ts = ResolvedTs::default();
@@ -847,7 +979,9 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
let fut = async move {
let _ = timeout.compat().await;
// Ignore get tso errors since we will retry every `min_ts_interval`.
-let mut min_ts = pd_client.get_tso().await.unwrap_or_default();
+let min_ts_pd = pd_client.get_tso().await.unwrap_or_default();
+let mut min_ts = min_ts_pd;
+let mut min_ts_min_lock = min_ts_pd;

// Sync with concurrency manager so that it can work correctly when optimizations
// like async commit is enabled.
@@ -858,6 +992,7 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
if min_mem_lock_ts < min_ts {
min_ts = min_mem_lock_ts;
}
min_ts_min_lock = min_mem_lock_ts;
}

match scheduler.schedule(Task::RegisterMinTsEvent) {
@@ -900,6 +1035,13 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Endpoint<T, E> {
Err(err) => panic!("failed to schedule min ts event, error: {:?}", err),
}
}
let lag_millis = min_ts_pd.physical().saturating_sub(min_ts.physical());
if Duration::from_millis(lag_millis) > WARN_RESOLVED_TS_LAG_THRESHOLD {
// TODO: Suppress repeat logs by using WARN_RESOLVED_TS_COUNT_THRESHOLD.
info!("cdc min_ts lag too large";
"min_ts" => min_ts, "min_ts_pd" => min_ts_pd,
"min_ts_min_lock" => min_ts_min_lock);
}
};
self.tso_worker.spawn(fut);
}
@@ -1454,6 +1596,10 @@ impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> Runnable for Endpoint<T, E> {

impl<T: 'static + RaftStoreRouter<E>, E: KvEngine> RunnableWithTimer for Endpoint<T, E> {
fn on_timeout(&mut self) {
// Reclaim resolved_region_heap memory.
self.resolved_region_heap
.reset_and_shrink_to(self.capture_regions.len());

CDC_CAPTURED_REGION_COUNT.set(self.capture_regions.len() as i64);
CDC_REGION_RESOLVE_STATUS_GAUGE_VEC
.with_label_values(&["unresolved"])
@@ -2723,4 +2869,65 @@ mod tests {
event
);
}

#[test]
fn test_resolved_region_heap() {
let mut heap = ResolvedRegionHeap {
heap: BinaryHeap::new(),
};
heap.push(5, 5.into());
heap.push(4, 4.into());
heap.push(6, 6.into());
heap.push(3, 3.into());

let (ts, regions) = heap.pop(0);
assert_eq!(ts, TimeStamp::max());
assert!(regions.is_empty());

let (ts, regions) = heap.pop(2);
assert_eq!(ts, 3.into());
assert_eq!(regions.len(), 2);
assert!(regions.contains(&3));
assert!(regions.contains(&4));

// Pop more outliers than the heap holds.
let (ts, regions) = heap.pop(3);
assert_eq!(ts, 5.into());
assert_eq!(regions.len(), 2);
assert!(regions.contains(&5));
assert!(regions.contains(&6));

// The heap is drained, so no regions are left.
let (ts, regions) = heap.to_hash_set();
assert_eq!(ts, TimeStamp::max());
assert!(regions.is_empty());

let mut heap1 = ResolvedRegionHeap {
heap: BinaryHeap::new(),
};
heap1.push(5, 5.into());
heap1.push(4, 4.into());
heap1.push(6, 6.into());
heap1.push(3, 3.into());

let (ts, regions) = heap1.pop(1);
assert_eq!(ts, 3.into());
assert_eq!(regions.len(), 1);
assert!(regions.contains(&3));

let (ts, regions) = heap1.to_hash_set();
assert_eq!(ts, 4.into());
assert_eq!(regions.len(), 3);
assert!(regions.contains(&4));
assert!(regions.contains(&5));
assert!(regions.contains(&6));

heap1.reset_and_shrink_to(3);
assert_eq!(3, heap1.heap.capacity());
assert!(heap1.heap.is_empty());

heap1.push(1, 1.into());
heap1.clear();
assert!(heap1.heap.is_empty());
}
}
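
The resolved-ts lag warning in `on_min_ts` is throttled by a counter: `warn_resolved_ts_repeat_count` starts at the threshold, so the very first lag observation is logged immediately and afterwards only every tenth one is. A reduced sketch of that pattern, with hypothetical names (`LagWarner`, `observe_lag`) that are not part of this commit:

```rust
const WARN_COUNT_THRESHOLD: usize = 10;

// Start the counter at the threshold so the first lag observation
// warns immediately; subsequent ones are suppressed until the
// threshold is reached again.
struct LagWarner {
    repeat_count: usize,
}

impl LagWarner {
    fn new() -> Self {
        LagWarner { repeat_count: WARN_COUNT_THRESHOLD }
    }

    fn observe_lag(&mut self) -> bool {
        self.repeat_count += 1;
        if self.repeat_count >= WARN_COUNT_THRESHOLD {
            self.repeat_count = 0;
            return true; // caller emits the warning
        }
        false
    }
}

fn main() {
    let mut warner = LagWarner::new();
    let warned: Vec<bool> = (0..12).map(|_| warner.observe_lag()).collect();
    assert!(warned[0]); // first observation warns immediately
    assert!(!warned[1]); // the next nine are suppressed...
    assert!(warned[10]); // ...until the threshold is reached again
}
```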
1 change: 1 addition & 0 deletions components/cdc/src/lib.rs
@@ -2,6 +2,7 @@

#![feature(box_patterns)]
#![feature(assert_matches)]
+#![feature(shrink_to)]

mod channel;
mod config;
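
The new feature gate is there because `BinaryHeap::shrink_to` was still nightly-only when this commit landed; it has since been stabilized. A minimal sketch of the reclaim pattern that `ResolvedRegionHeap::reset_and_shrink_to` relies on:

```rust
use std::collections::BinaryHeap;

// Clear the heap, then drop excess capacity down to roughly the
// current working set, reclaiming memory between timeouts.
fn main() {
    let mut heap: BinaryHeap<u64> = (0..1024u64).collect();
    heap.clear();
    heap.shrink_to(16);
    assert!(heap.capacity() >= 16);
}
```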
