1.0 Bitcoin Core Architecture

This text provides an overview of the Bitcoin Core architecture: it describes the main components, explains how they interact, and shows relevant parts of the code.

It also answers common questions such as "How does the node find other peers on the network?", "How is a new block or transaction received and validated?" and "How is a transaction created and broadcast?", among others.

The commit 4b5659c6b1 can be used as a reference for the project’s codebase at the time of writing.

git clone https://github.com/bitcoin/bitcoin.git
cd bitcoin
git checkout -b text_branch 4b5659c6b1

Executables

To interact with the Bitcoin network, the user needs to connect to a Bitcoin node, a piece of software whose main purposes are:

  • Download the blockchain.

  • Enforce the rules of the network.

  • Validate and relay the transactions.

Running your own node is of utmost importance when spending or transferring bitcoin. The alternative is trusting someone else's node, which is a significant security risk: the user leaks personal data and relies on data and rules defined by others, who may be malicious, harmful to the network, or even harmful to the user.

The other component necessary to store and move coins is a wallet. The primary function of the wallet is to manage the private keys and sign transactions.

The Bitcoin protocol does not have the concept of accounts, like banks. Instead, wallets manage a pool of very large random numbers called private keys, which the user must keep secret. Bitcoin addresses are derived from these private keys and can be used to receive coins (and move them later). To preserve privacy, each receiving address should be used for only one transaction.

Signing a transaction means the user is moving the money and has authorized the transaction. To create the signature, the wallet will use the private key associated with the coin(s) the user wants to spend.

Nodes and wallets are completely different things, although they may be bundled in the same software. A node deals with the Bitcoin network and protocol, while a wallet deals with one's private keys and transactions. Knowing the difference between these two concepts is crucial to understanding the Bitcoin architecture.

Bitcoin Core has been the reference implementation since the first version. It is not just a single program: Bitcoin Core is a solution that includes a node, a graphical interface, and a command-line interface. There are also wallet features (including sophisticated coin selection), but since version v0.21, a wallet is no longer created by default. The reason for this is to make the use of multiple wallets more intuitive.

To start a node, just run the main app, bitcoind, implemented in src/bitcoind.cpp. This executable is expected to run as a background service (a daemon). It also provides an authorized JSON-RPC interface (https://github.com/bitcoin/bitcoin/blob/4b5659c6b115315c9fd2902b4edd4b960a5e066e/doc/JSON-RPC-interface.md) and an unauthorized (limited) REST interface (https://github.com/bitcoin/bitcoin/blob/4b5659c6b115315c9fd2902b4edd4b960a5e066e/doc/REST-interface.md), so users and applications can access and work with the node.

After running the daemon, the user will be able to interact with the node through a command-line application called bitcoin-cli that is implemented in src/bitcoin-cli.cpp. It interfaces with bitcoind’s JSON-RPC server over HTTP and displays the results.

Another simpler and friendlier way to start and operate the node is bitcoin-qt, implemented in src/qt/main.cpp. It is an intuitive graphical interface where interactions take place via buttons. The user can create multiple wallets and check other information about the node, such as peer connections and network statistics.

Figure 1. Bitcoin Core Executables


Protocol - P2P

Bitcoin is a peer-to-peer protocol. There is no central server that can determine the rules. So, to communicate with other peers and exchange information, the nodes need to establish a standard protocol so that they can understand each other.

The file src/protocol.h defines all types of messages (namespace NetMsgType) that will be used in this communication. As can be seen in the code below, each message has a comment with a succinct description of its purpose.

/**
 * Bitcoin protocol message types. When adding new message types, don't forget
 * to update allNetMessageTypes in protocol.cpp.
 */
namespace NetMsgType {

/**
 * The version message provides information about the transmitting node to the
 * receiving node at the beginning of a connection.
 */
extern const char* VERSION;
/**
 * The verack message acknowledges a previously-received version message,
 * informing the connecting node that it can begin to send other messages.
 */
extern const char* VERACK;
// ...
/**
 * The inv message (inventory message) transmits one or more inventories of
 * objects known to the transmitting peer.
 */
extern const char* INV;
/**
 * The getdata message requests one or more data objects from another node.
 */
extern const char* GETDATA;
// ...
}
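As an illustration of how these constants are used, the snippet below follows the pattern found in src/net_processing.cpp when replying to a peer's version message with a verack. This is a simplified sketch: the surrounding code and member names such as m_connman, pfrom and msgMaker reflect that file at roughly this commit and may differ slightly elsewhere.

// Simplified sketch of the pattern used in src/net_processing.cpp:
// serialize a message of type NetMsgType::VERACK and push it to the peer.
const CNetMsgMaker msgMaker(pfrom.GetCommonVersion());
m_connman.PushMessage(&pfrom, msgMaker.Make(NetMsgType::VERACK));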

But how does the node find other peers to exchange messages with? When running for the first time, the node connects to a set of servers called DNS seeds, which provide a list of IP addresses that have recently been running a Bitcoin client. After connecting to those IP addresses, the node starts to exchange messages with its peers.
The DNS seeds are hardcoded in src/chainparams.cpp.

vSeeds.emplace_back("seed.bitcoin.sipa.be"); // Pieter Wuille, only supports x1, x5, x9, and xd
vSeeds.emplace_back("dnsseed.bluematt.me"); // Matt Corallo, only supports x9
vSeeds.emplace_back("dnsseed.bitcoin.dashjr.org"); // Luke Dashjr
vSeeds.emplace_back("seed.bitcoinstats.com"); // Christian Decker, supports x1 - xf
vSeeds.emplace_back("seed.bitcoin.jonasschnelli.ch"); // Jonas Schnelli, only supports x1, x5, x9, and xd

New peers can also be added manually with the -addnode=<addr> argument (or the addnode RPC command). The connection parameters, like DEFAULT_MAX_PEER_CONNECTIONS or MAX_ADDNODE_CONNECTIONS, can be found in src/net.h.
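For reference, these limits look roughly as follows in src/net.h at the referenced commit (values may change over time, and the comments here are paraphrased, so check the source):

// src/net.h (approximate excerpt; comments paraphrased)
static const unsigned int DEFAULT_MAX_PEER_CONNECTIONS = 125; // default total number of peer connections
static const int MAX_ADDNODE_CONNECTIONS = 8;                 // manually added (-addnode) peers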

Concurrency model

Bitcoin Core does a lot of things at the same time. It downloads the blockchain, processes new transactions, validates new blocks, responds to user events and network events, etc.

Therefore, a multithreaded application seems appropriate for this case. Threads allow multiple functions to be executed concurrently, improving the application’s responsiveness considerably. Multithreading also allows the use of multiprocessors efficiently, enabling parallelism to perform intensive tasks.
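As a minimal, generic C++ illustration (not Bitcoin Core code), two functions can be run concurrently simply by handing them to std::thread:

#include <iostream>
#include <thread>

// Generic example: two independent tasks running concurrently.
void download_blocks() { std::cout << "downloading blocks...\n"; }
void serve_rpc()       { std::cout << "serving RPC calls...\n"; }

int main()
{
    std::thread t1(download_blocks);
    std::thread t2(serve_rpc);
    t1.join(); // wait for both tasks to finish
    t2.join();
    return 0;
}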

An example of a task in Bitcoin Core that can use multiple threads is the verification of scripts in a block. Since there are many transactions in a block, parallelizing the execution greatly improves performance.

Most threads are started (directly or indirectly) in init.cpp:AppInitMain(…​), the Bitcoin node's main initialization function. If the node is started through the bitcoind daemon, this function is called inside src/bitcoind.cpp:AppInit(…​). If it is started through the bitcoin-qt graphical interface, src/node/interfaces.cpp:appInitMain(…​) calls it.

Another relevant function is CConnman::Start(…​) since network-related threads are instantiated and executed in it.

bool AppInitMain(...)
{
    // ...
    if (!node.connman->Start(*node.scheduler, connOptions)) {
        return false;
    }
    // ...
}

The table below shows the threads that will be presented next.

Purpose                                     | # threads   | Task run
--------------------------------------------|-------------|------------------------------
Script Verification                         | nproc or 16 | ThreadScriptCheck()
Loading Blocks                              | 1           | ThreadImport()
Servicing RPC Calls                         | 4 or more   | ThreadHTTP()
Load Peer Addresses From DNS Seeds          | 1           | ThreadDNSAddressSeed()
Send And Receive Messages To And From Peers | 1           | ThreadSocketHandler()
Initializing Network Connections            | 1           | ThreadOpenConnections()
Opening Added Network Connections           | 1           | ThreadOpenAddedConnections()
Process Messages from net / net_processing  | 1           | ThreadMessageHandler()

TraceThread

TraceThread is a wrapper for a function that will be called only once. In the Bitcoin Core code, it is usually passed as the fn argument to the thread constructor std::thread(Fn&& fn, Args&&…​ args). It is defined in the src/util/system.h file.

template <typename Callable> void TraceThread(const char* name,  Callable func)
{
    util::ThreadRename(name);
    try
    {
        LogPrintf("%s thread start\n", name);
        func();
        LogPrintf("%s thread exit\n", name);
    }
    catch (const boost::thread_interrupted&)
    {
        LogPrintf("%s thread interrupt\n", name);
        throw;
    }
    catch (const std::exception& e) {
        PrintExceptionContinue(&e, name);
        throw;
    }
    catch (...) {
        PrintExceptionContinue(nullptr, name);
        throw;
    }
}
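A typical usage, which reappears later in this text when CConnman::Start(…​) creates its handler threads, looks like this:

threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));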

Script Verification

The function that performs the script verification is bool src/script/interpreter.cpp:VerifyScript(…​). It is called in at least three points of the application:

  • when a new transaction arrives in a network message (ProcessMessage(…​));

  • when a transaction is broadcast by the node (BroadcastTransaction(…​));

  • when a new block is connected to the chain (CChainState::ConnectBlock(…​)).

In the first two cases, the static bool validation.cpp:AcceptToMemoryPool(…​) function is called (from ProcessMessage(…​) and BroadcastTransaction(…​), respectively) to handle the new transaction and try to add it to the mempool.
In the last case, the function that handles the new block is bool src/validation.cpp:CChainState::ConnectBlock(…​).
All three cases end up calling bool src/validation.cpp:CheckInputScripts(…​).

CheckInputScripts(…​) receives const CTransaction& tx transaction as a parameter and validates the scripts of all its inputs. However, the relevant parameter in this context is the std::vector<CScriptCheck> *pvChecks = nullptr. CScriptCheck is a closure representing one script verification and it stores references to the spending transaction.

class CScriptCheck
{
    private:
        CTxOut m_tx_out;
        const CTransaction *ptxTo;
        unsigned int nIn;
        unsigned int nFlags;
        bool cacheStore;
        ScriptError error;
        PrecomputedTransactionData *txdata;
    // ...
}

The bool src/validation.cpp:CScriptCheck::operator()() method overloads the operator () and performs the script validation (VerifyScript(…​)).

bool CScriptCheck::operator()() {
    const CScript &scriptSig = ptxTo->vin[nIn].scriptSig;
    const CScriptWitness *witness = &ptxTo->vin[nIn].scriptWitness;
    return VerifyScript(scriptSig, m_tx_out.scriptPubKey, witness, nFlags, CachingTransactionSignatureChecker(ptxTo, nIn, m_tx_out.nValue, cacheStore, *txdata), &error);
}

So if std::vector<CScriptCheck> *pvChecks is not null, CheckInputScripts(…​) adds each script validation (CScriptCheck check) to the vector so they can be executed in parallel later. Otherwise, the script is verified immediately.

bool CheckInputScripts(const CTransaction& tx, ..., std::vector<CScriptCheck> *pvChecks) EXCLUSIVE_LOCKS_REQUIRED(cs_main)
{
    // ...
    for (unsigned int i = 0; i < tx.vin.size(); i++) {
        CScriptCheck check(txdata.m_spent_outputs[i], tx, i, flags, cacheSigStore, &txdata);
        if (pvChecks) {
            pvChecks->push_back(CScriptCheck());
            check.swap(pvChecks->back());
        } else if (!check()) {
            // ...
        }
        // ...
    }
    // ...
}

The only function that makes use of script-validation parallelization is the aforementioned bool CChainState::ConnectBlock(…​), due to the number of transactions in a block. If g_parallel_script_checks is true, each script-verification vector filled by CheckInputScripts(…​) is handed to the CCheckQueueControl<CScriptCheck> control(…​) through control.Add(vChecks). The control.Wait() call then blocks until all the queued checks have been executed.
g_parallel_script_checks is a global variable and will be described in further detail soon.

bool CChainState::ConnectBlock(const CBlock& block, ...)
{
    // ...
    CCheckQueueControl<CScriptCheck> control(fScriptChecks && g_parallel_script_checks ? &scriptcheckqueue : nullptr);
    // ...

    for (unsigned int i = 0; i < block.vtx.size(); i++)
    {
        if (!tx.IsCoinBase())
        {
            std::vector<CScriptCheck> vChecks;
            if (!CheckInputScripts(tx,..., g_parallel_script_checks ? &vChecks : nullptr)) { /*...*/ }
            control.Add(vChecks);
        }
    }

    if (!control.Wait()) {
        LogPrintf("ERROR: %s: CheckQueue failed\n", __func__);
        return state.Invalid(BlockValidationResult::BLOCK_CONSENSUS, "block-validation-failed");
    }
}

The bool src/checkqueue.h:CCheckQueue::Loop(…​) method calls check() to execute the verification work (in this case, the script verification).

// src/checkqueue.h
template <typename T>
class CCheckQueue
{
private:
    /** Internal function that does bulk of the verification work. */
    bool Loop(bool fMaster = false)
    {
        // ...
        do {
            // ...
            // execute work
            for (T& check : vChecks)
                if (fOk)
                    fOk = check();
            vChecks.clear();
        } while (true);
}

The number of script-checking threads is defined in init.cpp:AppInitMain(…​). The user can set it with the -par argument. A value of zero or a negative value means the count is derived from the number of cores (-par=-n means "leave n cores free").
If the user does not pass the -par parameter, src/util/system.cpp:GetNumCores() is called to get the number of concurrent threads supported by the hardware. Then 1 is subtracted from this number because the main thread is already being used. GetNumCores() is just a wrapper for the C++ standard function std::thread::hardware_concurrency().
There is also a maximum number of dedicated script-checking threads allowed, which is 15 (MAX_SCRIPTCHECK_THREADS). Note that g_parallel_script_checks is set to true only if at least one additional thread is available, allowing parallelization in the ConnectBlock(…​) function.

bool AppInitMain(...)
{
    //...
    int script_threads = args.GetArg("-par", DEFAULT_SCRIPTCHECK_THREADS);
    if (script_threads <= 0) {
        // -par=0 means autodetect (number of cores - 1 script threads)
        // -par=-n means "leave n cores free" (number of cores - n - 1 script threads)
        script_threads += GetNumCores();
    }

    // Subtract 1 because the main thread counts towards the par threads
    script_threads = std::max(script_threads - 1, 0);

    // Number of script-checking threads <= MAX_SCRIPTCHECK_THREADS
    script_threads = std::min(script_threads, MAX_SCRIPTCHECK_THREADS);

    LogPrintf("Script verification uses %d additional threads\n", script_threads);
    if (script_threads >= 1) {
        g_parallel_script_checks = true;
        StartScriptCheckWorkerThreads(script_threads);
    }
    //...
}
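As a worked example, assume GetNumCores() returns 8: -par=0 results in 8 - 1 = 7 script-checking threads; -par=-2 results in 8 - 2 - 1 = 5; and -par=20 results in 20 - 1 = 19, which is then clamped to MAX_SCRIPTCHECK_THREADS = 15.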

And finally, StartScriptCheckWorkerThreads(script_threads) simply spawns script_threads new worker threads. The underlying implementation can be found in src/checkqueue.h:CCheckQueue::StartWorkerThreads(…​).

class CCheckQueue
{
    // ....
    //! Create a pool of new worker threads.
    void StartWorkerThreads(const int threads_num)
    {
        // ...
        assert(m_worker_threads.empty());
        for (int n = 0; n < threads_num; ++n) {
            m_worker_threads.emplace_back([this, n]() {
                util::ThreadRename(strprintf("scriptch.%i", n));
                Loop(false /* worker thread */);
            });
        }
    }
    // ...
}

The worker-thread mechanism was not originally like this. It was changed in PR #18710, making it more efficient and reducing the dependency on <boost/thread>. There are also interesting CCheckQueue unit tests, implemented in PR #9497.

Loading Blocks

One of the first things the node needs to do is load the blocks and decide which chain to work on.

The thread std::thread m_load_block invokes the function void ThreadImport(…​) to load the blocks on startup. If the user is rebuilding the blockchain index (-reindex) or is loading blocks directly from files (-loadblock), it will be handled in this thread. After loading the blocks, it tries to find the best chain in CChainState::ActivateBestChain(…​).

This happens in the init.cpp:AppInitMain(…​).

// src/validation.h
class ChainstateManager
{
    // ...
public:
    std::thread m_load_block;
    // ...
}

// src/init.cpp
bool AppInitMain(NodeContext& node, interfaces::BlockAndHeaderTipInfo* tip_info)
{
    // ...
    chainman.m_load_block = std::thread(&TraceThread<std::function<void()>>, "loadblk", [=, &chainman, &args] {
        ThreadImport(chainman, vImportFiles, args);
    });
    // ...
}

Note that m_load_block is a member field of the ChainstateManager class. Originally it was a global variable called g_load_block, but it was changed in PR #21575 as part of breaking src/init.cpp down into smaller logical units.
ChainstateManager will be explained in more detail in the validation.h / validation.cpp section.

Servicing RPC Calls

To allow the user to interact with the node, an HTTP server must be running to process the requests. To that end, init.cpp:AppInitServers(…​) calls httpserver.cpp:InitHTTPServer(), which, as the name implies, initializes the server, and httpserver.cpp:StartHTTPServer(), which constructs the new thread objects.

g_thread_http is the event dispatcher thread that manages the HTTP event loop. It is interrupted when InterruptHTTPServer() is called.

g_thread_http_workers distributes the work over multiple threads, handling longer requests off the event-loop thread. HTTPWorkQueueRun is a simple wrapper that sets the thread name and runs the work queue. The number of threads servicing RPC calls is set by the -rpcthreads argument (defaulting to httpserver.h:DEFAULT_HTTP_THREADS = 4), with a minimum of 1.

static std::thread g_thread_http;
static std::vector<std::thread> g_thread_http_workers;

void StartHTTPServer()
{
    LogPrint(BCLog::HTTP, "Starting HTTP server\n");
    int rpcThreads = std::max((long)gArgs.GetArg("-rpcthreads", DEFAULT_HTTP_THREADS), 1L);
    LogPrintf("HTTP: starting %d worker threads\n", rpcThreads);
    g_thread_http = std::thread(ThreadHTTP, eventBase);

    for (int i = 0; i < rpcThreads; i++) {
        g_thread_http_workers.emplace_back(HTTPWorkQueueRun, workQueue, i);
    }
}

Load Peer Addresses From DNS Seeds

As mentioned before, the node initially queries the hardcoded DNS seeds to find new peers to connect to.

net.h:std::thread threadDNSAddressSeed is a thread created with CConnman::ThreadDNSAddressSeed(…​) wrapped in TraceThread(…​). It runs once, when the node starts.

The thread is created when node.connman→Start(*node.scheduler, connOptions) is executed in the init.cpp:AppInitMain(…​) function. Note that if the -dnsseed argument is set to false on startup, this thread will not be instantiated. The default value is true (defined in DEFAULT_DNSSEED).

if (!gArgs.GetBoolArg("-dnsseed", DEFAULT_DNSSEED))
    LogPrintf("DNS seeding disabled\n");
else
    threadDNSAddressSeed = std::thread(&TraceThread<std::function<void()> >, "dnsseed", std::function<void()>(std::bind(&CConnman::ThreadDNSAddressSeed, this)));

Send And Receive Messages To And From Peers

bool CConnman::Start(...)
{
    threadSocketHandler = std::thread(&TraceThread<std::function<void()> >, "net", std::function<void()>(std::bind(&CConnman::ThreadSocketHandler, this)));
}

At first this seems strange: TraceThread(…​) ensures the wrapped function is executed only once, yet the node sends and receives messages many times while connected, not just once.

But a close look into the CConnman::ThreadSocketHandler() code shows it has a loop that keeps running until it is eventually interrupted by the interruptNet flag.

void CConnman::ThreadSocketHandler()
{
    while (!interruptNet)
    {
        DisconnectNodes();
        NotifyNumConnectionsChanged();
        SocketHandler();
    }
}

This flag is set to true only in CConnman::Interrupt(), which interrupts the connections. Note that the class CThreadInterrupt overloads the () operator; when that operator is called, the flag is set to true.
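A simplified outline of CThreadInterrupt (defined in src/threadinterrupt.h) is shown below. This is a sketch of its shape rather than the exact code: operator bool() reads an atomic flag, and operator()() sets the flag and wakes any thread sleeping in sleep_for(…​).

// Simplified outline of src/threadinterrupt.h (see the source for the exact code)
class CThreadInterrupt
{
public:
    explicit operator bool() const;  // used in loops such as while (!interruptNet)
    void operator()();               // set the flag and wake up sleeping threads
    void reset();
    bool sleep_for(std::chrono::milliseconds rel_time); // returns false when interrupted

private:
    std::condition_variable cond;
    Mutex mut;
    std::atomic<bool> flag;
};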

CConnman::DisconnectNodes() disconnects any connected nodes if fNetworkActive is false (network activity can be disabled/enabled with the setnetworkactive RPC command). The function also disconnects unused nodes and deletes disconnected nodes.

NotifyNumConnectionsChanged() updates the number of connections and, if the client interface is enabled, notifies it when that number changes.

SocketHandler() handles the socket connections, the incoming messages (pnode→vRecvMsg) and the messages to be sent (pnode→vSend).

Initializing Network Connections

The thread std::thread threadOpenConnections opens and manages connections to other peers. The way this thread gets started depends on the -connect=<ip> parameter:

  • If -connect is set to 0, the threadOpenConnections thread will not be created.

  • If a specific IP is set, there will be only one active outbound connection, to that IP.

  • If the -connect parameter is not passed, all the outbound network connections will be initiated.

if (connOptions.m_use_addrman_outgoing || !connOptions.m_specified_outgoing.empty())
        threadOpenConnections = std::thread(&TraceThread<std::function<void()> >, "opencon", std::function<void()>(std::bind(&CConnman::ThreadOpenConnections, this, connOptions.m_specified_outgoing)));

The total number of automatic outbound connections, m_max_outbound, is defined in src/net.h. It will usually be 11: the sum of the full-relay (8), block-relay-only (2) and feeler (1) connections.

/** Maximum number of automatic outgoing nodes over which we'll relay everything (blocks, tx, addrs, etc) */
static const int MAX_OUTBOUND_FULL_RELAY_CONNECTIONS = 8;
/** Maximum number of addnode outgoing nodes */
static const int MAX_ADDNODE_CONNECTIONS = 8;
/** Maximum number of block-relay-only outgoing connections */
static const int MAX_BLOCK_RELAY_ONLY_CONNECTIONS = 2;
/** Maximum number of feeler connections */
static const int MAX_FEELER_CONNECTIONS = 1;

void Init(...) {
    m_max_outbound = m_max_outbound_full_relay + m_max_outbound_block_relay + nMaxFeeler;
}

The use of -connect=0 to disable automatic outbound connections was implemented in v0.14, with PR #9002.

Opening Added Network Connections

// Initiate manual connections
threadOpenAddedConnections = std::thread(&TraceThread<std::function<void()> >, "addcon", std::function<void()>(std::bind(&CConnman::ThreadOpenAddedConnections, this)));
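For context, CConnman::ThreadOpenAddedConnections(…​) maintains connections to the peers added manually via -addnode or the addnode RPC. The loop below is a heavily simplified sketch, not the exact code; see src/net.cpp for the real retry intervals and semaphore handling.

// Heavily simplified sketch of CConnman::ThreadOpenAddedConnections()
// (not the exact code; see src/net.cpp)
void CConnman::ThreadOpenAddedConnections()
{
    while (true) {
        // Try to (re)connect to any manually added peer that is not currently connected.
        for (const AddedNodeInfo& info : GetAddedNodeInfo()) {
            if (!info.fConnected) {
                CSemaphoreGrant grant(*semAddnode);
                OpenNetworkConnection(CAddress(info.resolvedAddress, NODE_NONE), false,
                                      &grant, info.strAddedNode.c_str(), ConnectionType::MANUAL);
            }
        }
        // Sleep between rounds; return when interrupted via interruptNet.
        if (!interruptNet.sleep_for(std::chrono::seconds(60))) return;
    }
}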

Process Messages from net / net_processing

bool CConnman::Start(...)
{
    // Process messages
    threadMessageHandler = std::thread(&TraceThread<std::function<void()> >, "msghand", std::function<void()>(std::bind(&CConnman::ThreadMessageHandler, this)));
}

As already seen in Send And Receive Messages To And From Peers, this code will not result in the handler running only once. TraceThread(…​) ensures a single execution of the wrapped function, but CConnman::ThreadMessageHandler() has a loop that keeps running until it is eventually interrupted by the flagInterruptMsgProc flag.

This flag is set to true only in CConnman::Interrupt(), which interrupts all connections.

void CConnman::ThreadMessageHandler()
{
    while (!flagInterruptMsgProc)
    {
        // ...

        for (CNode* pnode : vNodesCopy)
        {
            if (pnode->fDisconnect)
                continue;

            // Receive messages
            bool fMoreNodeWork = m_msgproc->ProcessMessages(pnode, flagInterruptMsgProc);
            // ...
            // Send messages
            {
                LOCK(pnode->cs_sendProcessing);
                m_msgproc->SendMessages(pnode);
            }
            // ...
        }

        // ...
    }
}

Notifications Mechanism (ValidationInterface)

A lot of events happen simultaneously in Bitcoin Core: new messages arrive all the time, are processed and, sometimes, announcements need to be made. For example, if a wallet is connected to Bitcoin Core and a transaction related to this wallet arrives, the wallet needs to be notified; when a new block arrives, the chain and the wallet need to be updated; when a transaction is removed from the mempool, interested components need to be notified as well; and so on.

In good software architecture, the components that trigger notifications and listen to them are completely decoupled. The message producer sends the notification to the listeners, but it does not know (and does not care) how the recipient will process the message. The sender’s primary concern should be to ensure that the message is delivered and do this asynchronously so as not to block any execution.

A well-known pattern for asynchronous message services is the message queue. When a relevant event is triggered, a message is stored in the queue until it is processed by the consumer and deleted. The class that implements this kind of service in Bitcoin Core is CScheduler, and the method that keeps the queue running is void CScheduler::serviceQueue(). The queue service is started as soon as the application is initialized, in AppInitMain(…​). This service will be described in more detail later.

bool AppInitMain(...)
{
    // Start the lightweight task scheduler thread
    threadGroup.create_thread([&] { TraceThread("scheduler", [&] { node.scheduler->serviceQueue(); }); });
}
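Besides serviceQueue(), CScheduler also exposes methods to enqueue work, such as scheduleFromNow(…​) and scheduleEvery(…​). The snippet below is a minimal, hypothetical usage sketch: the task and the interval are made up for illustration, and the exact signatures should be checked in src/scheduler.h.

// Hypothetical example: schedule a periodic task on the scheduler thread.
node.scheduler->scheduleEvery([] {
    LogPrintf("running a periodic maintenance task\n"); // illustrative task only
}, std::chrono::minutes(1));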

In Bitcoin Core, there are two main classes that implement the notifications between components: CValidationInterface, which works as the notification receiver (also known as the subscriber), and CMainSignals, which works as the only notification sender (also known as the publisher). When some event needs to be published, the message is sent by static CMainSignals g_signals to all the subscribers.

CValidationInterface is the interface that any class interested in listening to the events should implement. The events are: UpdatedBlockTip, TransactionAddedToMempool, TransactionRemovedFromMempool, BlockConnected, BlockDisconnected, ChainStateFlushed, BlockChecked and NewPoWValidBlock.

class CValidationInterface {
protected:
    ~CValidationInterface() = default;
    virtual void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) {}

    virtual void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {}

    virtual void TransactionRemovedFromMempool(const CTransactionRef& tx, MemPoolRemovalReason reason, uint64_t mempool_sequence) {}

    virtual void BlockConnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex *pindex) {}

    virtual void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) {}

    virtual void ChainStateFlushed(const CBlockLocator &locator) {}

    virtual void BlockChecked(const CBlock&, const BlockValidationState&) {}

    virtual void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& block) {};
    friend class CMainSignals;
};

All of these methods represent events and, although they are declared virtual, they have an empty default implementation ({}), so subclasses only need to implement the methods/events that matter to them.

The code below shows src/net_processing.h:PeerManager implementing CValidationInterface. Note that the class does not implement TransactionAddedToMempool(…​), TransactionRemovedFromMempool(…​) or ChainStateFlushed(…​), which means it has no interest in these events.

class PeerManager final : public CValidationInterface, public NetEventsInterface {
    /**
     * Overridden from CValidationInterface.
     */
    void BlockConnected(const std::shared_ptr<const CBlock>& pblock, const CBlockIndex* pindexConnected) override;
    void BlockDisconnected(const std::shared_ptr<const CBlock> &block, const CBlockIndex* pindex) override;
    /**
     * Overridden from CValidationInterface.
     */
    void UpdatedBlockTip(const CBlockIndex *pindexNew, const CBlockIndex *pindexFork, bool fInitialDownload) override;
    /**
     * Overridden from CValidationInterface.
     */
    void BlockChecked(const CBlock& block, const BlockValidationState& state) override;
    /**
     * Overridden from CValidationInterface.
     */
    void NewPoWValidBlock(const CBlockIndex *pindex, const std::shared_ptr<const CBlock>& pblock) override;
    // ..
}

But it is not enough to just implement those methods. To listen to the events, the object must be registered as a subscriber of CMainSignals, the only publisher, so that it can receive the notifications. This is done by registering the CValidationInterface object through the RegisterSharedValidationInterface(…​) or RegisterValidationInterface(…​) functions.

bool AppInitMain(...)
{
    // ...
    node.peerman.reset(new PeerManager(chainparams, *node.connman, node.banman.get(), *node.scheduler, chainman, *node.mempool));
    RegisterValidationInterface(node.peerman.get());
    // ...
#if ENABLE_ZMQ
    g_zmq_notification_interface = CZMQNotificationInterface::Create();

    if (g_zmq_notification_interface) {
        RegisterValidationInterface(g_zmq_notification_interface);
    }
#endif
    //...
}
static RPCHelpMan submitblock()
{
    // ...
    auto sc = std::make_shared<submitblock_StateCatcher>(block.GetHash());
    RegisterSharedValidationInterface(sc);
    bool accepted = EnsureChainman(request.context).ProcessNewBlock(Params(), blockptr, /* fForceProcessing */ true, /* fNewBlock */ &new_block);
    UnregisterSharedValidationInterface(sc);
    // ...
}

Calling either of the two methods has the same effect. RegisterValidationInterface(…​) receives a raw pointer as a parameter, converts it into a shared pointer with a no-op deleter (the object's lifetime remains managed by the caller) and forwards it to RegisterSharedValidationInterface(…​). Note that the src/node/interfaces.cpp:NotificationsProxy and submitblock_StateCatcher classes, which call RegisterSharedValidationInterface(…​) directly, use std::make_shared to wrap the argument in a std::shared_ptr. The others call RegisterValidationInterface(…​).
Using shared pointers instead of raw pointers ensures the subscriber is only deleted after the last reference to it is released. More details can be found in PR #18338.

void RegisterSharedValidationInterface(std::shared_ptr<CValidationInterface> callbacks)
{
    // Each connection captures the shared_ptr to ensure that each callback is
    // executed before the subscriber is destroyed. For more details see #18338.
    g_signals.m_internals->Register(std::move(callbacks));
}

void RegisterValidationInterface(CValidationInterface* callbacks)
{
    // Create a shared_ptr with a no-op deleter - CValidationInterface lifecycle
    // is managed by the caller.
    RegisterSharedValidationInterface({callbacks, [](CValidationInterface*){}});
}

To register a new subscriber, RegisterSharedValidationInterface(…​) calls g_signals.m_internals→Register(…​).
g_signals is a static CMainSignals that, as mentioned before, is the only publisher, and m_internals is a MainSignalsInstance struct.

This struct has two important properties: std::list<ListEntry> m_list and SingleThreadedSchedulerClient m_schedulerClient. The first one is the list that stores the references for all the subscribers (objects that implement CValidationInterface interface), and the second one queues the messages to be sent and executes them serially.

struct MainSignalsInstance {
private:
    struct ListEntry { std::shared_ptr<CValidationInterface> callbacks; int count = 1; };
    std::list<ListEntry> m_list GUARDED_BY(m_mutex);
    // ...
public:
    SingleThreadedSchedulerClient m_schedulerClient;

    void Register(std::shared_ptr<CValidationInterface> callbacks)
    {
        // Register a new CValidationInterface subscriber
    }

    // ...
}

CMainSignals is the class that broadcasts the notifications to all the subscribers. Note that some methods of this class have the same names as those in the CValidationInterface class. This makes it easy to identify which event is triggered, since both the publisher and the subscriber use the same method name. Note also that CMainSignals does not implement CValidationInterface; using the same method names is just a design decision.

// src/validationinterface.h
class CMainSignals {
private:
    std::unique_ptr<MainSignalsInstance> m_internals;

    // ...

public:

    // ...

    void UpdatedBlockTip(const CBlockIndex *, const CBlockIndex *, bool fInitialDownload);
    void TransactionAddedToMempool(const CTransactionRef&, uint64_t mempool_sequence);
    void TransactionRemovedFromMempool(const CTransactionRef&, MemPoolRemovalReason, uint64_t mempool_sequence);
    void BlockConnected(const std::shared_ptr<const CBlock> &, const CBlockIndex *pindex);
    void BlockDisconnected(const std::shared_ptr<const CBlock> &, const CBlockIndex* pindex);
    void ChainStateFlushed(const CBlockLocator &);
    void BlockChecked(const CBlock&, const BlockValidationState&);
    void NewPoWValidBlock(const CBlockIndex *, const std::shared_ptr<const CBlock>&);
};

To notify the subscribers, CMainSignals builds a lambda capturing the parameters of the event (in the case of TransactionAddedToMempool, the tx and the mempool_sequence). The lambda body makes MainSignalsInstance m_internals iterate over each registered CValidationInterface (also called callbacks) and execute its CValidationInterface::TransactionAddedToMempool(…​). Instead of running the lambda immediately, it is enqueued in the SingleThreadedSchedulerClient m_schedulerClient to be executed serially.

#define ENQUEUE_AND_LOG_EVENT(event, fmt, name, ...)           \
    do {                                                       \
        auto local_name = (name);                              \
        LOG_EVENT("Enqueuing " fmt, local_name, __VA_ARGS__);  \
        m_internals->m_schedulerClient.AddToProcessQueue([=] { \
            LOG_EVENT(fmt, local_name, __VA_ARGS__);           \
            event();                                           \
        });                                                    \
    } while (0)
// ...
void CMainSignals::TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) {
    auto event = [tx, mempool_sequence, this] {
        m_internals->Iterate([&](CValidationInterface& callbacks) { callbacks.TransactionAddedToMempool(tx, mempool_sequence); });
    };
    ENQUEUE_AND_LOG_EVENT(event, "%s: txid=%s wtxid=%s", __func__,
                          tx->GetHash().ToString(),
                          tx->GetWitnessHash().ToString());
}

And finally, to trigger an event, all that is needed is to call GetMainSignals().[event_name]. The MemPoolAccept::AcceptSingleTransaction function below illustrates this, sending the notification when a new transaction is added to the mempool and passing the transaction and the mempool sequence as parameters.

bool MemPoolAccept::AcceptSingleTransaction(const CTransactionRef& ptx, ATMPArgs& args)
{
    // ...

    GetMainSignals().TransactionAddedToMempool(ptx, m_pool.GetAndIncrementSequence());

    return true;
}
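Putting the pieces together, a hypothetical minimal subscriber could look like the sketch below. The class name and log message are invented for illustration; the registration follows the same pattern as submitblock_StateCatcher above.

// Hypothetical subscriber (illustrative sketch, not Bitcoin Core code):
// logs every transaction accepted into the mempool.
class MempoolLogger final : public CValidationInterface
{
protected:
    void TransactionAddedToMempool(const CTransactionRef& tx, uint64_t mempool_sequence) override
    {
        LogPrintf("tx %s entered the mempool (sequence %d)\n",
                  tx->GetHash().ToString(), mempool_sequence);
    }
};

// e.g. somewhere during initialization:
auto logger = std::make_shared<MempoolLogger>();
RegisterSharedValidationInterface(logger);
// ... later, when notifications are no longer needed:
UnregisterSharedValidationInterface(logger);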

The diagram below shows the notification classes (and some of their fields) presented so far.

Figure 2. Notification Class Diagram