Skip to content

dfuse-io/dkafka

Folders and files

NameName
Last commit message
Last commit date

Latest commit

44928ca · Aug 17, 2021

History

35 Commits
Aug 17, 2021
Mar 2, 2021
Jun 1, 2021
Jun 2, 2021
Aug 17, 2021
Dec 14, 2020
Feb 17, 2021
Aug 17, 2021
Aug 17, 2021
Aug 17, 2021
Aug 17, 2021
Jun 2, 2021
Jun 1, 2021
Jun 2, 2021

Repository files navigation

dkafka

kafka cloudevent integration

Usage

  • Compile with go install -v ./cmd/dkafka
  • Run with:
  # assuming a dfuse instance with blocks running on localhost:9000
  # assuming a kafka instance running on localhost:9092 without auth

  dkafka publish \
     --dfuse-firehose-grpc-addr=localhost:9000 \
     --start-block-num=2  \ 
     --stop-block-num=180 \
     --dfuse-firehose-include-expr='action!="onblock"' \
     --kafka-endpoints=localhost:9092 \
     --kafka-topic=dkafka \
     --kafka-cursor-topic=_dkafka_cursor \
     --kafka-cursor-partition=0

Notes on transaction status and meaning of 'executed' in EOSIO

  • Reference: https://github.com/dfuse-io/dkafka/blob/main/pb/eosio-codec/codec.pb.go#L61-L68
  • Transaction status can be one of [NONE EXECUTED SOFTFAIL HARDFAIL DELAYED EXPIRED UNKNOWN CANCELED]
  • From a streaming perspective, you are probably only interested in the transactions that are executed, in the sense that their actions can modify the state of the chain
  • There is an edge case where a transaction can have a status of SOFTFAIL, but includes an successful call to an error handler (account::action == eosio::onerror, handled by the receiver) -- in this case, the the actions of this transaction are actually applied to the chain, and should probably be treated the same way as the other executed transactoins.
  • If you want only include all actions that are executed in that sense (including the weird SOFTFAIL case), use the field executed in your CEL filtering or look for a 'true' value on the executed field in your event.

CEL expression language

  • Reference: https://github.com/google/cel-spec/blob/master/doc/langdef.md

  • The following flags on dkafka accept CEL expressions to generate a string or an array of strings:

    • --event-type-expr "CEL" --> string
    • --event-keys-expr "CEL" --> [array,of,strings]
    • --event-extensions-expr : "key1:CEL1[,key2:CEL2...]" where each CEL expression --> string
    • --dfuse-firehose-include-expr "CEL" --> bool
  • the following names are available to be resolved from the EOS blocks, transactions, traces and actions.

    • receiver: receiver account, should be the same as the account unless it is a notification, ex: eosio.token, johndoe12345.
    • account: namespace of the action, ex: eosio.token
    • action: name of the action, ex: transfer
    • block_num: block number, ex: 234522
    • block_id: unique ID for the block, ex: 0002332afef44aad7d8f49374398436349372fcdb
    • block_time: timestamp of the block
    • step: one of: NEW,UNDO,IRREVERSIBLE
    • transaction_id: unique ID for the transaction, ex: 6d0aae37ab3b81b6783b877f2d54d4708f9f137cc6b23374641be362ff010803
    • transaction_index: position of that transaction in the block (ex: 5)
    • global_seq: unique sequence number of that action in the chain (ex: 4354352435)
    • execution_index: position of that action in the transaction, ordered by execution
    • data: map[string]Any corresponding to the params given to the action
    • auth: array of strings corresponding to the authorizations given to the action (who signed it?)
    • input: bool, if true, the action is top-level (declared in the transaction, not a consequence of another action)
    • notif: bool, if true, the action is a 'notification' (receiver!=account)
    • executed: bool, if true, the action was executed successfully (including the error handling of a failed deferred transaction as a SOFTFAIL)
    • scheduled: bool, if true, the action was scheduled (delayed or deferred)
    • trx_action_count: number of actions within that transaction
    • top5_trx_actors: array of the 5 most recurrent actors in a transaction (useful for big transactions with lots of actions)
  • examples:

    • to generate two events per action, one with 'account' as the key, one with the 'receiver' as the key (duplicates are removed automatically) --event-keys-expr="[account,receiver]"
    • to set the key to 'updateauth' when the action match, but 'account-{action}' for any other action: --event-keys-expr="action=='updateauth'?[action] : [account+'-'+action]"
    • to add a header ce_newaccount in kafka mesage with the value "yes" it is the action eosio::newaccount "no" ortherwise: --event-extensions-expr="ce_newaccount:account+':'+action=='eosio:newaccount'?'yes':'no'"

Format of a kafka event PAYLOAD

{
  "block_num": 5,
  "block_id": "000000053e6fe6497c3f609d1cae0d30dbb529cee0821c522b0d97c48e618744",
  "status": "EXECUTED",
  "executed": true,
  "block_step": "NEW",
  "trx_id": "4163246ac2fc096d949a7f1627e5c1252b44366fed52eb4bded960de7701831b",
  "act_info": {
    "account": "eosio",
    "receiver": "eosio",
    "action": "newaccount",
    "global_seq": 41,
    "authorizations": [
      "eosio@active"
    ],
    "db_ops": [
      {
        "operation": 1,
        "action_index": 1,
        "code": "battlefield1",
        "scope": "battlefield1",
        "table_name": "variant",
        "primary_key": ".........15n1",
        "new_payer": "battlefield1",
        "new_data": "MUsAAAAAAAAAAAAA9wAAAAAAAAA=",
        "new_json": "{\"amount\":\"234545.231 BTF\"}"
        "old_data": "BBEDDFE0wweFFFFFFFFFFFffewax",
        "old_json": "{\"amount\":\"0.000 BTF\"}"
      }
    ],
    "json_data": {
      "creator": "eosio",
      "name": "zzzzzzzzzzzz",
      "owner": {...}
       ...
    }
  }
}

Decoding DBOps using ABIs

  • --local-abi-files flag allows you to specify local JSON files as ABIs for the contracts for which you want to decode DB operations, ex:
curl -H "Content-type: application/json" -XPOST "localhost:8888/v1/chain/get_abi" --data  '{"account_name":"eosio"}' | jq -c .abi >eosio.abi
curl -H "Content-type: application/json" -XPOST "localhost:8888/v1/chain/get_abi" --data  '{"account_name":"eosio.token"}' | jq -c .abi >eosio.token.abi
dkafka publish \
  --dfuse-firehose-grpc-addr=localhost:9000 \
  --start-block-num=2 \
  --stop-block-num=180 \
  --dfuse-firehose-include-expr="executed && action!='onblock'" \
  --batch-mode \
  --dry-run \
  --local-abi-files=eosio.token:./eosio.token.abi,eosio:./eosio.abi
  • --abicodec-grpc-addr flag allows you to specify the GRPC address of a dfuse "abicodec" service, so dkafka can fetch the ABIs on demand, ex:
dkafka publish \
  --dfuse-firehose-grpc-addr=localhost:9000 \
  --start-block-num=2 \
  --stop-block-num=180 \
  --dfuse-firehose-include-expr="executed && action!='onblock'" \
  --batch-mode \
  --dry-run \
  --abicodec-grpc-addr=localhost:9001
  • --fail-on-undecodable-db-op flag allows you to specify if you want dkafka to fail any time it cannot decode a given dbop to JSON