Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change(state): Remove active_value field from ChainTipSender #4175

Merged
merged 6 commits into from
Apr 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 9 additions & 6 deletions .github/workflows/test-full-sync.yml
Original file line number Diff line number Diff line change
Expand Up @@ -236,12 +236,15 @@ jobs:
- name: Full sync
id: full-sync
run: |
gcloud compute ssh \
full-sync-${{ env.GITHUB_REF_SLUG_URL }}-${{ env.GITHUB_SHA_SHORT }} \
--zone ${{ env.ZONE }} \
--quiet \
--ssh-flag="-o ServerAliveInterval=5" \
--command="docker logs --follow ${{ env.CONTAINER_NAME }}"
for RETRY in 1 2 3 4; do
gcloud compute ssh \
full-sync-${{ env.GITHUB_REF_SLUG_URL }}-${{ env.GITHUB_SHA_SHORT }} \
--zone ${{ env.ZONE }} \
--quiet \
--ssh-flag="-o ServerAliveInterval=15" \
--command="docker logs --follow ${{ env.CONTAINER_NAME }}" \
|| echo "ssh disconnected $RETRY times"
done

EXIT_CODE=$(\
gcloud compute ssh \
Expand Down
84 changes: 48 additions & 36 deletions zebra-state/src/service/chain_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ use std::sync::Arc;

use chrono::{DateTime, Utc};
use tokio::sync::watch;
use tracing::instrument;
use tracing::{field, instrument};

#[cfg(any(test, feature = "proptest-impl"))]
use proptest_derive::Arbitrary;
Expand Down Expand Up @@ -120,10 +120,6 @@ pub struct ChainTipSender {

/// The sender channel for chain tip data.
sender: watch::Sender<ChainTipData>,

/// A copy of the data in `sender`.
// TODO: Replace with calls to `watch::Sender::borrow` once Tokio is updated to 1.0.0 (#2573)
active_value: ChainTipData,
}

impl ChainTipSender {
Expand All @@ -135,14 +131,13 @@ impl ChainTipSender {
network: Network,
) -> (Self, LatestChainTip, ChainTipChange) {
let initial_tip = initial_tip.into();
ChainTipSender::record_new_tip(&initial_tip);
Self::record_new_tip(&initial_tip);

let (sender, receiver) = watch::channel(None);

let mut sender = ChainTipSender {
use_non_finalized_tip: false,
sender,
active_value: None,
};

let current = LatestChainTip::new(receiver);
Expand All @@ -156,21 +151,13 @@ impl ChainTipSender {
/// Update the latest finalized tip.
///
/// May trigger an update to the best tip.
//
// TODO: when we replace active_value with `watch::Sender::borrow`,
// refactor instrument to avoid multiple borrows, to prevent deadlocks
#[instrument(
skip(self, new_tip),
fields(
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
old_height = ?self.active_value.as_ref().map(|block| block.height),
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
new_height,
new_hash,
))]
fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
)]
pub fn set_finalized_tip(&mut self, new_tip: impl Into<Option<ChainTipBlock>> + Clone) {
let new_tip = new_tip.into();
ChainTipSender::record_new_tip(&new_tip);
self.record_fields(&new_tip);

if !self.use_non_finalized_tip {
self.update(new_tip);
Expand All @@ -180,24 +167,16 @@ impl ChainTipSender {
/// Update the latest non-finalized tip.
///
/// May trigger an update to the best tip.
//
// TODO: when we replace active_value with `watch::Sender::borrow`,
// refactor instrument to avoid multiple borrows, to prevent deadlocks
#[instrument(
skip(self, new_tip),
fields(
old_use_non_finalized_tip = ?self.use_non_finalized_tip,
old_height = ?self.active_value.as_ref().map(|block| block.height),
old_hash = ?self.active_value.as_ref().map(|block| block.hash),
new_height,
new_hash,
))]
fields(old_use_non_finalized_tip, old_height, old_hash, new_height, new_hash)
)]
pub fn set_best_non_finalized_tip(
&mut self,
new_tip: impl Into<Option<ChainTipBlock>> + Clone,
) {
let new_tip = new_tip.into();
ChainTipSender::record_new_tip(&new_tip);
self.record_fields(&new_tip);

// once the non-finalized state becomes active, it is always populated
// but ignoring `None`s makes the tests easier
Expand All @@ -212,7 +191,11 @@ impl ChainTipSender {
/// An update is only sent if the current best tip is different from the last best tip
/// that was sent.
fn update(&mut self, new_tip: Option<ChainTipBlock>) {
let needs_update = match (new_tip.as_ref(), self.active_value.as_ref()) {
// Correctness: the `self.sender.borrow()` must not be placed in a `let` binding to prevent
// a read-lock being created and living beyond the `self.sender.send(..)` call. If that
// happens, the `send` method will attempt to obtain a write-lock and will dead-lock.
// Without the binding, the guard is dropped at the end of the expression.
let needs_update = match (new_tip.as_ref(), self.sender.borrow().as_ref()) {
teor2345 marked this conversation as resolved.
Show resolved Hide resolved
// since the blocks have been contextually validated,
// we know their hashes cover all the block data
(Some(new_tip), Some(active_value)) => new_tip.hash != active_value.hash,
Expand All @@ -221,22 +204,51 @@ impl ChainTipSender {
};

if needs_update {
let _ = self.sender.send(new_tip.clone());
self.active_value = new_tip;
let _ = self.sender.send(new_tip);
}
}

/// Record `new_tip` in the current span.
///
/// Callers should create a new span with empty `new_height` and `new_hash` fields.
fn record_new_tip(new_tip: &Option<ChainTipBlock>) {
Self::record_tip(&tracing::Span::current(), "new", new_tip);
}

/// Record `new_tip` and the fields from `self` in the current span.
///
/// The fields recorded are:
///
/// - `new_height`
/// - `new_hash`
/// - `old_height`
/// - `old_hash`
/// - `old_use_non_finalized_tip`
///
/// Callers should create a new span with the empty fields described above.
fn record_fields(&self, new_tip: &Option<ChainTipBlock>) {
let span = tracing::Span::current();

let new_height = new_tip.as_ref().map(|block| block.height);
let new_hash = new_tip.as_ref().map(|block| block.hash);
let old_tip = &*self.sender.borrow();

Self::record_tip(&span, "new", new_tip);
Self::record_tip(&span, "old", old_tip);

span.record(
"old_use_non_finalized_tip",
&field::debug(self.use_non_finalized_tip),
);
}

/// Record `tip` into `span` using the `prefix` to name the fields.
///
/// Callers should create a new span with empty `{prefix}_height` and `{prefix}_hash` fields.
fn record_tip(span: &tracing::Span, prefix: &str, tip: &Option<ChainTipBlock>) {
let height = tip.as_ref().map(|block| block.height);
let hash = tip.as_ref().map(|block| block.hash);

span.record("new_height", &tracing::field::debug(new_height));
span.record("new_hash", &tracing::field::debug(new_hash));
span.record(format!("{}_height", prefix).as_str(), &field::debug(height));
span.record(format!("{}_hash", prefix).as_str(), &field::debug(hash));
}
}

Expand Down