Skip to content

Commit

Permalink
inevents
Browse files Browse the repository at this point in the history
  • Loading branch information
Sliman4 committed Jun 17, 2024
1 parent 95fe9f0 commit bc51f0d
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 62 deletions.
10 changes: 4 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,14 @@ edition = "2021"
license = "MIT OR Apache-2.0"

[dependencies]
inindexer = { git = "https://github.com/INTEARnear/inindexer.git", features = [ "fastnear-data-server" ] }
inindexer = { git = "https://github.com/INTEARnear/inindexer.git", features = [ "neardata-server" ] }
async-trait = "0.1.80"
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
log = "0.4.21"
simple_logger = "5.0.0"
redis = { version = "0.25.3", features = [ "tokio-rustls-comp", "connection-manager" ], optional = true }
serde = { version = "1.0.199", features = [ "derive" ] }
serde_json = "1.0.116"
dotenv = "0.15.0"

[features]
redis-handler = [ "redis" ]
default = [ "redis-handler" ]
redis = { version = "0.25.3", features = [ "tokio-rustls-comp", "connection-manager" ] }
inevents-redis = { git = "https://github.com/INTEARnear/inevents" }
intear-events = { git = "https://github.com/INTEARnear/inevents" }
1 change: 0 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#[cfg(feature = "redis-handler")]
pub mod redis_handler;

use std::collections::HashMap;
Expand Down
6 changes: 3 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
#[cfg(test)]
mod tests;

