Skip to content

Commit

Permalink
Move redis handler to a separate module
Browse files Browse the repository at this point in the history
  • Loading branch information
Sliman4 committed Apr 29, 2024
1 parent 89bd332 commit bccd93f
Show file tree
Hide file tree
Showing 3 changed files with 102 additions and 98 deletions.
99 changes: 7 additions & 92 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
mod redis_handler;
#[cfg(test)]
mod tests;

Expand All @@ -14,9 +15,7 @@ use inindexer::{
run_indexer, AutoContinue, BlockIterator, CompleteTransaction, Indexer, IndexerOptions,
PreprocessTransactionsSettings,
};
use redis::aio::MultiplexedConnection;
use redis::streams::StreamMaxlen;
use redis::AsyncCommands;
use redis_handler::PushToRedisStream;

#[async_trait]
trait NftEventHandler: Send + Sync {
Expand All @@ -25,9 +24,7 @@ trait NftEventHandler: Send + Sync {
async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext);
}

struct NftIndexer<T: NftEventHandler + Send + Sync + 'static> {
handler: T,
}
struct NftIndexer<T: NftEventHandler + Send + Sync + 'static>(T);

#[async_trait]
impl<T: NftEventHandler + Send + Sync + 'static> Indexer for NftIndexer<T> {
Expand Down Expand Up @@ -61,25 +58,23 @@ impl<T: NftEventHandler + Send + Sync + 'static> Indexer for NftIndexer<T> {
if mint_log.validate() {
log::debug!("Mint log: {mint_log:?}");
for mint in mint_log.data.0 {
self.handler.handle_mint(mint, get_context_lazy()).await;
self.0.handle_mint(mint, get_context_lazy()).await;
}
}
}
if let Ok(transfer_log) = EventLogData::<NftTransferLog>::deserialize(log) {
if transfer_log.validate() {
log::debug!("Transfer log: {transfer_log:?}");
for transfer in transfer_log.data.0 {
self.handler
.handle_transfer(transfer, get_context_lazy())
.await;
self.0.handle_transfer(transfer, get_context_lazy()).await;
}
}
}
if let Ok(burn_log) = EventLogData::<NftBurnLog>::deserialize(log) {
if burn_log.validate() {
log::debug!("Burn log: {burn_log:?}");
for burn in burn_log.data.0 {
self.handler.handle_burn(burn, get_context_lazy()).await;
self.0.handle_burn(burn, get_context_lazy()).await;
}
}
}
Expand All @@ -90,81 +85,6 @@ impl<T: NftEventHandler + Send + Sync + 'static> Indexer for NftIndexer<T> {
}
}

struct PushToRedisStream {
connection: MultiplexedConnection,
max_blocks: usize,
}

#[async_trait]
impl NftEventHandler for PushToRedisStream {
async fn handle_mint(&mut self, mint: NftMintEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_mint",
StreamMaxlen::Approx(self.max_blocks),
&format!("{}-*", context.block_height),
&[
("owner_id", mint.owner_id.as_str()),
("token_ids", mint.token_ids.join(",").as_str()),
("memo", mint.memo.as_deref().unwrap_or("")),
("txid", context.txid.to_string().as_str()),
("block_height", context.block_height.to_string().as_str()),
("sender_id", context.sender_id.as_str()),
("contract_id", context.contract_id.as_str()),
],
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
}

async fn handle_transfer(&mut self, transfer: NftTransferEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_transfer",
StreamMaxlen::Approx(self.max_blocks),
&format!("{}-*", context.block_height),
&[
("old_owner_id", transfer.old_owner_id.as_str()),
("new_owner_id", transfer.new_owner_id.as_str()),
("token_ids", transfer.token_ids.join(",").as_str()),
("memo", transfer.memo.as_deref().unwrap_or("")),
("txid", context.txid.to_string().as_str()),
("block_height", context.block_height.to_string().as_str()),
("sender_id", context.sender_id.as_str()),
("contract_id", context.contract_id.as_str()),
],
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
}

async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_burn",
StreamMaxlen::Approx(self.max_blocks),
&format!("{}-*", context.block_height),
&[
("owner_id", burn.owner_id.as_str()),
("token_ids", burn.token_ids.join(",").as_str()),
("memo", burn.memo.as_deref().unwrap_or("")),
("txid", context.txid.to_string().as_str()),
("block_height", context.block_height.to_string().as_str()),
("sender_id", context.sender_id.as_str()),
("contract_id", context.contract_id.as_str()),
],
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
}
}

