Skip to content

Commit

Permalink
Merge branch '966-launchpad-events' of https://github.com/near/queryapi
Browse files Browse the repository at this point in the history
… into 966-launchpad-events
  • Loading branch information
Kevin101Zhang committed Aug 1, 2024
2 parents a14431c + e7d17fa commit 051d375
Show file tree
Hide file tree
Showing 6 changed files with 340 additions and 130 deletions.
6 changes: 6 additions & 0 deletions coordinator/src/indexer_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ pub struct IndexerConfig {
pub rule: Rule,
pub updated_at_block_height: Option<u64>,
pub created_at_block_height: u64,
pub deleted_at_block_height: Option<u64>,
}

impl KeyProvider for IndexerConfig {
Expand All @@ -39,6 +40,7 @@ impl Default for IndexerConfig {
},
created_at_block_height: 1,
updated_at_block_height: Some(2),
deleted_at_block_height: Some(3),
start_block: StartBlock::Height(100),
}
}
Expand All @@ -53,4 +55,8 @@ impl IndexerConfig {
self.updated_at_block_height
.unwrap_or(self.created_at_block_height)
}

pub fn is_deleted(&self) -> bool {
self.deleted_at_block_height.is_some()
}
}
1 change: 1 addition & 0 deletions coordinator/src/indexer_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,7 @@ mod tests {
},
created_at_block_height: 1,
updated_at_block_height: None,
deleted_at_block_height: None,
start_block: StartBlock::Continue,
};