use inindexer::fastnear_data_server::FastNearDataServerProvider;
use inindexer::neardata_server::NeardataServerProvider;
use inindexer::{
run_indexer, AutoContinue, BlockIterator, IndexerOptions, PreprocessTransactionsSettings,
};
Expand All @@ -24,11 +24,11 @@ async fn main() {
.unwrap();
let connection = ConnectionManager::new(client).await.unwrap();

let mut indexer = nft_indexer::NftIndexer(PushToRedisStream::new(connection, 10_000));
let mut indexer = nft_indexer::NftIndexer(PushToRedisStream::new(connection, 10_000).await);

run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
NeardataServerProvider::mainnet(),
IndexerOptions {
range: if std::env::args().len() > 1 {
// For debugging
Expand Down
108 changes: 62 additions & 46 deletions src/redis_handler.rs
Original file line number Diff line number Diff line change
@@ -1,78 +1,94 @@
use async_trait::async_trait;
use redis::{streams::StreamMaxlen, AsyncCommands};
use inevents_redis::RedisEventStream;
use intear_events::events::nft::{
nft_burn::NftBurnEventData, nft_mint::NftMintEventData, nft_transfer::NftTransferEventData,
};
use redis::aio::ConnectionManager;

use crate::{
EventContext, ExtendedNftBurnEvent, ExtendedNftMintEvent, ExtendedNftTransferEvent,
NftEventHandler,
};

pub struct PushToRedisStream<C: AsyncCommands + Sync> {
connection: C,
pub struct PushToRedisStream {
mint_stream: RedisEventStream<NftMintEventData>,
transfer_stream: RedisEventStream<NftTransferEventData>,
burn_stream: RedisEventStream<NftBurnEventData>,
max_stream_size: usize,
}

impl<C: AsyncCommands + Sync> PushToRedisStream<C> {
pub fn new(connection: C, max_stream_size: usize) -> Self {
impl PushToRedisStream {
pub async fn new(connection: ConnectionManager, max_stream_size: usize) -> Self {
Self {
connection,
mint_stream: RedisEventStream::new(connection.clone(), "nft_mint").await,
transfer_stream: RedisEventStream::new(connection.clone(), "nft_transfer").await,
burn_stream: RedisEventStream::new(connection.clone(), "nft_burn").await,
max_stream_size,
}
}
}

#[async_trait]
impl<C: AsyncCommands + Sync> NftEventHandler for PushToRedisStream<C> {
impl NftEventHandler for PushToRedisStream {
async fn handle_mint(&mut self, mint: ExtendedNftMintEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_mint",
StreamMaxlen::Approx(self.max_stream_size),
format!("{}-*", context.block_height),
&[
("mint", serde_json::to_string(&mint).unwrap().as_str()),
("context", serde_json::to_string(&context).unwrap().as_str()),
],
self.mint_stream
.emit_event(
context.block_height,
NftMintEventData {
owner_id: mint.event.owner_id,
token_ids: mint.event.token_ids,
memo: mint.event.memo,
transaction_id: context.transaction_id,
receipt_id: context.receipt_id,
block_height: context.block_height,
block_timestamp_nanosec: context.block_timestamp_nanosec,
contract_id: context.contract_id,
},
self.max_stream_size,
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
.expect("Failed to emit mint event");
}

async fn handle_transfer(&mut self, transfer: ExtendedNftTransferEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_transfer",
StreamMaxlen::Approx(self.max_stream_size),
format!("{}-*", context.block_height),
&[
(
"transfer",
serde_json::to_string(&transfer).unwrap().as_str(),
),
("context", serde_json::to_string(&context).unwrap().as_str()),
],
self.transfer_stream
.emit_event(
context.block_height,
NftTransferEventData {
old_owner_id: transfer.event.old_owner_id,
new_owner_id: transfer.event.new_owner_id,
token_ids: transfer.event.token_ids,
memo: transfer.event.memo,
token_prices_near: transfer.trade.token_prices_near,
transaction_id: context.transaction_id,
receipt_id: context.receipt_id,
block_height: context.block_height,
block_timestamp_nanosec: context.block_timestamp_nanosec,
contract_id: context.contract_id,
},
self.max_stream_size,
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
.expect("Failed to emit transfer event");
}

async fn handle_burn(&mut self, burn: ExtendedNftBurnEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_burn",
StreamMaxlen::Approx(self.max_stream_size),
format!("{}-*", context.block_height),
&[
("burn", serde_json::to_string(&burn).unwrap().as_str()),
("context", serde_json::to_string(&context).unwrap().as_str()),
],
self.burn_stream
.emit_event(
context.block_height,
NftBurnEventData {
owner_id: burn.event.owner_id,
token_ids: burn.event.token_ids,
memo: burn.event.memo,
transaction_id: context.transaction_id,
receipt_id: context.receipt_id,
block_height: context.block_height,
block_timestamp_nanosec: context.block_timestamp_nanosec,
contract_id: context.contract_id,
},
self.max_stream_size,
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
.expect("Failed to emit burn event");
}
}
12 changes: 6 additions & 6 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,9 @@ use std::collections::HashMap;

use async_trait::async_trait;
use inindexer::{
fastnear_data_server::FastNearDataServerProvider,
near_indexer_primitives::types::AccountId,
near_utils::{NftBurnEvent, NftMintEvent, NftTransferEvent},
neardata_server::NeardataServerProvider,
run_indexer, BlockIterator, IndexerOptions, PreprocessTransactionsSettings,
};

Expand Down Expand Up @@ -46,7 +46,7 @@ async fn detects_mints() {

run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
NeardataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::iterator(117_189_143..=117_189_146),
preprocess_transactions: Some(PreprocessTransactionsSettings {
Expand Down Expand Up @@ -122,7 +122,7 @@ async fn detects_transfers() {

run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
NeardataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::iterator(117_487_093..=117_487_095),
preprocess_transactions: Some(PreprocessTransactionsSettings {
Expand Down Expand Up @@ -203,7 +203,7 @@ async fn detects_burns() {

run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
NeardataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::iterator(117_752_571..=117_752_573),
preprocess_transactions: Some(PreprocessTransactionsSettings {
Expand Down Expand Up @@ -280,7 +280,7 @@ async fn detects_paras_trade() {

run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
NeardataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::iterator(117_998_763..=117_998_773),
preprocess_transactions: Some(PreprocessTransactionsSettings {
Expand Down Expand Up @@ -364,7 +364,7 @@ async fn detects_mintbase_trade() {

run_indexer(
&mut indexer,
FastNearDataServerProvider::mainnet(),
NeardataServerProvider::mainnet(),
IndexerOptions {
range: BlockIterator::iterator(116_934_524..=116_934_529),
preprocess_transactions: Some(PreprocessTransactionsSettings {
Expand Down

0 comments on commit bc51f0d

Please sign in to comment.