#[derive(Clone, Debug, PartialEq)]
struct EventContext {
pub txid: CryptoHash,
Expand All @@ -187,12 +107,7 @@ async fn main() {
.unwrap();
let connection = client.get_multiplexed_tokio_connection().await.unwrap();

let mut indexer = NftIndexer {
handler: PushToRedisStream {
connection,
max_blocks: 10_000,
},
};
let mut indexer = NftIndexer(PushToRedisStream::new(connection, 10_000));

run_indexer(
&mut indexer,
Expand Down
89 changes: 89 additions & 0 deletions src/redis_handler.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use async_trait::async_trait;
use inindexer::indexer_utils::{NftBurnEvent, NftMintEvent, NftTransferEvent};
use redis::{aio::MultiplexedConnection, streams::StreamMaxlen, AsyncCommands};

use crate::{EventContext, NftEventHandler};

pub struct PushToRedisStream {
connection: MultiplexedConnection,
max_blocks: usize,
}

impl PushToRedisStream {
pub fn new(connection: MultiplexedConnection, max_blocks: usize) -> Self {
Self {
connection,
max_blocks,
}
}
}

#[async_trait]
impl NftEventHandler for PushToRedisStream {
async fn handle_mint(&mut self, mint: NftMintEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_mint",
StreamMaxlen::Approx(self.max_blocks),
&format!("{}-*", context.block_height),
&[
("owner_id", mint.owner_id.as_str()),
("token_ids", mint.token_ids.join(",").as_str()),
("memo", mint.memo.as_deref().unwrap_or("")),
("txid", context.txid.to_string().as_str()),
("block_height", context.block_height.to_string().as_str()),
("sender_id", context.sender_id.as_str()),
("contract_id", context.contract_id.as_str()),
],
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
}

async fn handle_transfer(&mut self, transfer: NftTransferEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_transfer",
StreamMaxlen::Approx(self.max_blocks),
&format!("{}-*", context.block_height),
&[
("old_owner_id", transfer.old_owner_id.as_str()),
("new_owner_id", transfer.new_owner_id.as_str()),
("token_ids", transfer.token_ids.join(",").as_str()),
("memo", transfer.memo.as_deref().unwrap_or("")),
("txid", context.txid.to_string().as_str()),
("block_height", context.block_height.to_string().as_str()),
("sender_id", context.sender_id.as_str()),
("contract_id", context.contract_id.as_str()),
],
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
}

async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext) {
let response: String = self
.connection
.xadd_maxlen(
"nft_burn",
StreamMaxlen::Approx(self.max_blocks),
&format!("{}-*", context.block_height),
&[
("owner_id", burn.owner_id.as_str()),
("token_ids", burn.token_ids.join(",").as_str()),
("memo", burn.memo.as_deref().unwrap_or("")),
("txid", context.txid.to_string().as_str()),
("block_height", context.block_height.to_string().as_str()),
("sender_id", context.sender_id.as_str()),
("contract_id", context.contract_id.as_str()),
],
)
.await
.unwrap();
log::debug!("Adding to stream: {response}");
}
}
12 changes: 6 additions & 6 deletions src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ async fn detects_mints() {
mint_events: HashMap::new(),
};

let mut indexer = NftIndexer { handler };
let mut indexer = NftIndexer(handler);

run_indexer(
&mut indexer,
Expand All @@ -53,7 +53,7 @@ async fn detects_mints() {

assert_eq!(
*indexer
.handler
.0
.mint_events
.get(&"minter1.sharddog.near".parse::<AccountId>().unwrap())
.unwrap(),
Expand Down Expand Up @@ -100,7 +100,7 @@ async fn detects_transfers() {
transfer_events: HashMap::new(),
};

let mut indexer = NftIndexer { handler };
let mut indexer = NftIndexer(handler);

run_indexer(
&mut indexer,
Expand All @@ -119,7 +119,7 @@ async fn detects_transfers() {

assert_eq!(
*indexer
.handler
.0
.transfer_events
.get(&"slimegirl.near".parse::<AccountId>().unwrap())
.unwrap(),
Expand Down Expand Up @@ -167,7 +167,7 @@ async fn detects_burns() {
burn_events: HashMap::new(),
};

let mut indexer = NftIndexer { handler };
let mut indexer = NftIndexer(handler);

run_indexer(
&mut indexer,
Expand All @@ -186,7 +186,7 @@ async fn detects_burns() {

assert_eq!(
*indexer
.handler
.0
.burn_events
.get(&"bonehedz.near".parse::<AccountId>().unwrap())
.unwrap(),
Expand Down

0 comments on commit bccd93f

Please sign in to comment.