From 8a2703127728d9b872003cef000f5a81c818a418 Mon Sep 17 00:00:00 2001
From: Stanislav Jakuschevskij
Date: Mon, 9 Dec 2024 19:13:27 +0100
Subject: [PATCH] Implement block and transaction processor

Added a block processor struct and its process method, and implemented
retrieval of the valid transactions that follow the last processed
transaction index. Added the data structures needed for the store.
Decomposed the parser.Block.Transactions() method into readable chunks.

Added a transaction processor struct and its process method: it unwraps
the read-write set data from the transaction, maps it to a new "write"
data structure, and passes it down to the store.

The store is currently an empty function and will be implemented next.

Signed-off-by: Stanislav Jakuschevskij
---
 off_chain_data/application-go/listen.go       | 191 +++++++++++++++++-
 .../application-go/parser/blockParser.go      |  62 ++++--
 2 files changed, 231 insertions(+), 22 deletions(-)

diff --git a/off_chain_data/application-go/listen.go b/off_chain_data/application-go/listen.go
index 34f6f0efa..63b1ecef2 100644
--- a/off_chain_data/application-go/listen.go
+++ b/off_chain_data/application-go/listen.go
@@ -4,6 +4,8 @@ import (
 	"context"
 	"fmt"
 	"math"
+	"offChainData/parser"
+	"slices"
 	"strconv"
 
 	"github.com/hyperledger/fabric-gateway/pkg/client"
@@ -12,6 +14,34 @@ import (
 var checkpointFile = envOrDefault("CHECKPOINT_FILE", "checkpoint.json")
 var simulatedFailureCount = getSimulatedFailureCount()
 
+// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
+type store = func(data ledgerUpdate)
+
+// Ledger update made by a specific transaction.
+type ledgerUpdate struct {
+	blockNumber   uint64
+	transactionId string
+	writes        []write
+}
+
+// Description of a ledger write that can be applied to an off-chain data store.
+type write struct {
+	// Channel whose ledger is being updated.
+	channelName string
+	// Namespace within the ledger.
+	namespace string
+	// Key name within the ledger namespace.
+	key string
+	// Whether the key and associated value are being deleted.
+	isDelete bool
+	// If `isDelete` is false, the value written to the key; otherwise ignored.
+	value []byte
+}
+
+// Apply writes for a given transaction to off-chain data store, ideally in a single operation for fault tolerance.
+// This implementation will write to a file; the function body is added in the next commit.
+var applyWritesToOffChainStore = func(data ledgerUpdate) {
+}
 
 func listen(clientConnection *grpc.ClientConn) {
 	id, options := newConnectOptions(clientConnection)
@@ -35,6 +65,7 @@ func listen(clientConnection *grpc.ClientConn) {
 		fmt.Printf("Simulating a write failure every %d transactions", simulatedFailureCount)
 	}
 
+	// TODO put into infinite loop like in public docs example
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
 
@@ -48,8 +79,101 @@ func listen(clientConnection *grpc.ClientConn) {
 	}
 
 	for blockProto := range blocks {
-		checkpointer.CheckpointBlock(blockProto.GetHeader().GetNumber())
+		aBlockProcessor := blockProcessor{
+			parser.ParseBlock(blockProto),
+			checkpointer,
+			applyWritesToOffChainStore,
+		}
+		aBlockProcessor.process()
+	}
+}
+
+type blockProcessor struct {
+	block        *parser.Block
+	checkpointer *client.FileCheckpointer
+	storeWrites  store
+}
+
+func (b *blockProcessor) process() {
+	blockNumber := b.block.Number()
+
+	fmt.Println("Received block", blockNumber)
+
+	for _, transaction := range b.validTransactions() {
+		aTransactionProcessor := transactionProcessor{
+			blockNumber,
+			transaction,
+			b.storeWrites,
+		}
+		aTransactionProcessor.process()
+
+		transactionId := transaction.ChannelHeader().GetTxId()
+		b.checkpointer.CheckpointTransaction(blockNumber, transactionId)
+	}
+
+	b.checkpointer.CheckpointBlock(b.block.Number())
+}
+
+func (b blockProcessor) validTransactions() []*parser.Transaction {
+	result := []*parser.Transaction{}
+	for _, transaction := range b.getNewTransactions() {
+		if transaction.IsValid() {
+			result = append(result, transaction)
+		}
+	}
+	return result
+}
+
+func (b *blockProcessor) getNewTransactions() []*parser.Transaction {
+	transactions := b.block.Transactions()
+
+	lastTransactionId := b.checkpointer.TransactionID()
+	if lastTransactionId == "" {
+		// No previously processed transactions within this block so all are new
+		return transactions
+	}
+
+	// Ignore transactions up to the last processed transaction ID
+	lastProcessedIndex := b.findLastProcessedIndex()
+	return transactions[lastProcessedIndex+1:]
+}
+
+func (b blockProcessor) findLastProcessedIndex() int {
+	blockTransactionIds := []string{}
+	for _, transaction := range b.block.Transactions() {
+		blockTransactionIds = append(blockTransactionIds, transaction.ChannelHeader().GetTxId())
 	}
+
+	lastTransactionId := b.checkpointer.TransactionID()
+	lastProcessedIndex := -1
+	for index, id := range blockTransactionIds {
+		if id == lastTransactionId {
+			lastProcessedIndex = index
+		}
+	}
+	if lastProcessedIndex < 0 {
+		panic(
+			fmt.Errorf(
+				"checkpoint transaction ID %s not found in block %d containing transactions: %s",
+				lastTransactionId,
+				b.block.Number(),
+				joinByComma(blockTransactionIds),
+			),
+		)
+	}
+	return lastProcessedIndex
+}
+
+func joinByComma(list []string) string {
+	result := ""
+	for index, item := range list {
+		if len(list)-1 == index {
+			result += item
+		} else {
+			result += item + ", "
+		}
+	}
+	return result
 }
 
 func getSimulatedFailureCount() uint {
@@ -66,3 +190,68 @@ func getSimulatedFailureCount() uint {
 
 	return uint(result)
 }
+
+type transactionProcessor struct {
+	blockNumber uint64
+	transaction *parser.Transaction
+	storeWrites store
+}
+
+func (t *transactionProcessor) process() {
+	transactionId := t.transaction.ChannelHeader().GetTxId()
+
+	writes := t.writes()
+	if len(writes) == 0 {
+		fmt.Println("Skipping read-only or system transaction", transactionId)
+		return
+	}
+
+	fmt.Println("Process transaction", transactionId)
+
+	t.storeWrites(ledgerUpdate{
+		t.blockNumber,
+		transactionId,
+		writes,
+	})
+}
+
+func (t *transactionProcessor) writes() []write {
+	channelName := t.transaction.ChannelHeader().GetChannelId()
+
+	nonSystemCCReadWriteSets := []parser.NamespaceReadWriteSet{}
+	for _, nsReadWriteSet := range t.transaction.NamespaceReadWriteSets() {
+		if !t.isSystemChaincode(nsReadWriteSet.Namespace()) {
+			nonSystemCCReadWriteSets = append(nonSystemCCReadWriteSets, nsReadWriteSet)
+		}
+	}
+
+	writes := []write{}
+	for _, readWriteSet := range nonSystemCCReadWriteSets {
+		namespace := readWriteSet.Namespace()
+
+		for _, kvWrite := range readWriteSet.ReadWriteSet().GetWrites() {
+			aWrite := write{
+				channelName,
+				namespace,
+				kvWrite.GetKey(),
+				kvWrite.GetIsDelete(),
+				kvWrite.GetValue(),
+			}
+			writes = append(writes, aWrite)
+		}
+	}
+
+	return writes
+}
+
+func (t *transactionProcessor) isSystemChaincode(chaincodeName string) bool {
+	systemChaincodeNames := []string{
+		"_lifecycle",
+		"cscc",
+		"escc",
+		"lscc",
+		"qscc",
+		"vscc",
+	}
+	return slices.Contains(systemChaincodeNames, chaincodeName)
+}
diff --git a/off_chain_data/application-go/parser/blockParser.go b/off_chain_data/application-go/parser/blockParser.go
index 5e3ab56e5..b9539ea36 100644
--- a/off_chain_data/application-go/parser/blockParser.go
+++ b/off_chain_data/application-go/parser/blockParser.go
@@ -22,28 +22,30 @@ func (b *Block) Number() uint64 {
 	return header.GetNumber()
 }
 
-// TODO: needs cache; decompose
+// TODO: needs cache
 func (b *Block) Transactions() []*Transaction {
-	envelopes := []*common.Envelope{}
-	for _, blockData := range b.block.GetData().GetData() {
-		envelope := &common.Envelope{}
-		if err := proto.Unmarshal(blockData, envelope); err != nil {
-			panic(err)
-		}
-		envelopes = append(envelopes, envelope)
-	}
+	envelopes := b.unmarshalEnvelopesFromBlockData()
 
-	commonPayloads := []*common.Payload{}
-	for _, envelope := range envelopes {
-		commonPayload := &common.Payload{}
-		if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
-			panic(err)
-		}
-		commonPayloads = append(commonPayloads, commonPayload)
+	commonPayloads := b.unmarshalPayloadsFrom(envelopes)
+
+	payloads := b.parse(commonPayloads)
+
+	result := b.createTransactionsFrom(payloads)
+
+	return result
+}
+
+func (*Block) createTransactionsFrom(payloads []*PayloadImpl) []*Transaction {
+	result := []*Transaction{}
+	for _, payload := range payloads {
+		result = append(result, NewTransaction(payload))
 	}
+	return result
+}
 
+func (b *Block) parse(commonPayloads []*common.Payload) []*PayloadImpl {
 	validationCodes := b.extractTransactionValidationCodes()
 
-	payloads := []*PayloadImpl{}
+	result := []*PayloadImpl{}
 	for i, commonPayload := range commonPayloads {
 		payload := ParsePayload(
 			commonPayload,
@@ -54,15 +56,33 @@ func (b *Block) Transactions() []*Transaction {
 			),
 		)
 		if payload.IsEndorserTransaction() {
-			payloads = append(payloads, payload)
+			result = append(result, payload)
 		}
 	}
+	return result
+}
 
-	result := []*Transaction{}
-	for _, payload := range payloads {
-		result = append(result, NewTransaction(payload))
+func (*Block) unmarshalPayloadsFrom(envelopes []*common.Envelope) []*common.Payload {
+	result := []*common.Payload{}
+	for _, envelope := range envelopes {
+		commonPayload := &common.Payload{}
+		if err := proto.Unmarshal(envelope.GetPayload(), commonPayload); err != nil {
+			panic(err)
+		}
+		result = append(result, commonPayload)
 	}
+	return result
+}
 
+func (b *Block) unmarshalEnvelopesFromBlockData() []*common.Envelope {
+	result := []*common.Envelope{}
+	for _, blockData := range b.block.GetData().GetData() {
+		envelope := &common.Envelope{}
+		if err := proto.Unmarshal(blockData, envelope); err != nil {
+			panic(err)
+		}
+		result = append(result, envelope)
+	}
 	return result
 }
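
For reference (not part of the patch): the commit message says the store is still an empty function and will be implemented next, and the comment on applyWritesToOffChainStore says it will write to a file. Below is a minimal sketch of what such a follow-up could look like. The file name "store.log", the tab-separated line format, and the standalone mirrored types are illustrative assumptions, not the sample's actual implementation.

package main

import (
	"bytes"
	"fmt"
	"os"
)

// Mirrors of the ledgerUpdate and write types added in this patch (sketch only).
type write struct {
	channelName string
	namespace   string
	key         string
	isDelete    bool
	value       []byte
}

type ledgerUpdate struct {
	blockNumber   uint64
	transactionId string
	writes        []write
}

// applyWritesToOffChainStore appends all writes of one transaction to a local
// file in a single Write call, so a transaction is persisted either completely
// or not at all. File name and record format are illustrative.
func applyWritesToOffChainStore(data ledgerUpdate) {
	buffer := &bytes.Buffer{}
	for _, w := range data.writes {
		fmt.Fprintf(buffer, "%d\t%s\t%s\t%s\t%s\t%t\t%s\n",
			data.blockNumber, data.transactionId,
			w.channelName, w.namespace, w.key, w.isDelete, w.value)
	}

	file, err := os.OpenFile("store.log", os.O_CREATE|os.O_APPEND|os.O_WRONLY, 0o644)
	if err != nil {
		panic(err)
	}
	defer file.Close()

	if _, err := file.Write(buffer.Bytes()); err != nil {
		panic(err)
	}
}

func main() {
	applyWritesToOffChainStore(ledgerUpdate{
		blockNumber:   1,
		transactionId: "tx1",
		writes: []write{
			{channelName: "mychannel", namespace: "basic", key: "asset1", value: []byte(`{"size": 5}`)},
		},
	})
}

Buffering the records and issuing a single Write keeps all of a transaction's writes together, which follows the "ideally in a single operation for fault tolerance" guidance in the store type's comment.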