Skip to content

Commit

Permalink
Refactor in accordance with other intear indexers
Browse files Browse the repository at this point in the history
Changed redis key to block-*, and bin+lib crate
  • Loading branch information
Sliman4 committed May 4, 2024
1 parent 80326df commit 819b8ea
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 214 deletions.
6 changes: 5 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ async-trait = "0.1.80"
tokio = { version = "1.37.0", features = ["macros", "rt-multi-thread"] }
log = "0.4.21"
simple_logger = "5.0.0"
redis = { version = "0.25.3", features = [ "tokio-rustls-comp", "connection-manager" ] }
redis = { version = "0.25.3", features = [ "tokio-rustls-comp", "connection-manager" ], optional = true }
serde = { version = "1.0.199", features = [ "derive" ] }
serde_json = "1.0.116"
dotenv = "0.15.0"

[features]
redis-handler = [ "redis" ]
default = [ "redis-handler" ]
175 changes: 175 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,175 @@
#[cfg(feature = "redis-handler")]
pub mod redis_handler;

use std::collections::HashMap;

use async_trait::async_trait;
use inindexer::near_indexer_primitives::types::{AccountId, Balance};
use inindexer::near_indexer_primitives::views::{ActionView, ExecutionStatusView, ReceiptEnumView};
use inindexer::near_indexer_primitives::CryptoHash;
use inindexer::near_indexer_primitives::StreamerMessage;
use inindexer::near_utils::{
dec_format, dec_format_map, dec_format_vec, EventLogData, NftBurnEvent, NftBurnLog,
NftMintEvent, NftMintLog, NftTransferEvent, NftTransferLog,
};
use inindexer::{IncompleteTransaction, Indexer, TransactionReceipt};
use serde::{Deserialize, Serialize};

#[async_trait]
pub trait NftEventHandler: Send + Sync {
async fn handle_mint(&mut self, mint: NftMintEvent, context: EventContext);
async fn handle_transfer(&mut self, transfer: ExtendedNftTransferEvent, context: EventContext);
async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext);
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct ExtendedNftTransferEvent {
pub event: NftTransferEvent,
pub trade: NftTradeDetails,
}

impl ExtendedNftTransferEvent {
pub fn from_event(event: NftTransferEvent, receipt: &TransactionReceipt) -> Self {
let mut prices = vec![None; event.token_ids.len()];
if let ReceiptEnumView::Action { actions, .. } = &receipt.receipt.receipt.receipt {
for action in actions {
if let ActionView::FunctionCall {
method_name, args, ..
} = action
{
if method_name == "nft_transfer_payout" {
if let ExecutionStatusView::SuccessValue(value) =
&receipt.receipt.execution_outcome.outcome.status
{
if let Ok(args) = serde_json::from_slice::<NftTransferPayoutArgs>(args)
{
if let Some(index) = event
.token_ids
.iter()
.position(|token_id| **token_id == args.token_id)
{
if let Ok(payout) =
serde_json::from_slice::<PayoutResponse>(value)
{
// Is this always the same as args.balance?
let price = payout.payout.values().sum::<Balance>();
prices[index] = Some(price);
}
}
}
}
}
}
}
}
ExtendedNftTransferEvent {
event,
trade: NftTradeDetails {
prices_near: prices,
},
}
}
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct NftTradeDetails {
/// None if it's a simple transfer, Some if it's a trade. Guaranteed to have the same length as NftTransferEvent::token_ids
#[serde(with = "dec_format_vec")]
pub prices_near: Vec<Option<Balance>>,
}

#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct NftTransferPayoutArgs {
receiver_id: AccountId,
token_id: String,
#[serde(with = "dec_format")]
approval_id: Option<u64>,
memo: Option<String>,
#[serde(with = "dec_format")]
balance: Balance,
max_len_payout: Option<u32>,
}

#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct PayoutResponse {
#[serde(with = "dec_format_map")]
payout: HashMap<AccountId, Balance>,
}

pub struct NftIndexer<T: NftEventHandler + Send + Sync + 'static>(pub T);

#[async_trait]
impl<T: NftEventHandler + Send + Sync + 'static> Indexer for NftIndexer<T> {
type Error = String;

