Skip to content

Commit

Permalink
Implement block and transaction processor
Browse files Browse the repository at this point in the history
Added block processor struct and the process method.
Implemented getting valid transactions from the last processed index.
Added data structures needed for the store.

Decomposed the parser.Block.Transactions() method into readable chunks.

Added transaction processor struct and process method. Unwrapping
read write set data from the transaction, mapping to a new "write"
data structure and passing down to the store.

Store is an empty function and will be implemented next.

Signed-off-by: Stanislav Jakuschevskij <stas@two-giants.com>
  • Loading branch information
twoGiants committed Dec 28, 2024
1 parent c6d784a commit 8a27031
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 22 deletions.
192 changes: 191 additions & 1 deletion off_chain_data/application-go/listen.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math"
"offChainData/parser"
"strconv"

"github.com/hyperledger/fabric-gateway/pkg/client"
Expand All @@ -12,6 +13,34 @@ import (

var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
var simulatedFailureCount = getSimulatedFailureCount()
// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
type store = func(data ledgerUpdate)

// Ledger update made by a specific transaction.
type ledgerUpdate struct {
blockNumber uint64
transactionId string
writes []write
}

// Description of a ledger write that can be applied to an off-chain data store.
type write struct {
// Channel whose ledger is being updated.
channelName string
// Namespace within the ledger.
namespace string
// Key name within the ledger namespace.
key string
// Whether the key and associated value are being deleted.
isDelete bool
// If `isDelete` is false, the value written to the key; otherwise ignored.
value []byte
}

// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
// This implementation just writes to a file.
var applyWritesToOffChainStore = func(data ledgerUpdate) {
}

func listen(clientConnection *grpc.ClientConn) {
id, options := newConnectOptions(clientConnection)
Expand All @@ -35,6 +64,7 @@ func listen(clientConnection *grpc.ClientConn) {
fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount)
}

// TODO put into infinite loop like in public docs example
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

Expand All @@ -48,8 +78,101 @@ func listen(clientConnection *grpc.ClientConn) {
}

for blockProto := range blocks {
checkpointer.CheckpointBlock(blockProto.GetHeader().GetNumber())
aBlockProcessor := blockProcessor{
parser.ParseBlock(blockProto),
checkpointer,
applyWritesToOffChainStore,
}
aBlockProcessor.process()
}
}

type blockProcessor struct {
block *parser.Block
checkpointer *client.FileCheckpointer
storeWrites store
}

func (b *blockProcessor) process() {
blockNumber := b.block.Number()

fmt.Println("Received block", blockNumber)

for _, transaction := range b.validTransactions() {
aTransactionProcessor := transactionProcessor{
blockNumber,
transaction,
b.storeWrites,
}
aTransactionProcessor.process()

transactionId := transaction.ChannelHeader().GetTxId()
b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
}

b.checkpointer.CheckpointBlock(b.block.Number())
}

func (b blockProcessor) validTransactions() []*parser.Transaction {
result := []*parser.Transaction{}
for _, transaction := range b.getNewTransactions() {
if transaction.IsValid() {
result = append(result, transaction)
}
}
return result
}

func (b *blockProcessor) getNewTransactions() []*parser.Transaction {
transactions := b.block.Transactions()

lastTransactionId := b.checkpointer.TransactionID()
if lastTransactionId == "" {
// No previously processed transactions within this block so all are new
return transactions
}

// Ignore transactions up to the last processed transaction ID
lastProcessedIndex := b.findLastProcessedIndex()
return transactions[lastProcessedIndex+1:]
}

func (b blockProcessor) findLastProcessedIndex() int {
blockTransactionIds := []string{}
for _, transaction := range b.block.Transactions() {
blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId())
}

lastTransactionId := b.checkpointer.TransactionID()
lastProcessedIndex := -1
for index, id := range blockTransactionIds {
if id == lastTransactionId {
lastProcessedIndex = index
}
}
if lastProcessedIndex < 0 {
panic(
fmt.Errorf(
"checkpoint transaction ID %s not found in block %d containing transactions: %s",
lastTransactionId,
b.block.Number(),
joinByComma(blockTransactionIds),
),
)
}
return lastProcessedIndex
}

