Skip to content

Commit

Permalink
fix: redis conn (#442)
Browse files Browse the repository at this point in the history
This PR is fixing the management of the redis connection. In the current
approach, we're reusing an existing connection that is being opened at
startup for test purposes, but that can get broken after some time if
blocks take time to arrive.
Fix: we open/close a new connection every time a new block hits the API.
  • Loading branch information
vabanaerytk authored Oct 11, 2023
1 parent f895907 commit 5e6d704
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 89 deletions.
179 changes: 91 additions & 88 deletions components/chainhook-cli/src/service/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,11 +225,12 @@ impl Service {
let mut stacks_event = 0;

let ctx = self.ctx.clone();
let mut predicates_db_conn = match self.config.http_api {
match self.config.http_api {
PredicatesApi::On(ref api_config) => {
Some(open_readwrite_predicates_db_conn_or_panic(api_config, &ctx))
// Test redis connection
open_readwrite_predicates_db_conn(api_config)?;
}
PredicatesApi::Off => None,
PredicatesApi::Off => {}
};

for predicate_with_last_scanned_block in leftover_scans {
Expand Down Expand Up @@ -265,17 +266,8 @@ impl Service {
// If no start block specified, depending on the nature the hook, we'd like to retrieve:
// - contract-id
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
{
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to register predicate: {}",
e.to_string()
);
continue;
}
let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else {
continue;
};
update_predicate_spec(
&spec.key(),
Expand All @@ -301,17 +293,8 @@ impl Service {
}
ObserverEvent::PredicateEnabled(spec) => {
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
{
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to enable predicate: {}",
e.to_string()
);
continue;
}
let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else {
continue;
};
update_predicate_spec(
&spec.key(),
Expand All @@ -329,17 +312,8 @@ impl Service {
}
ObserverEvent::PredicateDeregistered(spec) => {
if let PredicatesApi::On(ref config) = self.config.http_api {
let mut predicates_db_conn = match open_readwrite_predicates_db_conn(config)
{
Ok(con) => con,
Err(e) => {
error!(
self.ctx.expect_logger(),
"unable to deregister predicate: {}",
e.to_string()
);
continue;
}
let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else {
continue;
};
let predicate_key = spec.key();
let res: Result<(), redis::RedisError> =
Expand All @@ -355,14 +329,20 @@ impl Service {
}
ObserverEvent::BitcoinChainEvent((chain_update, report)) => {
debug!(self.ctx.expect_logger(), "Bitcoin update not stored");
match chain_update {
chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks(data) => {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
if let PredicatesApi::On(ref config) = self.config.http_api {
let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else {
continue;
};

match chain_update {
chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithBlocks(
data,
) => {
for confirmed_block in &data.confirmed_blocks {
match expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
predicates_db_conn,
&mut predicates_db_conn,
&ctx,
) {
Some(expired_predicate_uuids) => {
Expand All @@ -383,14 +363,14 @@ impl Service {
}
}
}
}
chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg(data) => {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
chainhook_sdk::types::BitcoinChainEvent::ChainUpdatedWithReorg(
data,
) => {
for confirmed_block in &data.confirmed_blocks {
match expire_predicates_for_block(
&Chain::Bitcoin,
confirmed_block.block_identifier.index,
predicates_db_conn,
&mut predicates_db_conn,
&ctx,
) {
Some(expired_predicate_uuids) => {
Expand All @@ -412,9 +392,12 @@ impl Service {
}
}
}
}
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
update_stats_from_report(Chain::Bitcoin, report, predicates_db_conn, &ctx);
update_stats_from_report(
Chain::Bitcoin,
report,
&mut predicates_db_conn,
&ctx,
);
}
}
ObserverEvent::StacksChainEvent((chain_event, report)) => {
Expand All @@ -432,15 +415,49 @@ impl Service {
continue;
}
};

match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.new_blocks,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.blocks_to_apply,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
};

if let PredicatesApi::On(ref config) = self.config.http_api {
let Ok(mut predicates_db_conn) = open_readwrite_predicates_db_conn_verbose(&config, &ctx) else {
continue;
};

match &chain_event {
StacksChainEvent::ChainUpdatedWithBlocks(data) => {
stacks_event += 1;
for confirmed_block in &data.confirmed_blocks {
match expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
predicates_db_conn,
&mut predicates_db_conn,
&ctx,
) {
Some(expired_predicate_uuids) => {
Expand All @@ -461,24 +478,12 @@ impl Service {
}
}
}
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.new_blocks,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithReorg(data) => {
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
StacksChainEvent::ChainUpdatedWithReorg(data) => {
for confirmed_block in &data.confirmed_blocks {
match expire_predicates_for_block(
&Chain::Stacks,
confirmed_block.block_identifier.index,
predicates_db_conn,
&mut predicates_db_conn,
&ctx,
) {
Some(expired_predicate_uuids) => {
Expand All @@ -499,23 +504,17 @@ impl Service {
}
}
}
confirm_entries_in_stacks_blocks(
&data.confirmed_blocks,
&stacks_db_conn_rw,
&self.ctx,
);
draft_entries_in_stacks_blocks(
&data.blocks_to_apply,
&stacks_db_conn_rw,
&self.ctx,
)
}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
StacksChainEvent::ChainUpdatedWithMicroblocks(_)
| StacksChainEvent::ChainUpdatedWithMicroblocksReorg(_) => {}
};
update_stats_from_report(
Chain::Stacks,
report,
&mut predicates_db_conn,
&ctx,
);
};
if let Some(ref mut predicates_db_conn) = predicates_db_conn {
update_stats_from_report(Chain::Stacks, report, predicates_db_conn, &ctx);
}

// Every 32 blocks, we will check if there's a new Stacks file archive to ingest
if stacks_event > 32 {
stacks_event = 0;
Expand Down Expand Up @@ -1103,18 +1102,22 @@ pub fn open_readwrite_predicates_db_conn(
.map_err(|e| format!("unable to connect to db: {}", e.to_string()))
}

pub fn open_readwrite_predicates_db_conn_verbose(
config: &PredicatesApiConfig,
ctx: &Context,
) -> Result<Connection, String> {
let res = open_readwrite_predicates_db_conn(config);
if let Err(ref e) = res {
error!(ctx.expect_logger(), "{}", e.to_string());
}
res
}

pub fn open_readwrite_predicates_db_conn_or_panic(
config: &PredicatesApiConfig,
ctx: &Context,
) -> Connection {
let redis_con = match open_readwrite_predicates_db_conn(config) {
Ok(con) => con,
Err(message) => {
error!(ctx.expect_logger(), "Redis: {}", message.to_string());
panic!();
}
};
redis_con
open_readwrite_predicates_db_conn_verbose(config, ctx).expect("unable to open redis conn")
}

#[cfg(test)]
Expand Down
2 changes: 1 addition & 1 deletion components/chainhook-sdk/src/indexer/stacks/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ pub fn standardize_stacks_block(
index: match block.block_height {
0 => 0,
_ => block.block_height - 1,
}
},
},
timestamp: block.parent_burn_block_timestamp,
metadata: StacksBlockMetadata {
Expand Down

0 comments on commit 5e6d704

Please sign in to comment.