Skip to content

Commit

Permalink
Enhance state-indexer compatibility with NEAR Lake and NEAR Indexer F…
Browse files Browse the repository at this point in the history
…ramework (#196)

* add near-state-indexer

* solve compilation problem

* improvements

* add readme
  • Loading branch information
kobayurii authored Mar 7, 2024
1 parent ed631f7 commit 1d01888
Show file tree
Hide file tree
Showing 33 changed files with 4,880 additions and 669 deletions.
4,284 changes: 3,792 additions & 492 deletions Cargo.lock

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,31 @@ members = [
"configuration",
"database",
"epoch-indexer",
"near-state-indexer",
"perf-testing",
"readnode-primitives",
"rpc-server",
"state-indexer",
"tx-indexer",
]

[workspace.dependencies]

configuration = { path = "configuration" }
database = { path = "database" }
readnode-primitives = { path = "readnode-primitives" }
epoch-indexer = { path = "epoch-indexer" }

near-indexer = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-client = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-o11y = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-indexer-primitives = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-primitives = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-chain-configs = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-crypto = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-jsonrpc-primitives = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-parameters = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }
near-vm-runner = { git = 'https://github.com/near/nearcore.git', rev = '6216a118cc3833d916723b9724b9d747c3c4c563' }

near-lake-framework = { git = 'https://github.com/kobayurii/near-lake-framework-rs.git', branch = '0.7.8' }
near-jsonrpc-client = { git = 'https://github.com/kobayurii/near-jsonrpc-client-rs.git', branch = '0.8.1'}
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ The JSON RPC server implementation that repeats all the APIs current real NEAR J

The indexer built on top of Lake Framework that watches the network and stores the `StateChanges` into the Storage (ScyllaDB) using the designed data schemas.

### [near-state-indexer](near-state-indexer/README.md)

