Skip to content

Commit

Permalink
feat: filter watchmempool on predicates (#383)
Browse files Browse the repository at this point in the history
Signed-off-by: Chris Gianelloni <wolf31o2@blinklabs.io>
  • Loading branch information
wolf31o2 authored Feb 1, 2025
1 parent 8523c90 commit 5dc67ef
Show file tree
Hide file tree
Showing 4 changed files with 186 additions and 12 deletions.
4 changes: 3 additions & 1 deletion config/cardano/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,9 @@ func (c *CardanoNodeConfig) loadGenesisConfigs() error {
if c.ShelleyGenesisFile != "" {
shelleyGenesisPath := path.Join(c.path, c.ShelleyGenesisFile)
// TODO: check genesis file hash
shelleyGenesis, err := shelley.NewShelleyGenesisFromFile(shelleyGenesisPath)
shelleyGenesis, err := shelley.NewShelleyGenesisFromFile(
shelleyGenesisPath,
)
if err != nil {
return err
}
Expand Down
24 changes: 19 additions & 5 deletions state/chainsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,9 @@ func (ls *LedgerState) processGenesisBlock(
// Use Shelley genesis hash for initial epoch nonce for post-Byron eras
var tmpNonce []byte
if ls.currentEra.Id > 0 { // Byron
genesisHashBytes, _ := hex.DecodeString(ls.config.CardanoNodeConfig.ShelleyGenesisHash)
genesisHashBytes, _ := hex.DecodeString(
ls.config.CardanoNodeConfig.ShelleyGenesisHash,
)
tmpNonce = genesisHashBytes
}
newEpoch := models.Epoch{
Expand Down Expand Up @@ -328,7 +330,9 @@ func (ls *LedgerState) calculateEpochNonce(
) ([]byte, error) {
// Use Shelley genesis hash for initial epoch nonce
if len(ls.currentEpoch.Nonce) == 0 && ls.currentEra.Id > 0 { // Byron
genesisHashBytes, err := hex.DecodeString(ls.config.CardanoNodeConfig.ShelleyGenesisHash)
genesisHashBytes, err := hex.DecodeString(
ls.config.CardanoNodeConfig.ShelleyGenesisHash,
)
return genesisHashBytes, err
}
// Calculate stability window
Expand All @@ -343,20 +347,30 @@ func (ls *LedgerState) calculateEpochNonce(
).Num().Uint64()
stabilityWindowStartSlot := epochStartSlot - stabilityWindow
// Get last block before stability window
blockBeforeStabilityWindow, err := models.BlockBeforeSlotTxn(txn, stabilityWindowStartSlot)
blockBeforeStabilityWindow, err := models.BlockBeforeSlotTxn(
txn,
stabilityWindowStartSlot,
)
if err != nil {
return nil, err
}
// Get last block in previous epoch
blockLastPrevEpoch, err := models.BlockBeforeSlotTxn(txn, ls.currentEpoch.StartSlot)
blockLastPrevEpoch, err := models.BlockBeforeSlotTxn(
txn,
ls.currentEpoch.StartSlot,
)
if err != nil {
if err == models.ErrBlockNotFound {
return blockBeforeStabilityWindow.Nonce, nil
}
return nil, err
}
// Calculate nonce from inputs
ret, err := lcommon.CalculateEpochNonce(blockBeforeStabilityWindow.Nonce, blockLastPrevEpoch.PrevHash, nil)
ret, err := lcommon.CalculateEpochNonce(
blockBeforeStabilityWindow.Nonce,
blockLastPrevEpoch.PrevHash,
nil,
)
return ret.Bytes(), err
}

Expand Down
5 changes: 4 additions & 1 deletion state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,10 @@ func (ls *LedgerState) rollback(point ocommon.Point) error {
}
if len(recentBlocks) > 0 {
ls.currentTip = ochainsync.Tip{
Point: ocommon.NewPoint(recentBlocks[0].Slot, recentBlocks[0].Hash),
Point: ocommon.NewPoint(
recentBlocks[0].Slot,
recentBlocks[0].Hash,
),
BlockNumber: recentBlocks[0].Number,
}
if err := models.TipUpdateTxn(txn, ls.currentTip); err != nil {
Expand Down
165 changes: 160 additions & 5 deletions utxorpc/submit.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2024 Blink Labs Software
// Copyright 2025 Blink Labs Software
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,6 +15,7 @@
package utxorpc

import (
"bytes"
"context"
"encoding/hex"
"fmt"
Expand All @@ -23,6 +24,7 @@ import (
"connectrpc.com/connect"
"github.com/blinklabs-io/gouroboros/ledger"
lcommon "github.com/blinklabs-io/gouroboros/ledger/common"
cardano "github.com/utxorpc/go-codegen/utxorpc/v1alpha/cardano"
submit "github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit"
"github.com/utxorpc/go-codegen/utxorpc/v1alpha/submit/submitconnect"

Expand Down Expand Up @@ -234,10 +236,163 @@ func (s *submitServiceServer) WatchMempool(
return err
}
} else {
// TODO: filter from all Predicate types
err := stream.Send(resp)
if err != nil {
return err
found := false
assetFound := false

// Check Predicate
addressPattern := predicate.GetMatch().GetCardano().GetHasAddress()
mintAssetPattern := predicate.GetMatch().GetCardano().GetMintsAsset()
moveAssetPattern := predicate.GetMatch().GetCardano().GetMovesAsset()

var addresses []ledger.Address
if addressPattern != nil {
// Handle Exact Address
exactAddressBytes := addressPattern.GetExactAddress()
if exactAddressBytes != nil {
var addr ledger.Address
err := addr.UnmarshalCBOR(exactAddressBytes)
if err != nil {
return fmt.Errorf(
"failed to decode exact address: %w",
err,
)
}
addresses = append(addresses, addr)
}

// Handle Payment Part
paymentPart := addressPattern.GetPaymentPart()
if paymentPart != nil {
s.utxorpc.config.Logger.Info("PaymentPart is present, decoding...")
var paymentAddr ledger.Address
err := paymentAddr.UnmarshalCBOR(paymentPart)
if err != nil {
return fmt.Errorf("failed to decode payment part: %w", err)
}
addresses = append(addresses, paymentAddr)
}

// Handle Delegation Part
delegationPart := addressPattern.GetDelegationPart()
if delegationPart != nil {
s.utxorpc.config.Logger.Info(
"DelegationPart is present, decoding...",
)
var delegationAddr ledger.Address
err := delegationAddr.UnmarshalCBOR(delegationPart)
if err != nil {
return fmt.Errorf(
"failed to decode delegation part: %w",
err,
)
}
addresses = append(addresses, delegationAddr)
}
}

var assetPatterns []*cardano.AssetPattern
if mintAssetPattern != nil {
assetPatterns = append(assetPatterns, mintAssetPattern)
}
if moveAssetPattern != nil {
assetPatterns = append(assetPatterns, moveAssetPattern)
}

// Convert everything to utxos (ledger.TransactionOutput) for matching
var utxos []ledger.TransactionOutput
utxos = append(tx.Outputs(), tx.CollateralReturn())
var inputs []ledger.TransactionInput
inputs = append(tx.Inputs(), tx.ReferenceInputs()...)
inputs = append(inputs, tx.Collateral()...)
for _, input := range inputs {
utxo, err := s.utxorpc.config.LedgerState.UtxoByRef(
input.Id().Bytes(),
input.Index(),
)
if err != nil {
return fmt.Errorf(
"failed to look up input: %w",
err,
)
}
ret, err := utxo.Decode() // ledger.TransactionOutput
if err != nil {
return err
}
if ret == nil {
return fmt.Errorf("decode returned empty utxo")
}
utxos = append(utxos, ret)
}

// Check UTxOs for addresses
for _, address := range addresses {
if found {
break
}
if assetFound {
found = true
break
}
for _, utxo := range utxos {
if found {
break
}
if assetFound {
found = true
break
}
if utxo.Address().String() == address.String() {
if found {
break
}
if assetFound {
found = true
break
}
// We matched address, check assetPatterns
for _, assetPattern := range assetPatterns {
// Address found, no assetPattern
if assetPattern == nil {
found = true
break
}
// Filter on assetPattern
for _, policyId := range utxo.Assets().Policies() {
if assetFound {
found = true
break
}
if bytes.Equal(
policyId.Bytes(),
assetPattern.PolicyId,
) {
for _, asset := range utxo.Assets().Assets(policyId) {
if bytes.Equal(asset, assetPattern.AssetName) {
found = true
assetFound = true
break
}
}
}
}
}
if found {
break
}
// Asset not found; skip this UTxO
if !assetFound {
continue
}
found = true
}
}
}
if found {
err := stream.Send(resp)
if err != nil {
return err
}
}
}
}
Expand Down

0 comments on commit 5dc67ef

Please sign in to comment.