Skip to content

Commit

Permalink
Revert "migrate to fastnear (#355)" (#357)
Browse files Browse the repository at this point in the history
This reverts commit ec9bed7.
  • Loading branch information
kobayurii authored Oct 4, 2024
1 parent d7f7b0f commit 168622d
Show file tree
Hide file tree
Showing 19 changed files with 753 additions and 460 deletions.
510 changes: 199 additions & 311 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,5 +68,5 @@ near-vm-runner = { git = 'https://github.com/kobayurii/nearcore.git', branch = "
"near_vm",
] }

near-lake-framework = { git = 'https://github.com/kobayurii/near-lake-framework-rs.git', branch = 'fork/0.7.11' }
near-lake-framework = { git = 'https://github.com/kobayurii/near-lake-framework-rs.git', branch = '0.7.20' }
near-jsonrpc-client = { git = 'https://github.com/kobayurii/near-jsonrpc-client-rs.git', branch = '0.13.3' }
87 changes: 87 additions & 0 deletions cache-storage/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use futures::FutureExt;
use near_indexer_primitives::near_primitives;

mod utils;

Expand Down Expand Up @@ -114,6 +115,92 @@ impl RedisCacheStorage {
}
}

/// Redis-backed cache holding the most recent blocks per finality
/// (`final` / `optimistic`), shared between the ReadRPC components.
#[derive(Clone)]
pub struct BlocksByFinalityCache {
    // Underlying Redis handle; database 0 is used for these keys (see `new`).
    cache_storage: RedisCacheStorage,
}

/// Maintains the Redis keys shared between the ReadRPC components that track the most
/// recent blocks per finality (final or optimistic):
/// `final_height` or `optimistic_height` depending on the finality passed, plus the
/// JSON-serialized `StreamerMessage` stored under the `final` or `optimistic` key.
impl BlocksByFinalityCache {
    /// Connects to Redis database 0 (the redis default), which holds the
    /// blocks-by-finality cache.
    pub async fn new(redis_url: String) -> anyhow::Result<Self> {
        let cache_storage = RedisCacheStorage::new(redis_url, 0).await?;
        Ok(Self { cache_storage })
    }

    /// Stores `streamer_message` and its height under the keys derived from `finality`,
    /// but only when the block is newer than the one already recorded.
    pub async fn update_block_by_finality(
        &self,
        finality: near_indexer_primitives::near_primitives::types::Finality,
        streamer_message: &near_indexer_primitives::StreamerMessage,
    ) -> anyhow::Result<()> {
        let block_type = serde_json::to_string(&finality)?;
        let height_key = format!("{}_height", block_type);
        let block_height = streamer_message.block.header.height;

        // Height of the last block recorded for this finality; a missing key
        // counts as 0 so the very first block always passes the check below.
        let last_height = self.cache_storage.get(height_key.clone()).await.unwrap_or(0);

        // Several indexers may run concurrently: only ever move forward, never
        // re-write the same block or one older than the last processed height.
        if block_height > last_height {
            let json_streamer_message = serde_json::to_string(streamer_message)?;

            // Issue both Redis writes (height and serialized message) concurrently.
            let update_height_future = self.cache_storage.set(height_key, block_height);
            let update_stream_msg_future = self.cache_storage.set(block_type, json_streamer_message);

            futures::future::join_all([
                update_height_future.boxed(),
                update_stream_msg_future.boxed(),
            ])
            .await
            .into_iter()
            .collect::<anyhow::Result<_>>()?;
        }

        Ok(())
    }

    /// Reads back and deserializes the `StreamerMessage` stored for `finality`.
    pub async fn get_block_by_finality(
        &self,
        finality: near_indexer_primitives::near_primitives::types::Finality,
    ) -> anyhow::Result<near_indexer_primitives::StreamerMessage> {
        let block_type = serde_json::to_string(&finality)?;
        let raw: String = self.cache_storage.get(block_type).await?;
        Ok(serde_json::from_str(&raw)?)
    }

