diff --git a/src/main.rs b/src/main.rs index 4392227..eac8b0b 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,3 +1,4 @@ +mod redis_handler; #[cfg(test)] mod tests; @@ -14,9 +15,7 @@ use inindexer::{ run_indexer, AutoContinue, BlockIterator, CompleteTransaction, Indexer, IndexerOptions, PreprocessTransactionsSettings, }; -use redis::aio::MultiplexedConnection; -use redis::streams::StreamMaxlen; -use redis::AsyncCommands; +use redis_handler::PushToRedisStream; #[async_trait] trait NftEventHandler: Send + Sync { @@ -25,9 +24,7 @@ trait NftEventHandler: Send + Sync { async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext); } -struct NftIndexer { - handler: T, -} +struct NftIndexer(T); #[async_trait] impl Indexer for NftIndexer { @@ -61,7 +58,7 @@ impl Indexer for NftIndexer { if mint_log.validate() { log::debug!("Mint log: {mint_log:?}"); for mint in mint_log.data.0 { - self.handler.handle_mint(mint, get_context_lazy()).await; + self.0.handle_mint(mint, get_context_lazy()).await; } } } @@ -69,9 +66,7 @@ impl Indexer for NftIndexer { if transfer_log.validate() { log::debug!("Transfer log: {transfer_log:?}"); for transfer in transfer_log.data.0 { - self.handler - .handle_transfer(transfer, get_context_lazy()) - .await; + self.0.handle_transfer(transfer, get_context_lazy()).await; } } } @@ -79,7 +74,7 @@ impl Indexer for NftIndexer { if burn_log.validate() { log::debug!("Burn log: {burn_log:?}"); for burn in burn_log.data.0 { - self.handler.handle_burn(burn, get_context_lazy()).await; + self.0.handle_burn(burn, get_context_lazy()).await; } } } @@ -90,81 +85,6 @@ impl Indexer for NftIndexer { } } -struct PushToRedisStream { - connection: MultiplexedConnection, - max_blocks: usize, -} - -#[async_trait] -impl NftEventHandler for PushToRedisStream { - async fn handle_mint(&mut self, mint: NftMintEvent, context: EventContext) { - let response: String = self - .connection - .xadd_maxlen( - "nft_mint", - StreamMaxlen::Approx(self.max_blocks), - &format!("{}-*", context.block_height), - &[ - ("owner_id", mint.owner_id.as_str()), - ("token_ids", mint.token_ids.join(",").as_str()), - ("memo", mint.memo.as_deref().unwrap_or("")), - ("txid", context.txid.to_string().as_str()), - ("block_height", context.block_height.to_string().as_str()), - ("sender_id", context.sender_id.as_str()), - ("contract_id", context.contract_id.as_str()), - ], - ) - .await - .unwrap(); - log::debug!("Adding to stream: {response}"); - } - - async fn handle_transfer(&mut self, transfer: NftTransferEvent, context: EventContext) { - let response: String = self - .connection - .xadd_maxlen( - "nft_transfer", - StreamMaxlen::Approx(self.max_blocks), - &format!("{}-*", context.block_height), - &[ - ("old_owner_id", transfer.old_owner_id.as_str()), - ("new_owner_id", transfer.new_owner_id.as_str()), - ("token_ids", transfer.token_ids.join(",").as_str()), - ("memo", transfer.memo.as_deref().unwrap_or("")), - ("txid", context.txid.to_string().as_str()), - ("block_height", context.block_height.to_string().as_str()), - ("sender_id", context.sender_id.as_str()), - ("contract_id", context.contract_id.as_str()), - ], - ) - .await - .unwrap(); - log::debug!("Adding to stream: {response}"); - } - - async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext) { - let response: String = self - .connection - .xadd_maxlen( - "nft_burn", - StreamMaxlen::Approx(self.max_blocks), - &format!("{}-*", context.block_height), - &[ - ("owner_id", burn.owner_id.as_str()), - ("token_ids", burn.token_ids.join(",").as_str()), - ("memo", burn.memo.as_deref().unwrap_or("")), - ("txid", context.txid.to_string().as_str()), - ("block_height", context.block_height.to_string().as_str()), - ("sender_id", context.sender_id.as_str()), - ("contract_id", context.contract_id.as_str()), - ], - ) - .await - .unwrap(); - log::debug!("Adding to stream: {response}"); - } -} - #[derive(Clone, Debug, PartialEq)] struct EventContext { pub txid: CryptoHash, @@ -187,12 +107,7 @@ async fn main() { .unwrap(); let connection = client.get_multiplexed_tokio_connection().await.unwrap(); - let mut indexer = NftIndexer { - handler: PushToRedisStream { - connection, - max_blocks: 10_000, - }, - }; + let mut indexer = NftIndexer(PushToRedisStream::new(connection, 10_000)); run_indexer( &mut indexer, diff --git a/src/redis_handler.rs b/src/redis_handler.rs new file mode 100644 index 0000000..0d96b16 --- /dev/null +++ b/src/redis_handler.rs @@ -0,0 +1,89 @@ +use async_trait::async_trait; +use inindexer::indexer_utils::{NftBurnEvent, NftMintEvent, NftTransferEvent}; +use redis::{aio::MultiplexedConnection, streams::StreamMaxlen, AsyncCommands}; + +use crate::{EventContext, NftEventHandler}; + +pub struct PushToRedisStream { + connection: MultiplexedConnection, + max_blocks: usize, +} + +impl PushToRedisStream { + pub fn new(connection: MultiplexedConnection, max_blocks: usize) -> Self { + Self { + connection, + max_blocks, + } + } +} + +#[async_trait] +impl NftEventHandler for PushToRedisStream { + async fn handle_mint(&mut self, mint: NftMintEvent, context: EventContext) { + let response: String = self + .connection + .xadd_maxlen( + "nft_mint", + StreamMaxlen::Approx(self.max_blocks), + &format!("{}-*", context.block_height), + &[ + ("owner_id", mint.owner_id.as_str()), + ("token_ids", mint.token_ids.join(",").as_str()), + ("memo", mint.memo.as_deref().unwrap_or("")), + ("txid", context.txid.to_string().as_str()), + ("block_height", context.block_height.to_string().as_str()), + ("sender_id", context.sender_id.as_str()), + ("contract_id", context.contract_id.as_str()), + ], + ) + .await + .unwrap(); + log::debug!("Adding to stream: {response}"); + } + + async fn handle_transfer(&mut self, transfer: NftTransferEvent, context: EventContext) { + let response: String = self + .connection + .xadd_maxlen( + "nft_transfer", + StreamMaxlen::Approx(self.max_blocks), + &format!("{}-*", context.block_height), + &[ + ("old_owner_id", transfer.old_owner_id.as_str()), + ("new_owner_id", transfer.new_owner_id.as_str()), + ("token_ids", transfer.token_ids.join(",").as_str()), + ("memo", transfer.memo.as_deref().unwrap_or("")), + ("txid", context.txid.to_string().as_str()), + ("block_height", context.block_height.to_string().as_str()), + ("sender_id", context.sender_id.as_str()), + ("contract_id", context.contract_id.as_str()), + ], + ) + .await + .unwrap(); + log::debug!("Adding to stream: {response}"); + } + + async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext) { + let response: String = self + .connection + .xadd_maxlen( + "nft_burn", + StreamMaxlen::Approx(self.max_blocks), + &format!("{}-*", context.block_height), + &[ + ("owner_id", burn.owner_id.as_str()), + ("token_ids", burn.token_ids.join(",").as_str()), + ("memo", burn.memo.as_deref().unwrap_or("")), + ("txid", context.txid.to_string().as_str()), + ("block_height", context.block_height.to_string().as_str()), + ("sender_id", context.sender_id.as_str()), + ("contract_id", context.contract_id.as_str()), + ], + ) + .await + .unwrap(); + log::debug!("Adding to stream: {response}"); + } +} diff --git a/src/tests.rs b/src/tests.rs index 359fe09..7479e3c 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -34,7 +34,7 @@ async fn detects_mints() { mint_events: HashMap::new(), }; - let mut indexer = NftIndexer { handler }; + let mut indexer = NftIndexer(handler); run_indexer( &mut indexer, @@ -53,7 +53,7 @@ async fn detects_mints() { assert_eq!( *indexer - .handler + .0 .mint_events .get(&"minter1.sharddog.near".parse::().unwrap()) .unwrap(), @@ -100,7 +100,7 @@ async fn detects_transfers() { transfer_events: HashMap::new(), }; - let mut indexer = NftIndexer { handler }; + let mut indexer = NftIndexer(handler); run_indexer( &mut indexer, @@ -119,7 +119,7 @@ async fn detects_transfers() { assert_eq!( *indexer - .handler + .0 .transfer_events .get(&"slimegirl.near".parse::().unwrap()) .unwrap(), @@ -167,7 +167,7 @@ async fn detects_burns() { burn_events: HashMap::new(), }; - let mut indexer = NftIndexer { handler }; + let mut indexer = NftIndexer(handler); run_indexer( &mut indexer, @@ -186,7 +186,7 @@ async fn detects_burns() { assert_eq!( *indexer - .handler + .0 .burn_events .get(&"bonehedz.near".parse::().unwrap()) .unwrap(),