func joinByComma(list []string) string {
result := ""
for index, item := range list {
if len(list)-1 == index {
result += item
} else {
result += item + ", "
}
}
return result
}

func getSimulatedFailureCount() uint {
Expand All @@ -66,3 +189,70 @@ func getSimulatedFailureCount() uint {

return uint(result)
}

type transactionProcessor struct {
blockNumber uint64
transaction *parser.Transaction
storeWrites store
}

func (t *transactionProcessor) process() {
transactionId := t.transaction.ChannelHeader().GetTxId()

fmt.Println("Process transaction", transactionId)

writes := t.writes()
if len(writes) == 0 {
fmt.Println("Skipping read-only or system transaction", transactionId)
return
}

fmt.Println("Process transaction", transactionId)

t.storeWrites(ledgerUpdate{
t.blockNumber,
transactionId,
writes,
})
}

func (t *transactionProcessor) writes() []write {
channelName = t.transaction.ChannelHeader().GetChannelId()

nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{}
for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() {
if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
}
}

writes := []write{}
for _, readWriteSet := range nonSystemCCReadWriteSets {
namespace := readWriteSet.Namespace()

for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
aWrite := write{
channelName,
namespace,
kvWrite.GetKey(),
kvWrite.GetIsDelete(),
kvWrite.GetValue(),
}
writes = append(writes, aWrite)
}
}

return writes
}

func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
systemChaincodeNames := []string{
"_lifecycle",
"cscc",
"escc",
"lscc",
"qscc",
"vscc",
}
return slices.Contains(systemChaincodeNames, chaincodeName)
}
62 changes: 41 additions & 21 deletions off_chain_data/application-go/parser/blockParser.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,30 @@ func (b *Block) Number() uint64 {
return header.GetNumber()
}

// TODO: needs cache; decompose
// TODO: needs cache
func (b *Block) Transactions() []*Transaction {
envelopes := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil {
panic(err)
}
envelopes = append(envelopes, envelope)
}
envelopes := b.unmarshalEnvelopesFromBlockData()

commonPayloads := []*common.Payload{}
for _, envelope := range envelopes {
commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
panic(err)
}
commonPayloads = append(commonPayloads, commonPayload)
commonPayloads := b.unmarshalPayloadsFrom(envelopes)

payloads := b.parse(commonPayloads)

result := b.createTransactionsFrom(payloads)

return result
}

func (*Block) createTransactionsFrom(payloads []*PayloadImpl) []*Transaction {
result := []*Transaction{}
for _, payload := range payloads {
result = append(result, NewTransaction(payload))
}
return result
}

func (b *Block) parse(commonPayloads []*common.Payload) []*PayloadImpl {
validationCodes := b.extractTransactionValidationCodes()
payloads := []*PayloadImpl{}
result := []*PayloadImpl{}
for i, commonPayload := range commonPayloads {
payload := ParsePayload(
commonPayload,
Expand All @@ -54,15 +56,33 @@ func (b *Block) Transactions() []*Transaction {
),
)
if payload.IsEndorserTransaction() {
payloads = append(payloads, payload)
result = append(result, payload)
}
}
return result
}

result := []*Transaction{}
for _, payload := range payloads {
result = append(result, NewTransaction(payload))
func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload {
result := []*common.Payload{}
for _, envelope := range envelopes {
commonPayload := &common.Payload{}
if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
panic(err)
}
result = append(result, commonPayload)
}
return result
}

func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope {
result := []*common.Envelope{}
for _, blockData := range b.block.GetData().GetData() {
envelope := &common.Envelope{}
if err := proto.Unmarshal(blockData, envelope); err != nil {
panic(err)
}
result = append(result, envelope)
}
return result
}

Expand Down

0 comments on commit 8a27031

Please sign in to comment.