async fn on_receipt(
&mut self,
receipt: &TransactionReceipt,
transaction: &IncompleteTransaction,
_block: &StreamerMessage,
) -> Result<(), Self::Error> {
let get_context_lazy = || {
let tx_sender_id = receipt.receipt.receipt.predecessor_id.clone();
let contract_id = receipt.receipt.receipt.receiver_id.clone();
let transaction_id = transaction.transaction.transaction.hash;
let receipt_id = receipt.receipt.receipt.receipt_id;
let block_height = receipt.block_height;
EventContext {
transaction_id,
receipt_id,
block_height,
tx_sender_id,
contract_id,
}
};
if receipt.is_successful(false) {
for log in &receipt.receipt.execution_outcome.outcome.logs {
if !log.contains("nep171") {
// Don't even start parsing logs if they don't even contain the NEP-171 standard
continue;
}
if let Ok(mint_log) = EventLogData::<NftMintLog>::deserialize(log) {
if mint_log.validate() {
log::debug!("Mint log: {mint_log:?}");
for mint in mint_log.data.0 {
self.0.handle_mint(mint, get_context_lazy()).await;
}
}
}
if let Ok(transfer_log) = EventLogData::<NftTransferLog>::deserialize(log) {
if transfer_log.validate() {
log::debug!("Transfer log: {transfer_log:?}");
for transfer in transfer_log.data.0 {
self.0
.handle_transfer(
ExtendedNftTransferEvent::from_event(transfer, receipt),
get_context_lazy(),
)
.await;
}
}
}
if let Ok(burn_log) = EventLogData::<NftBurnLog>::deserialize(log) {
if burn_log.validate() {
log::debug!("Burn log: {burn_log:?}");
for burn in burn_log.data.0 {
self.0.handle_burn(burn, get_context_lazy()).await;
}
}
}
}
}
Ok(())
}
}

#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct EventContext {
pub transaction_id: CryptoHash,
pub receipt_id: CryptoHash,
pub block_height: u64,
pub tx_sender_id: AccountId,
pub contract_id: AccountId,
}
176 changes: 3 additions & 173 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,183 +1,13 @@
mod redis_handler;
#[cfg(test)]
mod tests;

use std::collections::HashMap;

use async_trait::async_trait;
use inindexer::fastnear_data_server::FastNearDataServerProvider;
use inindexer::near_indexer_primitives::types::{AccountId, Balance};
use inindexer::near_indexer_primitives::views::{ActionView, ExecutionStatusView, ReceiptEnumView};
use inindexer::near_indexer_primitives::CryptoHash;
use inindexer::near_indexer_primitives::StreamerMessage;
use inindexer::near_utils::{
dec_format, dec_format_map, dec_format_vec, EventLogData, NftBurnEvent, NftBurnLog,
NftMintEvent, NftMintLog, NftTransferEvent, NftTransferLog,
};
use inindexer::{
run_indexer, AutoContinue, BlockIterator, CompleteTransaction, Indexer, IndexerOptions,
PreprocessTransactionsSettings, TransactionReceipt,
run_indexer, AutoContinue, BlockIterator, IndexerOptions, PreprocessTransactionsSettings,
};
use nft_indexer::redis_handler;
use redis::aio::ConnectionManager;
use redis_handler::PushToRedisStream;
use serde::{Deserialize, Serialize};