    /// Records the current protocol version under the `protocol_version` key.
    pub async fn update_protocol_version(
        &self,
        protocol_version: near_primitives::types::ProtocolVersion,
    ) -> anyhow::Result<()> {
        self.cache_storage
            .set("protocol_version", protocol_version)
            .await
    }

    /// Fetches the protocol version previously stored by `update_protocol_version`.
    pub async fn get_protocol_version(
        &self,
    ) -> anyhow::Result<near_primitives::types::ProtocolVersion> {
        self.cache_storage.get("protocol_version").await
    }
}

#[derive(Clone)]
pub struct TxIndexerCache {
cache_storage: RedisCacheStorage,
Expand Down
5 changes: 4 additions & 1 deletion config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,10 @@ tracked_accounts = "${TRACKED_ACCOUNTS}"
tracked_changes = "${TRACKED_CHANGES}"

[lake_config]
num_threads = "${LAKE_NUM_THREADS}"
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"
aws_default_region = "${AWS_DEFAULT_REGION}"
aws_bucket_name = "${AWS_BUCKET_NAME}"

[tx_details_storage]
bucket_name = "${TX_BUCKET_NAME}"
Expand Down
3 changes: 3 additions & 0 deletions configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ license.workspace = true

[dependencies]
anyhow = "1.0.70"
aws-credential-types = "1.1.4"
aws-sdk-s3 = { version = "1.14.0", features = ["behavior-version-latest"] }
aws-types = "1.1.4"
dotenv = "0.15.0"
google-cloud-storage = "0.20.0"
lazy_static = "1.4.0"
Expand Down
15 changes: 12 additions & 3 deletions configuration/example.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,18 @@ near_rpc_url = "https://beta.rpc.mainnet.near.org"

### Lake framework configuration
[lake_config]
# Number of threads to use for fetching data from fastnear
# Default: 2 * available threads
#num_threads = 8

## Lake framework AWS access key id
aws_access_key_id = "${AWS_ACCESS_KEY_ID}"

## Lake framework AWS secret access key
aws_secret_access_key = "${AWS_SECRET_ACCESS_KEY}"

## Lake framework AWS default region
aws_default_region = "eu-central-1"

## Lake framework bucket name
aws_bucket_name = "near-lake-data-mainnet"

[tx_details_storage]
## Transaction details are stored in the S3-compatibe object storage (Google Cloud Storage by default)
Expand Down
77 changes: 61 additions & 16 deletions configuration/src/configs/lake.rs
Original file line number Diff line number Diff line change
@@ -1,43 +1,88 @@
use crate::configs::deserialize_optional_data_or_env;
use aws_sdk_s3::config::StalledStreamProtectionConfig;
use near_lake_framework::near_indexer_primitives::near_primitives;
use serde_derive::Deserialize;

use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic};

#[derive(Debug, Clone)]
pub struct LakeConfig {
pub num_threads: Option<u64>,
pub aws_access_key_id: String,
pub aws_secret_access_key: String,
pub aws_default_region: String,
pub aws_bucket_name: String,
}

impl LakeConfig {
pub async fn s3_config(&self) -> aws_sdk_s3::Config {
let credentials = aws_credential_types::Credentials::new(
&self.aws_access_key_id,
&self.aws_secret_access_key,
None,
None,
"",
);
aws_sdk_s3::Config::builder()
.stalled_stream_protection(StalledStreamProtectionConfig::disabled())
.credentials_provider(credentials)
.region(aws_types::region::Region::new(
self.aws_default_region.clone(),
))
.build()
}

pub async fn lake_config(
&self,
start_block_height: near_primitives::types::BlockHeight,
chain_id: crate::ChainId,
) -> anyhow::Result<near_lake_framework::FastNearConfig> {
let mut config_builder = near_lake_framework::FastNearConfigBuilder::default();
match chain_id {
crate::ChainId::Mainnet => config_builder = config_builder.mainnet(),
// Testnet is the default chain for other chain_id
_ => config_builder = config_builder.testnet(),
};
if let Some(num_threads) = self.num_threads {
config_builder = config_builder.num_threads(num_threads);
};
) -> anyhow::Result<near_lake_framework::LakeConfig> {
let config_builder = near_lake_framework::LakeConfigBuilder::default();
Ok(config_builder
.s3_config(self.s3_config().await)
.s3_region_name(&self.aws_default_region)
.s3_bucket_name(&self.aws_bucket_name)
.start_block_height(start_block_height)
.build()?)
.build()
.expect("Failed to build LakeConfig"))
}

