Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Modified order of event logs to be processed in processor create UTXO #605

Merged
merged 1 commit into from
Mar 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 33 additions & 88 deletions batch/processor_create_utxo.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,25 +87,19 @@ async def process(self):
LOG.info(f"Syncing from={block_from}, to={block_to}")
for token_contract in self.token_contract_list:
event_triggered = False
event_triggered = event_triggered | await self.__process_transfer(
db_session=db_session,
token_contract=token_contract,
block_from=block_from,
block_to=block_to,
)
event_triggered = event_triggered | await self.__process_issue(
db_session=db_session,
token_contract=token_contract,
block_from=block_from,
block_to=block_to,
)
event_triggered = event_triggered | await self.__process_redeem(
event_triggered = event_triggered | await self.__process_transfer(
db_session=db_session,
token_contract=token_contract,
block_from=block_from,
block_to=block_to,
)
event_triggered = event_triggered | await self.__process_unlock(
event_triggered = event_triggered | await self.__process_redeem(
db_session=db_session,
token_contract=token_contract,
block_from=block_from,
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Changed the order in which events are processed
  • Moved processing of Unlock events into processing of Transfer

Expand Down Expand Up @@ -182,6 +176,7 @@ async def __process_transfer(

- The process of updating UTXO data by capturing the following events
- `Transfer` event on Token contracts
- `Unlock` event on Token contracts
- `HolderChanged` event on Exchange contracts

:param db_session: database session
Expand Down Expand Up @@ -255,6 +250,28 @@ async def __process_transfer(
}
)

# Get "Unlock" events from token contract
token_unlock_events = await AsyncContractUtils.get_event_logs(
contract=token_contract,
event="Unlock",
block_from=block_from,
block_to=block_to,
)
for _event in token_unlock_events:
if (
_event["args"]["accountAddress"]
!= _event["args"]["recipientAddress"]
):
tmp_events.append(
{
"event": _event["event"],
"args": dict(_event["args"]),
"transaction_hash": _event["transactionHash"].hex(),
"block_number": _event["blockNumber"],
"log_index": _event["logIndex"],
}
)

# Marge & Sort: block_number > log_index
events = sorted(
tmp_events, key=lambda x: (x["block_number"], x["log_index"])
Expand All @@ -264,9 +281,14 @@ async def __process_transfer(
event_triggered = False
for event in events:
args = event["args"]
from_account = args.get("from", ZERO_ADDRESS)
to_account = args.get("to", ZERO_ADDRESS)
amount = int(args.get("value"))
if event["event"] == "Unlock":
from_account = args.get("accountAddress", ZERO_ADDRESS)
to_account = args.get("recipientAddress", ZERO_ADDRESS)
amount = args.get("value")
else:
from_account = args.get("from", ZERO_ADDRESS)
to_account = args.get("to", ZERO_ADDRESS)
amount = int(args.get("value"))

# Skip sinking in case of deposit to exchange or withdrawal from exchange
if (await web3.eth.get_code(from_account)).hex() != "0x" or (
Expand Down Expand Up @@ -432,83 +454,6 @@ async def __process_redeem(
LOG.exception(e)
return False

async def __process_unlock(
self,
db_session: AsyncSession,
token_contract: AsyncContract,
block_from: int,
block_to: int,
):
"""Process Unlock Event

- The process of updating UTXO data by capturing the following events
- `Unlock` event on Token contracts

:param db_session: database session
:param token_contract: Token contract
:param block_from: Block from
:param block_to: Block to
:return: Whether events have occurred or not
"""
try:
# Get "Unlock" events from token contract
events = await AsyncContractUtils.get_event_logs(
contract=token_contract,
event="Unlock",
block_from=block_from,
block_to=block_to,
)

# Sink
event_triggered = False
for event in events:
args = event["args"]
from_account = args.get("accountAddress", ZERO_ADDRESS)
to_account = args.get("recipientAddress", ZERO_ADDRESS)
amount = args.get("value")

transaction_hash = event["transactionHash"].hex()
block_number = event["blockNumber"]
block_timestamp = datetime.utcfromtimestamp(
(await web3.eth.get_block(block_number))["timestamp"]
) # UTC

if (
amount is not None
and amount <= sys.maxsize
and from_account != to_account
):
event_triggered = True

# Update UTXO(from account)
await self.__sink_on_utxo(
db_session=db_session,
spent=True,
transaction_hash=transaction_hash,
token_address=token_contract.address,
account_address=from_account,
amount=amount,
block_number=block_number,
block_timestamp=block_timestamp,
)

# Update UTXO(to account)
await self.__sink_on_utxo(
db_session=db_session,
spent=False,
transaction_hash=transaction_hash,
token_address=token_contract.address,
account_address=to_account,
amount=amount,
block_number=block_number,
block_timestamp=block_timestamp,
)

return event_triggered
except Exception as e:
LOG.exception(e)
return False

@staticmethod
async def __process_event_triggered(
db_session: AsyncSession, token_contract: AsyncContract, event_triggered: bool
Expand Down
Loading
Loading