#[async_trait]
trait NftEventHandler: Send + Sync {
async fn handle_mint(&mut self, mint: NftMintEvent, context: EventContext);
async fn handle_transfer(&mut self, transfer: ExtendedNftTransferEvent, context: EventContext);
async fn handle_burn(&mut self, burn: NftBurnEvent, context: EventContext);
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct ExtendedNftTransferEvent {
event: NftTransferEvent,
trade: NftTradeDetails,
}

impl ExtendedNftTransferEvent {
pub fn from_event(event: NftTransferEvent, receipt: &TransactionReceipt) -> Self {
let mut prices = vec![None; event.token_ids.len()];
if let ReceiptEnumView::Action { actions, .. } = &receipt.receipt.receipt.receipt {
for action in actions {
if let ActionView::FunctionCall {
method_name, args, ..
} = action
{
if method_name == "nft_transfer_payout" {
if let ExecutionStatusView::SuccessValue(value) =
&receipt.receipt.execution_outcome.outcome.status
{
if let Ok(args) = serde_json::from_slice::<NftTransferPayoutArgs>(args)
{
if let Some(index) = event
.token_ids
.iter()
.position(|token_id| **token_id == args.token_id)
{
if let Ok(payout) =
serde_json::from_slice::<PayoutResponse>(value)
{
// Is this always the same as args.balance?
let price = payout.payout.values().sum::<Balance>();
prices[index] = Some(price);
}
}
}
}
}
}
}
}
ExtendedNftTransferEvent {
event,
trade: NftTradeDetails {
prices_near: prices,
},
}
}
}

#[derive(Serialize, Deserialize, Debug, PartialEq)]
pub struct NftTradeDetails {
/// None if it's a simple transfer, Some if it's a trade. Guaranteed to have the same length as NftTransferEvent::token_ids
#[serde(with = "dec_format_vec")]
prices_near: Vec<Option<Balance>>,
}

#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct NftTransferPayoutArgs {
receiver_id: AccountId,
token_id: String,
#[serde(with = "dec_format")]
approval_id: Option<u64>,
memo: Option<String>,
#[serde(with = "dec_format")]
balance: Balance,
max_len_payout: Option<u32>,
}

#[allow(dead_code)]
#[derive(Deserialize, Debug)]
struct PayoutResponse {
#[serde(with = "dec_format_map")]
payout: HashMap<AccountId, Balance>,
}

struct NftIndexer<T: NftEventHandler + Send + Sync + 'static>(T);

#[async_trait]
impl<T: NftEventHandler + Send + Sync + 'static> Indexer for NftIndexer<T> {
type Error = String;

async fn on_transaction(
&mut self,
transaction: &CompleteTransaction,
_block: &StreamerMessage,
) -> Result<(), Self::Error> {
for receipt in transaction.receipts.iter() {
let get_context_lazy = || {
let tx_sender_id = receipt.receipt.receipt.predecessor_id.clone();
let contract_id = receipt.receipt.receipt.receiver_id.clone();
let txid = transaction.transaction.transaction.hash;
let block_height = receipt.block_height;
EventContext {
txid,
block_height,
tx_sender_id,
contract_id,
}
};
if receipt.is_successful(false) {
for log in &receipt.receipt.execution_outcome.outcome.logs {
if !log.contains("nep171") {
// Don't even start parsing logs if they don't even contain the NEP-171 standard
continue;
}
if let Ok(mint_log) = EventLogData::<NftMintLog>::deserialize(log) {
if mint_log.validate() {
log::debug!("Mint log: {mint_log:?}");
for mint in mint_log.data.0 {
self.0.handle_mint(mint, get_context_lazy()).await;
}
}
}
if let Ok(transfer_log) = EventLogData::<NftTransferLog>::deserialize(log) {
if transfer_log.validate() {
log::debug!("Transfer log: {transfer_log:?}");
for transfer in transfer_log.data.0 {
self.0
.handle_transfer(
ExtendedNftTransferEvent::from_event(transfer, receipt),
get_context_lazy(),
)
.await;
}
}
}
if let Ok(burn_log) = EventLogData::<NftBurnLog>::deserialize(log) {
if burn_log.validate() {
log::debug!("Burn log: {burn_log:?}");
for burn in burn_log.data.0 {
self.0.handle_burn(burn, get_context_lazy()).await;
}
}
}
}
}
}
Ok(())
}
}

#[derive(Clone, Debug, PartialEq)]
struct EventContext {
pub txid: CryptoHash,
pub block_height: u64,
pub tx_sender_id: AccountId,
pub contract_id: AccountId,
}

#[tokio::main]
async fn main() {
Expand All @@ -194,7 +24,7 @@ async fn main() {
.unwrap();
let connection = ConnectionManager::new(client).await.unwrap();

let mut indexer = NftIndexer(PushToRedisStream::new(connection, 10_000));
let mut indexer = nft_indexer::NftIndexer(PushToRedisStream::new(connection, 10_000));

run_indexer(
&mut indexer,
Expand Down
Loading

0 comments on commit 819b8ea

Please sign in to comment.