pub async fn lake_s3_client(&self) -> near_lake_framework::s3_fetchers::LakeS3Client {
let s3_config = self.s3_config().await;
near_lake_framework::s3_fetchers::LakeS3Client::new(aws_sdk_s3::Client::from_conf(
s3_config,
))
}
}

#[derive(Deserialize, Debug, Clone, Default)]
pub struct CommonLakeConfig {
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub num_threads: Option<u64>,
pub aws_access_key_id: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_secret_access_key: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_default_region: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_bucket_name: Option<String>,
}

impl From<CommonLakeConfig> for LakeConfig {
fn from(common_config: CommonLakeConfig) -> Self {
Self {
num_threads: common_config.num_threads,
aws_access_key_id: required_value_or_panic(
"aws_access_key_id",
common_config.aws_access_key_id,
),
aws_secret_access_key: required_value_or_panic(
"aws_secret_access_key",
common_config.aws_secret_access_key,
),
aws_default_region: required_value_or_panic(
"aws_default_region",
common_config.aws_default_region,
),
aws_bucket_name: required_value_or_panic(
"aws_bucket_name",
common_config.aws_bucket_name,
),
}
}
}
1 change: 1 addition & 0 deletions near-state-indexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tokio = { version = "1.36.0", features = [
tokio-stream = "0.1"
tracing = "0.1.34"

cache-storage.workspace = true
configuration.workspace = true
database.workspace = true
logic-state-indexer.workspace = true
Expand Down
24 changes: 23 additions & 1 deletion near-state-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use logic_state_indexer::{handle_streamer_message, NearClient, INDEXER};
mod configs;
mod metrics;
mod near_client;
mod utils;

#[actix_web::main]
async fn main() -> anyhow::Result<()> {
Expand Down Expand Up @@ -58,6 +59,12 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
let state_indexer_config =
configuration::read_configuration::<configuration::NearStateIndexerConfig>().await?;

tracing::info!(target: INDEXER, "Connecting to redis...");
let finality_blocks_storage = cache_storage::BlocksByFinalityCache::new(
state_indexer_config.general.redis_url.to_string(),
)
.await?;

tracing::info!(target: INDEXER, "Setup near_indexer...");
let indexer_config = near_indexer::IndexerConfig {
home_dir,
Expand All @@ -73,7 +80,7 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
// Regular indexer process starts here
tracing::info!(target: INDEXER, "Instantiating the stream...");
let stream = indexer.streamer();
let (view_client, _) = indexer.client_actors();
let (view_client, client) = indexer.client_actors();

let near_client = near_client::NearViewClient::new(view_client.clone());
let protocol_config_view = near_client.protocol_config().await?;
Expand All @@ -91,6 +98,21 @@ async fn run(home_dir: std::path::PathBuf) -> anyhow::Result<()> {
near_client.clone(),
));

// Initiate the job of updating the optimistic blocks to Redis
tokio::spawn(utils::update_block_in_redis_by_finality(
view_client.clone(),
client.clone(),
finality_blocks_storage.clone(),
near_indexer_primitives::near_primitives::types::Finality::None,
));
// And the same job for the final blocks
tokio::spawn(utils::update_block_in_redis_by_finality(
view_client.clone(),
client.clone(),
finality_blocks_storage.clone(),
near_indexer_primitives::near_primitives::types::Finality::Final,
));

// ! Note that the `handle_streamer_message` doesn't interact with the Redis
tracing::info!(target: INDEXER, "Starting near_state_indexer...");
let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
Expand Down
Loading

0 comments on commit 168622d

Please sign in to comment.