Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Log Event Streaming for Plugin API #186

Merged
merged 21 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ public void startNode(final BesuNode node) {
besuPluginContext.addService(
BesuEvents.class,
new BesuEventsImpl(
besuController.getProtocolContext().getBlockchain(),
besuController.getProtocolManager().getBlockBroadcaster(),
besuController.getTransactionPool(),
besuController.getSyncState()));
Expand Down
2 changes: 1 addition & 1 deletion besu/src/main/java/org/hyperledger/besu/RunnerBuilder.java
Original file line number Diff line number Diff line change
Expand Up @@ -646,7 +646,7 @@ private void createLogsSubscriptionService(
final LogsSubscriptionService logsSubscriptionService =
new LogsSubscriptionService(subscriptionManager);

blockchain.observeBlockAdded(logsSubscriptionService);
blockchain.observeLogs(logsSubscriptionService);
}

private void createSyncingSubscriptionService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,7 @@ private BesuCommand startPlugins() {
besuPluginContext.addService(
BesuEvents.class,
new BesuEventsImpl(
besuController.getProtocolContext().getBlockchain(),
besuController.getProtocolManager().getBlockBroadcaster(),
besuController.getTransactionPool(),
besuController.getSyncState()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,25 +14,37 @@
*/
package org.hyperledger.besu.services;

import static java.util.stream.Collectors.toUnmodifiableList;

import org.hyperledger.besu.ethereum.api.query.LogsQuery;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.LogTopic;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.eth.sync.BlockBroadcaster;
import org.hyperledger.besu.ethereum.eth.sync.state.SyncState;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.plugin.data.Address;
import org.hyperledger.besu.plugin.data.BlockHeader;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.Quantity;
import org.hyperledger.besu.plugin.data.UnformattedData;
import org.hyperledger.besu.plugin.services.BesuEvents;

import java.util.List;
import java.util.function.Supplier;

public class BesuEventsImpl implements BesuEvents {
private Blockchain blockchain;
private final BlockBroadcaster blockBroadcaster;
private final TransactionPool transactionPool;
private final SyncState syncState;

public BesuEventsImpl(
final Blockchain blockchain,
final BlockBroadcaster blockBroadcaster,
final TransactionPool transactionPool,
final SyncState syncState) {
this.blockchain = blockchain;
this.blockBroadcaster = blockBroadcaster;
this.transactionPool = transactionPool;
this.syncState = syncState;
Expand Down Expand Up @@ -83,6 +95,36 @@ public void removeSyncStatusListener(final long listenerIdentifier) {
syncState.unsubscribeSyncStatus(listenerIdentifier);
}

@Override
public long addLogListener(
final List<Address> addresses,
final List<List<UnformattedData>> topics,
final LogListener logListener) {
final List<org.hyperledger.besu.ethereum.core.Address> besuAddresses =
addresses.stream()
.map(org.hyperledger.besu.ethereum.core.Address::fromPlugin)
.collect(toUnmodifiableList());
final List<List<LogTopic>> besuTopics =
topics.stream()
.map(
subList -> subList.stream().map(LogTopic::fromPlugin).collect(toUnmodifiableList()))
.collect(toUnmodifiableList());

final LogsQuery logsQuery = new LogsQuery(besuAddresses, besuTopics);

return blockchain.observeLogs(
logWithMetadata -> {
if (logsQuery.matches(LogWithMetadata.fromPlugin(logWithMetadata))) {
logListener.onLogEmitted(logWithMetadata);
}
});
}

@Override
public void removeLogListener(final long listenerIdentifier) {
blockchain.removeObserver(listenerIdentifier);
}

private static PropagatedBlockContext blockPropagatedContext(
final Supplier<BlockHeader> blockHeaderSupplier,
final Supplier<Quantity> totalDifficultySupplier) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,11 @@

import org.hyperledger.besu.crypto.SECP256K1.KeyPair;
import org.hyperledger.besu.ethereum.ProtocolContext;
import org.hyperledger.besu.ethereum.chain.DefaultBlockchain;
import org.hyperledger.besu.ethereum.chain.MutableBlockchain;
import org.hyperledger.besu.ethereum.core.Block;
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockDataGenerator;
import org.hyperledger.besu.ethereum.core.BlockHeaderTestFixture;
import org.hyperledger.besu.ethereum.core.TransactionTestFixture;
import org.hyperledger.besu.ethereum.core.Wei;
Expand All @@ -39,19 +41,25 @@
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPool;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolConfiguration;
import org.hyperledger.besu.ethereum.eth.transactions.TransactionPoolFactory;
import org.hyperledger.besu.ethereum.mainnet.MainnetBlockHeaderFunctions;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSchedule;
import org.hyperledger.besu.ethereum.mainnet.ProtocolSpec;
import org.hyperledger.besu.ethereum.mainnet.TransactionValidator;
import org.hyperledger.besu.ethereum.mainnet.ValidationResult;
import org.hyperledger.besu.ethereum.storage.keyvalue.KeyValueStoragePrefixedKeyBlockchainStorage;
import org.hyperledger.besu.ethereum.worldstate.WorldStateArchive;
import org.hyperledger.besu.metrics.noop.NoOpMetricsSystem;
import org.hyperledger.besu.plugin.data.LogWithMetadata;
import org.hyperledger.besu.plugin.data.PropagatedBlockContext;
import org.hyperledger.besu.plugin.data.SyncStatus;
import org.hyperledger.besu.plugin.data.Transaction;
import org.hyperledger.besu.services.kvstore.InMemoryKeyValueStorage;
import org.hyperledger.besu.testutil.TestClock;
import org.hyperledger.besu.util.uint.UInt256;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
Expand All @@ -76,28 +84,29 @@ public class BesuEventsImplTest {
@Mock private EthContext mockEthContext;
@Mock private EthMessages mockEthMessages;
@Mock private EthScheduler mockEthScheduler;
@Mock private MutableBlockchain mockBlockchain;
@Mock private TransactionValidator mockTransactionValidator;
@Mock private ProtocolSpec<Void> mockProtocolSpec;
@Mock private WorldStateArchive mockWorldStateArchive;
@Mock private WorldState mockWorldState;
private org.hyperledger.besu.ethereum.core.BlockHeader fakeBlockHeader;
private TransactionPool transactionPool;
private BlockBroadcaster blockBroadcaster;
private BesuEventsImpl serviceImpl;
private MutableBlockchain blockchain;
private final BlockDataGenerator gen = new BlockDataGenerator();

@Before
public void setUp() {
fakeBlockHeader =
new org.hyperledger.besu.ethereum.core.BlockHeader(
null, null, null, null, null, null, null, null, 1, 1, 1, 1, null, null, 1, null);

when(mockBlockchain.getBlockHeader(any())).thenReturn(Optional.of(fakeBlockHeader));
blockchain =
DefaultBlockchain.createMutable(
gen.genesisBlock(),
new KeyValueStoragePrefixedKeyBlockchainStorage(
new InMemoryKeyValueStorage(), new MainnetBlockHeaderFunctions()),
new NoOpMetricsSystem());
when(mockEthContext.getEthMessages()).thenReturn(mockEthMessages);
when(mockEthContext.getEthPeers()).thenReturn(mockEthPeers);
when(mockEthContext.getScheduler()).thenReturn(mockEthScheduler);
when(mockEthPeers.streamAvailablePeers()).thenReturn(Stream.empty()).thenReturn(Stream.empty());
when(mockProtocolContext.getBlockchain()).thenReturn(mockBlockchain);
when(mockProtocolContext.getBlockchain()).thenReturn(blockchain);
when(mockProtocolContext.getWorldStateArchive()).thenReturn(mockWorldStateArchive);
when(mockProtocolSchedule.getByBlockNumber(anyLong())).thenReturn(mockProtocolSpec);
when(mockProtocolSpec.getTransactionValidator()).thenReturn(mockTransactionValidator);
Expand All @@ -107,6 +116,7 @@ public void setUp() {
when(mockWorldStateArchive.get(any())).thenReturn(Optional.of(mockWorldState));

blockBroadcaster = new BlockBroadcaster(mockEthContext);
syncState = new SyncState(blockchain, mockEthPeers);
transactionPool =
TransactionPoolFactory.createTransactionPool(
mockProtocolSchedule,
Expand All @@ -117,9 +127,8 @@ public void setUp() {
syncState,
Wei.ZERO,
TransactionPoolConfiguration.builder().txPoolMaxSize(1).build());
syncState = new SyncState(mockBlockchain, mockEthPeers);

serviceImpl = new BesuEventsImpl(blockBroadcaster, transactionPool, syncState);
serviceImpl = new BesuEventsImpl(blockchain, blockBroadcaster, transactionPool, syncState);
}

@Test
Expand Down Expand Up @@ -157,7 +166,10 @@ public void syncStatusEventDoesNotFireAfterUnsubscribe() {
}

private void setSyncTarget() {
syncState.setSyncTarget(mock(EthPeer.class), fakeBlockHeader);
syncState.setSyncTarget(
mock(EthPeer.class),
new org.hyperledger.besu.ethereum.core.BlockHeader(
null, null, null, null, null, null, null, null, 1, 1, 1, 1, null, null, 1, null));
}

private void clearSyncTarget() {
Expand Down Expand Up @@ -270,6 +282,43 @@ public void transactionDroppedEventDoesNotFireAfterUnsubscribe() {
assertThat(result.get()).isNull();
}

@Test
public void logEventFiresAfterSubscribe() {
final List<LogWithMetadata> result = new ArrayList<>();
blockchain.observeLogs(result::add);

assertThat(result).isEmpty();
final var block =
gen.block(
new BlockDataGenerator.BlockOptions()
.setParentHash(blockchain.getGenesisBlock().getHash()));
blockchain.appendBlock(block, gen.receipts(block));
assertThat(result).hasSize(4);
}

@Test
public void logEventDoesNotFireAfterUnsubscribe() {
final List<LogWithMetadata> result = new ArrayList<>();
final long id = blockchain.observeLogs(result::add);

assertThat(result).isEmpty();
final var block =
gen.block(
new BlockDataGenerator.BlockOptions()
.setParentHash(blockchain.getGenesisBlock().getHash()));
blockchain.appendBlock(block, gen.receipts(block));
assertThat(result).hasSize(4);

result.clear();

serviceImpl.removeLogListener(id);
final var block2 =
gen.block(new BlockDataGenerator.BlockOptions().setParentHash(block.getHash()));
blockchain.appendBlock(block2, gen.receipts(block2));

assertThat(result).isEmpty();
}

private Block generateBlock() {
final BlockBody body = new BlockBody(Collections.emptyList(), Collections.emptyList());
return new Block(new BlockHeaderTestFixture().buildHeader(), body);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,11 @@
import org.hyperledger.besu.ethereum.api.jsonrpc.internal.results.LogResult;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.SubscriptionManager;
import org.hyperledger.besu.ethereum.api.jsonrpc.websocket.subscription.request.SubscriptionType;
import org.hyperledger.besu.ethereum.chain.BlockAddedEvent;
import org.hyperledger.besu.ethereum.chain.BlockAddedObserver;
import org.hyperledger.besu.ethereum.chain.Blockchain;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;

import java.util.List;
import java.util.function.Consumer;

public class LogsSubscriptionService implements BlockAddedObserver {
public class LogsSubscriptionService implements Consumer<LogWithMetadata> {

private final SubscriptionManager subscriptionManager;

Expand All @@ -32,22 +30,12 @@ public LogsSubscriptionService(final SubscriptionManager subscriptionManager) {
}

@Override
public void onBlockAdded(final BlockAddedEvent event, final Blockchain __) {
final List<LogsSubscription> logsSubscriptions =
subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class);

event
.getLogsWithMetadata()
public void accept(final LogWithMetadata logWithMetadata) {
subscriptionManager.subscriptionsOfType(SubscriptionType.LOGS, LogsSubscription.class).stream()
.filter(logsSubscription -> logsSubscription.getLogsQuery().matches(logWithMetadata))
.forEach(
logWithMetadata ->
logsSubscriptions.stream()
.filter(
logsSubscription ->
logsSubscription.getLogsQuery().matches(logWithMetadata))
.forEach(
logsSubscription ->
subscriptionManager.sendMessage(
logsSubscription.getSubscriptionId(),
new LogResult(logWithMetadata))));
logsSubscription ->
subscriptionManager.sendMessage(
logsSubscription.getSubscriptionId(), new LogResult(logWithMetadata)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ public class LogsSubscriptionServiceTest {
@Before
public void before() {
logsSubscriptionService = new LogsSubscriptionService(subscriptionManager);
blockchain.observeBlockAdded(logsSubscriptionService);
blockchain.observeLogs(logsSubscriptionService);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,14 @@
import org.hyperledger.besu.ethereum.core.BlockBody;
import org.hyperledger.besu.ethereum.core.BlockHeader;
import org.hyperledger.besu.ethereum.core.Hash;
import org.hyperledger.besu.ethereum.core.LogWithMetadata;
import org.hyperledger.besu.ethereum.core.Transaction;
import org.hyperledger.besu.ethereum.core.TransactionReceipt;
import org.hyperledger.besu.util.uint.UInt256;

import java.util.List;
import java.util.Optional;
import java.util.function.Consumer;

/** An interface for reading data from the blockchain. */
public interface Blockchain {
Expand Down Expand Up @@ -184,6 +186,19 @@ default boolean contains(final Hash blockHash) {
*/
long observeBlockAdded(BlockAddedObserver observer);

/**
* Adds an observer that will get called on for every added and removed log when a new block is
* added.
*
* <p><i>No guarantees are made about the order in which the observers are invoked.</i>
*
* @param logObserver the observer to call
* @return the observer ID that can be used to remove it later.
*/
default long observeLogs(final Consumer<LogWithMetadata> logObserver) {
return observeBlockAdded(((event, __) -> event.getLogsWithMetadata().forEach(logObserver)));
}

/**
* Removes an previously added observer of any type.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,10 @@ public static Address privateContractAddress(
})));
}

public static Address fromPlugin(final org.hyperledger.besu.plugin.data.Address logger) {
Copy link
Contributor

@shemnon shemnon Nov 13, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we do a type check first? There's a strong chance it is a compatible object we just can't staticly prove it.

if (logger instanceof Address) {
  return (Address) logger;
} else {
  return wrap(BytesValue.fromPlugin(logger));
}

return logger instanceof Address ? (Address) logger : wrap(BytesValue.fromPlugin(logger));
}

@Override
public Address copy() {
final BytesValue copiedStorage = wrapped.copy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ public static Hash fromHexStringLenient(final String str) {
return new Hash(Bytes32.fromHexStringLenient(str));
}

public static Hash fromPlugin(final org.hyperledger.besu.plugin.data.Hash blockHash) {
return blockHash instanceof Hash ? (Hash) blockHash : wrap(Bytes32.fromPlugin(blockHash));
}

@Override
public byte[] getByteArray() {
return super.getByteArray();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import org.hyperledger.besu.ethereum.rlp.RLPInput;
import org.hyperledger.besu.ethereum.rlp.RLPOutput;
import org.hyperledger.besu.plugin.data.UnformattedData;
import org.hyperledger.besu.util.bytes.BytesValue;
import org.hyperledger.besu.util.bytes.DelegatingBytesValue;

Expand Down Expand Up @@ -57,6 +58,10 @@ public static LogTopic readFrom(final RLPInput in) {
return new LogTopic(in.readBytesValue());
}

public static LogTopic fromPlugin(final UnformattedData data) {
return data instanceof LogTopic ? (LogTopic) data : wrap(BytesValue.fromPlugin(data));
}

/**
* Writes the log topic to the provided RLP output.
*
Expand Down
Loading