The indexer built on top of the [NEAR Indexer Framework](https://github.com/nearprotocol/nearcore/tree/master/chain/indexer).

### [tx-indexer](tx-indexer/README.md)

The indexer built on top of Lake Framework that watches the network and stores the `Transactions` along with all the related entities (`Receipts`, `ExecutionOutcomes`) into the Storage (ScyllaDB) using the specifically defined `TransactionDetails` structure in a dumped way (using the simplest key-value schema)
Expand Down
4 changes: 4 additions & 0 deletions config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ indexer_id = "${STATE_INDEXER_ID}"
metrics_server_port = "${STATE_SERVER_PORT}"
concurrency = "${CONCURRENCY}"

[general.near_state_indexer]
metrics_server_port = "${NEAR_STATE_SERVER_PORT}"
concurrency = "${NEAR_STATE_CONCURRENCY}"

[general.epoch_indexer]
indexer_id = "${EPOCH_INDEXER_ID}"

Expand Down
2 changes: 1 addition & 1 deletion configuration/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ tracing-subscriber = { version = "0.3.15", features = [
tracing-opentelemetry = { version = "0.19", optional = true }
tracing-stackdriver = "0.7.2" # GCP logs

near-lake-framework = "0.7.7"
near-lake-framework.workspace = true

[features]
tracing-instrumentation = ["dep:opentelemetry-jaeger", "dep:tracing-opentelemetry"]
1 change: 1 addition & 0 deletions configuration/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ The configuration settings are stored in a TOML file. The settings include:
- RPC server settings like the server port and max gas burnt for contract function call.
- Transaction indexer settings like the indexer ID and port for the metrics server.
- State indexer settings like the indexer ID and port for the metrics server.
- Near State indexer settings like the port for the metrics server.
- Epoch indexer settings like the indexer ID.
- Rightsizing settings like the accounts and state changes to track.
- Lake framework settings like the AWS access key ID and secret access key.
Expand Down
11 changes: 11 additions & 0 deletions configuration/example.config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,17 @@ near_rpc_url = "https://beta.rpc.mainnet.near.org"
## Default value is 1
#concurrency = 1

## Near state indexer general configuration
[general.near_state_indexer]

## Port for metrics server
## Default value is 8082
#metrics_server_port = 8082

## Concurrency for near-state-indexer
## Default value is 1
#concurrency = 1

## Epoch indexer general configuration
[general.epoch_indexer]

Expand Down
72 changes: 65 additions & 7 deletions configuration/src/configs/general.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ use std::str::FromStr;

use serde_derive::Deserialize;

use crate::configs::{deserialize_data_or_env, deserialize_optional_data_or_env};
use crate::configs::{
deserialize_data_or_env, deserialize_optional_data_or_env, required_value_or_panic,
};

#[derive(Debug, Clone)]
pub struct GeneralRpcServerConfig {
Expand Down Expand Up @@ -38,6 +40,13 @@ pub struct GeneralStateIndexerConfig {
pub concurrency: usize,
}

/// Resolved "general" settings used by the near-state-indexer.
///
/// Built from `CommonGeneralConfig` through its `From` implementation, which
/// substitutes the documented defaults for any value missing in the config file.
#[derive(Debug, Clone)]
pub struct GeneralNearStateIndexerConfig {
/// Identifier of the NEAR chain the indexer is configured for.
pub chain_id: ChainId,
/// Port for the metrics server.
pub metrics_server_port: u16,
/// Concurrency setting of the indexer.
pub concurrency: usize,
}

#[derive(Debug, Clone)]
pub struct GeneralEpochIndexerConfig {
pub chain_id: ChainId,
Expand All @@ -50,8 +59,8 @@ pub struct GeneralEpochIndexerConfig {
pub struct CommonGeneralConfig {
#[serde(deserialize_with = "deserialize_data_or_env")]
pub chain_id: ChainId,
#[serde(deserialize_with = "deserialize_data_or_env")]
pub near_rpc_url: String,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub near_rpc_url: Option<String>,
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub near_archival_rpc_url: Option<String>,
#[serde(default)]
Expand All @@ -61,6 +70,8 @@ pub struct CommonGeneralConfig {
#[serde(default)]
pub state_indexer: CommonGeneralStateIndexerConfig,
#[serde(default)]
pub near_state_indexer: CommonGeneralNearStateIndexerConfig,
#[serde(default)]
pub epoch_indexer: CommonGeneralEpochIndexerConfig,
}

Expand All @@ -70,6 +81,8 @@ pub enum ChainId {
#[default]
Mainnet,
Testnet,
Betanet,
Localnet,
}

impl FromStr for ChainId {
Expand All @@ -79,6 +92,8 @@ impl FromStr for ChainId {
match s {
"mainnet" => Ok(ChainId::Mainnet),
"testnet" => Ok(ChainId::Testnet),
"localnet" => Ok(ChainId::Localnet),
"betanet" => Ok(ChainId::Betanet),
_ => Err(anyhow::anyhow!("Invalid chain id")),
}
}
Expand Down Expand Up @@ -210,6 +225,33 @@ impl Default for CommonGeneralStateIndexerConfig {
}
}

/// Raw `near_state_indexer` sub-section of the general configuration, as deserialized.
///
/// Both values are optional here; the fallback to
/// `default_metrics_server_port` / `default_concurrency` happens when this is
/// converted into `GeneralNearStateIndexerConfig`.
// NOTE(review): `deserialize_optional_data_or_env` is defined outside this view;
// presumably it accepts either a literal or a `${ENV_VAR}` placeholder — confirm.
#[derive(Deserialize, Debug, Clone)]
pub struct CommonGeneralNearStateIndexerConfig {
/// Port for the metrics server; `None` when not provided.
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub metrics_server_port: Option<u16>,
/// Concurrency setting; `None` when not provided.
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub concurrency: Option<usize>,
}

impl CommonGeneralNearStateIndexerConfig {
    /// Returns the metrics server port used when the configuration omits one.
    pub fn default_metrics_server_port() -> u16 {
        const DEFAULT_METRICS_SERVER_PORT: u16 = 8082;
        DEFAULT_METRICS_SERVER_PORT
    }

    /// Returns the concurrency used when the configuration omits one.
    pub fn default_concurrency() -> usize {
        const DEFAULT_CONCURRENCY: usize = 1;
        DEFAULT_CONCURRENCY
    }
}

impl Default for CommonGeneralNearStateIndexerConfig {
    /// Builds the section with both values populated from the
    /// `default_*` helpers, so an absent section still carries explicit values.
    fn default() -> Self {
        let metrics_server_port = Some(Self::default_metrics_server_port());
        let concurrency = Some(Self::default_concurrency());
        Self {
            metrics_server_port,
            concurrency,
        }
    }
}

#[derive(Deserialize, Debug, Clone)]
pub struct CommonGeneralEpochIndexerConfig {
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
Expand All @@ -234,7 +276,7 @@ impl From<CommonGeneralConfig> for GeneralRpcServerConfig {
fn from(common_config: CommonGeneralConfig) -> Self {
Self {
chain_id: common_config.chain_id,
near_rpc_url: common_config.near_rpc_url,
near_rpc_url: required_value_or_panic("near_rpc_url", common_config.near_rpc_url),
near_archival_rpc_url: common_config.near_archival_rpc_url,
referer_header_value: common_config
.rpc_server
Expand Down Expand Up @@ -269,7 +311,7 @@ impl From<CommonGeneralConfig> for GeneralTxIndexerConfig {
fn from(common_config: CommonGeneralConfig) -> Self {
Self {
chain_id: common_config.chain_id,
near_rpc_url: common_config.near_rpc_url,
near_rpc_url: required_value_or_panic("near_rpc_url", common_config.near_rpc_url),
near_archival_rpc_url: common_config.near_archival_rpc_url,
indexer_id: common_config
.tx_indexer
Expand All @@ -291,7 +333,7 @@ impl From<CommonGeneralConfig> for GeneralStateIndexerConfig {
fn from(common_config: CommonGeneralConfig) -> Self {
Self {
chain_id: common_config.chain_id,
near_rpc_url: common_config.near_rpc_url,
near_rpc_url: required_value_or_panic("near_rpc_url", common_config.near_rpc_url),
near_archival_rpc_url: common_config.near_archival_rpc_url,
indexer_id: common_config
.state_indexer
Expand All @@ -309,11 +351,27 @@ impl From<CommonGeneralConfig> for GeneralStateIndexerConfig {
}
}

impl From<CommonGeneralConfig> for GeneralNearStateIndexerConfig {
fn from(common_config: CommonGeneralConfig) -> Self {
Self {
chain_id: common_config.chain_id,
metrics_server_port: common_config
.near_state_indexer
.metrics_server_port
.unwrap_or_else(CommonGeneralNearStateIndexerConfig::default_metrics_server_port),
concurrency: common_config
.near_state_indexer
.concurrency
.unwrap_or_else(CommonGeneralNearStateIndexerConfig::default_concurrency),
}
}
}

impl From<CommonGeneralConfig> for GeneralEpochIndexerConfig {
fn from(common_config: CommonGeneralConfig) -> Self {
Self {
chain_id: common_config.chain_id,
near_rpc_url: common_config.near_rpc_url,
near_rpc_url: required_value_or_panic("near_rpc_url", common_config.near_rpc_url),
near_archival_rpc_url: common_config.near_archival_rpc_url,
indexer_id: common_config
.epoch_indexer
Expand Down
43 changes: 37 additions & 6 deletions configuration/src/configs/lake.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,13 @@
use near_lake_framework::near_indexer_primitives::near_primitives;
use serde_derive::Deserialize;

use crate::configs::deserialize_data_or_env;
use crate::configs::{deserialize_optional_data_or_env, required_value_or_panic};

#[derive(Deserialize, Debug, Clone, Default)]
#[derive(Debug, Clone)]
pub struct LakeConfig {
#[serde(deserialize_with = "deserialize_data_or_env")]
pub aws_access_key_id: String,
#[serde(deserialize_with = "deserialize_data_or_env")]
pub aws_secret_access_key: String,
#[serde(deserialize_with = "deserialize_data_or_env")]
pub aws_default_region: String,
#[serde(deserialize_with = "deserialize_data_or_env")]
pub aws_bucket_name: String,
}

Expand Down Expand Up @@ -53,3 +49,38 @@ impl LakeConfig {
))
}
}

/// Raw lake (AWS) settings as deserialized from the configuration.
///
/// Every field is optional at this level. Converting into `LakeConfig`
/// requires all of them and panics if any is missing.
// NOTE(review): `deserialize_optional_data_or_env` is defined outside this view;
// presumably it accepts either a literal or a `${ENV_VAR}` placeholder — confirm.
#[derive(Deserialize, Debug, Clone, Default)]
pub struct CommonLakeConfig {
/// AWS access key ID, if provided.
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_access_key_id: Option<String>,
/// AWS secret access key, if provided.
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_secret_access_key: Option<String>,
/// AWS default region, if provided.
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_default_region: Option<String>,
/// AWS bucket name, if provided.
#[serde(deserialize_with = "deserialize_optional_data_or_env", default)]
pub aws_bucket_name: Option<String>,
}

impl From<CommonLakeConfig> for LakeConfig {
fn from(common_config: CommonLakeConfig) -> Self {
Self {
aws_access_key_id: required_value_or_panic(
"aws_access_key_id",
common_config.aws_access_key_id,
),
aws_secret_access_key: required_value_or_panic(
"aws_secret_access_key",
common_config.aws_secret_access_key,
),
aws_default_region: required_value_or_panic(
"aws_default_region",
common_config.aws_default_region,
),
aws_bucket_name: required_value_or_panic(
"aws_bucket_name",
common_config.aws_bucket_name,
),
}
}
}
41 changes: 36 additions & 5 deletions configuration/src/configs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,14 @@ lazy_static::lazy_static! {
static ref RE_NAME_ENV: regex::Regex = regex::Regex::new(r"\$\{(?<env_name>\w+)}").unwrap();
}

/// Unwraps a configuration value that must be present.
///
/// `config_name` is only used to build the panic message.
///
/// # Panics
///
/// Panics with ``Config `<config_name>` is required!`` when `value` is `None`.
fn required_value_or_panic<T>(config_name: &str, value: Option<T>) -> T {
    value.unwrap_or_else(|| panic!("Config `{}` is required!", config_name))
}

fn get_env_var<T>(env_var_name: &str) -> anyhow::Result<T>
where
T: FromStr,
Expand Down Expand Up @@ -70,7 +78,7 @@ pub struct CommonConfig {
pub general: general::CommonGeneralConfig,
#[serde(default)]
pub rightsizing: rightsizing::CommonRightsizingConfig,
pub lake_config: lake::LakeConfig,
pub lake_config: lake::CommonLakeConfig,
pub database: database::CommonDatabaseConfig,
}

Expand All @@ -89,7 +97,7 @@ impl Config for RpcServerConfig {
fn from_common_config(common_config: CommonConfig) -> Self {
Self {
general: common_config.general.into(),
lake_config: common_config.lake_config,
lake_config: common_config.lake_config.into(),
database: database::DatabaseRpcServerConfig::from(common_config.database).into(),
}
}
Expand Down Expand Up @@ -117,7 +125,7 @@ impl Config for TxIndexerConfig {
Self {
general: common_config.general.into(),
rightsizing: common_config.rightsizing.into(),
lake_config: common_config.lake_config,
lake_config: common_config.lake_config.into(),
database: database::DatabaseTxIndexerConfig::from(common_config.database).into(),
}
}
Expand All @@ -142,7 +150,30 @@ impl Config for StateIndexerConfig {
Self {
general: common_config.general.into(),
rightsizing: common_config.rightsizing.into(),
lake_config: common_config.lake_config,
lake_config: common_config.lake_config.into(),
database: database::DatabaseStateIndexerConfig::from(common_config.database).into(),
}
}
}

/// Complete configuration for the near-state-indexer.
///
/// Unlike the other indexer configs in this module it carries no `lake_config`
/// section — only general, rightsizing and database settings.
#[derive(Debug, Clone)]
pub struct NearStateIndexerConfig {
/// General settings (chain id, metrics server port, concurrency).
pub general: general::GeneralNearStateIndexerConfig,
/// Rightsizing settings; consulted by `state_should_be_indexed`.
pub rightsizing: rightsizing::RightsizingConfig,
/// Database settings.
pub database: database::DatabaseConfig,
}

impl NearStateIndexerConfig {
    /// Tells whether the given state change should be indexed.
    ///
    /// The decision is delegated entirely to the rightsizing configuration.
    pub fn state_should_be_indexed(&self, state_change_value: &StateChangeValueView) -> bool {
        let rightsizing = &self.rightsizing;
        rightsizing.state_should_be_indexed(state_change_value)
    }
}

impl Config for NearStateIndexerConfig {
fn from_common_config(common_config: CommonConfig) -> Self {
Self {
general: common_config.general.into(),
rightsizing: common_config.rightsizing.into(),
database: database::DatabaseStateIndexerConfig::from(common_config.database).into(),
}
}
Expand All @@ -159,7 +190,7 @@ impl Config for EpochIndexerConfig {
fn from_common_config(common_config: CommonConfig) -> Self {
Self {
general: common_config.general.into(),
lake_config: common_config.lake_config,
lake_config: common_config.lake_config.into(),
database: database::DatabaseStateIndexerConfig::from(common_config.database).into(),
}
}
Expand Down
3 changes: 2 additions & 1 deletion configuration/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ mod configs;
pub use crate::configs::database::DatabaseConfig;
pub use crate::configs::general::ChainId;
pub use crate::configs::{
EpochIndexerConfig, RpcServerConfig, StateIndexerConfig, TxIndexerConfig,
EpochIndexerConfig, NearStateIndexerConfig, RpcServerConfig, StateIndexerConfig,
TxIndexerConfig,
};

pub async fn read_configuration<T>() -> anyhow::Result<T>
Expand Down
Loading

0 comments on commit 1d01888

Please sign in to comment.