Expand Down
55 changes: 23 additions & 32 deletions coordinator/src/lifecycle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,13 @@ impl<'a> LifecycleManager<'a> {
#[tracing::instrument(name = "initializing", skip_all)]
async fn handle_initializing(
&self,
config: Option<&IndexerConfig>,
config: &IndexerConfig,
_state: &IndexerState,
) -> LifecycleState {
if config.is_none() {
if config.is_deleted() {
return LifecycleState::Deleting;
}

let config = config.unwrap();

if self
.data_layer_handler
.ensure_provisioned(config)
Expand All @@ -114,15 +112,13 @@ impl<'a> LifecycleManager<'a> {
#[tracing::instrument(name = "running", skip_all)]
async fn handle_running(
&self,
config: Option<&IndexerConfig>,
config: &IndexerConfig,
state: &mut IndexerState,
) -> LifecycleState {
if config.is_none() {
if config.is_deleted() {
return LifecycleState::Deleting;
}

let config = config.unwrap();

if !state.enabled {
return LifecycleState::Stopping;
}
Expand All @@ -149,13 +145,11 @@ impl<'a> LifecycleManager<'a> {
}

#[tracing::instrument(name = "stopping", skip_all)]
async fn handle_stopping(&self, config: Option<&IndexerConfig>) -> LifecycleState {
if config.is_none() {
async fn handle_stopping(&self, config: &IndexerConfig) -> LifecycleState {
if config.is_deleted() {
return LifecycleState::Deleting;
}

let config = config.unwrap();

if let Err(error) = self
.block_streams_handler
.stop_if_needed(config.account_id.clone(), config.function_name.clone())
Expand All @@ -178,12 +172,8 @@ impl<'a> LifecycleManager<'a> {
}

#[tracing::instrument(name = "stopped", skip_all)]
async fn handle_stopped(
&self,
config: Option<&IndexerConfig>,
state: &IndexerState,
) -> LifecycleState {
if config.is_none() {
async fn handle_stopped(&self, config: &IndexerConfig, state: &IndexerState) -> LifecycleState {
if config.is_deleted() {
return LifecycleState::Deleting;
}

Expand All @@ -199,13 +189,12 @@ impl<'a> LifecycleManager<'a> {
#[tracing::instrument(name = "repairing", skip_all)]
async fn handle_repairing(
&self,
_config: Option<&IndexerConfig>,
config: &IndexerConfig,
_state: &IndexerState,
) -> LifecycleState {
// TODO: Re-enable auto deprovision once guard rails in place
// if config.is_none() {
// return LifecycleState::Deleting;
// }
if config.is_deleted() {
return LifecycleState::Deleting;
}

// TODO Add more robust error handling, for now just stop
LifecycleState::Repairing
Expand All @@ -230,7 +219,7 @@ impl<'a> LifecycleManager<'a> {
}

tracing::error!("Temporarily preventing indexer deprovision due to service instability");
LifecycleState::Repairing
LifecycleState::Deleted

// if self.state_manager.delete_state(state).await.is_err() {
// // Retry
Expand Down Expand Up @@ -283,7 +272,11 @@ impl<'a> LifecycleManager<'a> {
)
.await
{
Ok(config) => config,
Ok(Some(config)) => config,
Ok(None) => {
warn!("No matching indexer config was found");
continue;
}
Err(error) => {
warn!(?error, "Failed to fetch config");
continue;
Expand All @@ -304,13 +297,11 @@ impl<'a> LifecycleManager<'a> {
}

let desired_lifecycle_state = match state.lifecycle_state {
LifecycleState::Initializing => {
self.handle_initializing(config.as_ref(), &state).await
}
LifecycleState::Running => self.handle_running(config.as_ref(), &mut state).await,
LifecycleState::Stopping => self.handle_stopping(config.as_ref()).await,
LifecycleState::Stopped => self.handle_stopped(config.as_ref(), &state).await,
LifecycleState::Repairing => self.handle_repairing(config.as_ref(), &state).await,
LifecycleState::Initializing => self.handle_initializing(&config, &state).await,
LifecycleState::Running => self.handle_running(&config, &mut state).await,
LifecycleState::Stopping => self.handle_stopping(&config).await,
LifecycleState::Stopped => self.handle_stopped(&config, &state).await,
LifecycleState::Repairing => self.handle_repairing(&config, &state).await,
LifecycleState::Deleting => self.handle_deleting(&state).await,
LifecycleState::Deleted => LifecycleState::Deleted,
};
Expand Down
37 changes: 24 additions & 13 deletions coordinator/src/registry.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#![cfg_attr(test, allow(dead_code))]

use anyhow::Context;
use serde_json::Value;
use std::collections::hash_map::Iter;
use std::collections::HashMap;

Expand Down Expand Up @@ -130,6 +131,7 @@ impl RegistryImpl {
rule: indexer.rule,
updated_at_block_height: indexer.updated_at_block_height,
created_at_block_height: indexer.created_at_block_height,
deleted_at_block_height: indexer.deleted_at_block_height,
},
)
})
Expand Down Expand Up @@ -194,21 +196,30 @@ impl RegistryImpl {
.context("Failed to fetch indexer")?;

if let QueryResponseKind::CallResult(call_result) = response.kind {
let indexer = serde_json::from_slice::<Option<registry_types::IndexerConfig>>(
&call_result.result,
)?
.map(|indexer| IndexerConfig {
// Handle case where call returns successfully but returns null due to not matching
let raw_json: Value = serde_json::from_slice(&call_result.result)
.context("Failed to deserialize config from JSON provided by RPC call")?;
if raw_json.is_null() {
return Ok(None);
}

// Handle case where we now expect returned JSON to actually parse into config
let config: registry_types::IndexerConfig =
serde_json::from_slice::<registry_types::IndexerConfig>(&call_result.result)
.context("Failed to deserialize config from JSON provided by RPC call")?;
let indexer = IndexerConfig {
account_id: account_id.clone(),
function_name: function_name.to_string(),
code: indexer.code,
schema: indexer.schema,
rule: indexer.rule,
start_block: indexer.start_block,
updated_at_block_height: indexer.updated_at_block_height,
created_at_block_height: indexer.created_at_block_height,
});

return Ok(indexer);
code: config.code,
schema: config.schema,
rule: config.rule,
start_block: config.start_block,
updated_at_block_height: config.updated_at_block_height,
created_at_block_height: config.created_at_block_height,
deleted_at_block_height: config.deleted_at_block_height,
};

return Ok(Some(indexer));
}

anyhow::bail!("Invalid registry response")
Expand Down
Loading

0 comments on commit 051d375

Please sign in to comment.