Skip to content
This repository has been archived by the owner on Aug 23, 2020. It is now read-only.

Commit

Permalink
Feature: Sync check added to broadcast stage (#1856)
Browse files Browse the repository at this point in the history
* Added inSyncService for use in BroadcastStage

* Changed to atomicboolean
  • Loading branch information
Brord van Wierst authored May 12, 2020
1 parent 4f8e07d commit 6828cc4
Show file tree
Hide file tree
Showing 12 changed files with 299 additions and 153 deletions.
28 changes: 20 additions & 8 deletions src/main/java/com/iota/iri/MainInjectionConfiguration.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,14 @@
import com.iota.iri.service.API;
import com.iota.iri.service.ledger.LedgerService;
import com.iota.iri.service.ledger.impl.LedgerServiceImpl;
import com.iota.iri.service.milestone.*;
import com.iota.iri.service.milestone.impl.*;
import com.iota.iri.service.milestone.InSyncService;
import com.iota.iri.service.milestone.MilestoneService;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.milestone.SeenMilestonesRetriever;
import com.iota.iri.service.milestone.impl.MilestoneInSyncService;
import com.iota.iri.service.milestone.impl.MilestoneServiceImpl;
import com.iota.iri.service.milestone.impl.MilestoneSolidifierImpl;
import com.iota.iri.service.milestone.impl.SeenMilestonesRetrieverImpl;
import com.iota.iri.service.snapshot.LocalSnapshotManager;
import com.iota.iri.service.snapshot.SnapshotProvider;
import com.iota.iri.service.snapshot.SnapshotService;
Expand All @@ -32,14 +38,14 @@
import com.iota.iri.storage.Tangle;
import com.iota.iri.storage.rocksDB.RocksDBPersistenceProvider;

import javax.annotation.Nullable;

import java.security.SecureRandom;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;

import java.security.SecureRandom;

import javax.annotation.Nullable;

/**
* Guice module. Configuration class for dependency injection.
*/
Expand Down Expand Up @@ -114,9 +120,15 @@ TransactionPruner provideTransactionPruner(Tangle tangle, SnapshotProvider snaps

@Singleton
@Provides
LocalSnapshotManager provideLocalSnapshotManager(SnapshotProvider snapshotProvider, SnapshotService snapshotService, @Nullable TransactionPruner transactionPruner) {
InSyncService provideInSyncService(SnapshotProvider snapshotProvider, MilestoneSolidifier milestoneSolidifier) {
return new MilestoneInSyncService(snapshotProvider, milestoneSolidifier);
}

@Singleton
@Provides
LocalSnapshotManager provideLocalSnapshotManager(SnapshotProvider snapshotProvider, SnapshotService snapshotService, @Nullable TransactionPruner transactionPruner, InSyncService inSyncService) {
return configuration.getLocalSnapshotsEnabled()
? new LocalSnapshotManagerImpl(snapshotProvider, snapshotService, transactionPruner, configuration)
? new LocalSnapshotManagerImpl(snapshotProvider, snapshotService, transactionPruner, configuration, inSyncService)
: null;
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
package com.iota.iri.network;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;
import com.iota.iri.service.validation.TransactionSolidifier;
import com.iota.iri.service.validation.TransactionValidator;
import com.iota.iri.conf.IotaConfig;
import com.iota.iri.controllers.TipsViewModel;
import com.iota.iri.network.impl.TipsRequesterImpl;
import com.iota.iri.network.pipeline.TransactionProcessingPipeline;
import com.iota.iri.network.pipeline.TransactionProcessingPipelineImpl;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.milestone.InSyncService;
import com.iota.iri.service.milestone.MilestoneService;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.snapshot.SnapshotProvider;
import com.iota.iri.service.validation.TransactionSolidifier;
import com.iota.iri.service.validation.TransactionValidator;
import com.iota.iri.storage.Tangle;

import com.google.inject.AbstractModule;
import com.google.inject.Provides;
import com.google.inject.Singleton;

/**
* Guice module for network package. Configuration class for dependency injection.
*/
Expand Down Expand Up @@ -49,10 +51,10 @@ TransactionProcessingPipeline provideTransactionProcessingPipeline(NeighborRoute
TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider,
TipsViewModel tipsViewModel, TransactionRequester transactionRequester,
TransactionSolidifier transactionSolidifier, MilestoneService milestoneService,
MilestoneSolidifier milestoneSolidifier) {
MilestoneSolidifier milestoneSolidifier, InSyncService inSyncService) {
return new TransactionProcessingPipelineImpl(neighborRouter, configuration, txValidator, tangle,
snapshotProvider, tipsViewModel, milestoneSolidifier, transactionRequester, transactionSolidifier,
milestoneService);
milestoneService, inSyncService);
}

@Singleton
Expand Down
16 changes: 13 additions & 3 deletions src/main/java/com/iota/iri/network/pipeline/BroadcastStage.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
import com.iota.iri.controllers.TransactionViewModel;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.service.milestone.InSyncService;
import com.iota.iri.service.validation.TransactionSolidifier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -21,14 +23,20 @@ public class BroadcastStage implements Stage {

private TransactionSolidifier transactionSolidifier;

/**
* Service used to determine if we send back tx to the original neighbour
*/
private InSyncService inSyncService;

/**
* Creates a new {@link BroadcastStage}.
*
* @param neighborRouter The {@link NeighborRouter} instance to use to broadcast
*/
public BroadcastStage(NeighborRouter neighborRouter, TransactionSolidifier transactionSolidifier) {
public BroadcastStage(NeighborRouter neighborRouter, TransactionSolidifier transactionSolidifier, InSyncService inSyncService) {
this.neighborRouter = neighborRouter;
this.transactionSolidifier = transactionSolidifier;
this.inSyncService = inSyncService;
}

/**
Expand All @@ -47,8 +55,10 @@ public ProcessingContext process(ProcessingContext ctx) {
// racy
Map<String, Neighbor> currentlyConnectedNeighbors = neighborRouter.getConnectedNeighbors();
for (Neighbor neighbor : currentlyConnectedNeighbors.values()) {
// don't send back to origin neighbor
if (neighbor.equals(originNeighbor)) {

// don't send back to origin neighbor, unless we are not in sync yet
// Required after PR: #1745 which removes ping pong behaviour
if (neighbor.equals(originNeighbor) && inSyncService.isInSync()) {
continue;
}
try {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package com.iota.iri.network.pipeline;

import com.iota.iri.service.milestone.MilestoneService;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.validation.TransactionSolidifier;
import com.iota.iri.service.validation.TransactionValidator;
import com.iota.iri.conf.NodeConfig;
import com.iota.iri.controllers.TipsViewModel;
import com.iota.iri.crypto.batched.BatchedHasher;
Expand All @@ -13,23 +9,28 @@
import com.iota.iri.model.persistables.Transaction;
import com.iota.iri.network.FIFOCache;
import com.iota.iri.network.NeighborRouter;
import com.iota.iri.network.TransactionRequester;
import com.iota.iri.network.TransactionCacheDigester;
import com.iota.iri.network.TransactionRequester;
import com.iota.iri.network.neighbor.Neighbor;
import com.iota.iri.service.milestone.InSyncService;
import com.iota.iri.service.milestone.MilestoneService;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.snapshot.SnapshotProvider;
import com.iota.iri.service.validation.TransactionSolidifier;
import com.iota.iri.service.validation.TransactionValidator;
import com.iota.iri.storage.Tangle;
import com.iota.iri.utils.Converter;
import com.iota.iri.utils.IotaUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.ByteBuffer;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.iota.iri.utils.IotaUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.LinkedBlockingQueue;

/**
* The {@link TransactionProcessingPipelineImpl} processes transactions which either came from {@link Neighbor} instances or
Expand Down Expand Up @@ -91,17 +92,18 @@ public class TransactionProcessingPipelineImpl implements TransactionProcessingP
* @param tangle The {@link Tangle} database to use to store and load transactions.
* @param snapshotProvider The {@link SnapshotProvider} to use to store transactions with.
* @param tipsViewModel The {@link TipsViewModel} to load tips from in the reply stage
* @param inSyncService The {@link InSyncService} to check if we are in sync
*/
public TransactionProcessingPipelineImpl(NeighborRouter neighborRouter, NodeConfig config,
TransactionValidator txValidator, Tangle tangle, SnapshotProvider snapshotProvider,
TipsViewModel tipsViewModel, MilestoneSolidifier milestoneSolidifier,
TransactionRequester transactionRequester, TransactionSolidifier txSolidifier,
MilestoneService milestoneService) {
MilestoneService milestoneService, InSyncService inSyncService) {
FIFOCache<Long, Hash> recentlySeenBytesCache = new FIFOCache<>(config.getCacheSizeBytes());
this.preProcessStage = new PreProcessStage(recentlySeenBytesCache);
this.replyStage = new ReplyStage(neighborRouter, config, tangle, tipsViewModel, milestoneSolidifier,
snapshotProvider, recentlySeenBytesCache);
this.broadcastStage = new BroadcastStage(neighborRouter, txSolidifier);
this.broadcastStage = new BroadcastStage(neighborRouter, txSolidifier, inSyncService);
this.validationStage = new ValidationStage(txValidator, recentlySeenBytesCache);
this.receivedStage = new ReceivedStage(tangle, txSolidifier, snapshotProvider, transactionRequester,
milestoneService, config.getCoordinator());
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/com/iota/iri/service/milestone/InSyncService.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.iota.iri.service.milestone;

/**
*
* Service for checking our node status
*
*/
public interface InSyncService {

/**
* Verifies if this node is currently considered in sync
*
* @return <code>true</code> if we are in sync, otherwise <code>false</code>
*/
boolean isInSync();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package com.iota.iri.service.milestone.impl;

import com.iota.iri.service.milestone.InSyncService;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.snapshot.SnapshotProvider;

import com.google.common.annotations.VisibleForTesting;

import java.util.concurrent.atomic.AtomicBoolean;

/**
*
* A node is defined in sync when the latest snapshot milestone index and the
* latest milestone index are equal. In order to prevent a bounce between in and
* out of sync, a buffer is added when a node became in sync.
*
* This will always return false if we are not done scanning milestone
* candidates during initialization.
*
*/
public class MilestoneInSyncService implements InSyncService {

/**
* To prevent jumping back and forth in and out of sync, there is a buffer in between.
* Only when the latest milestone and latest snapshot differ more than this number, we fall out of sync
*/
@VisibleForTesting
static final int LOCAL_SNAPSHOT_SYNC_BUFFER = 5;

/**
* If this node is currently seen as in sync
*/
private AtomicBoolean isInSync;

/**
* Data provider for the latest solid index
*/
private SnapshotProvider snapshotProvider;

/**
* Data provider for the latest index
*/
private MilestoneSolidifier milestoneSolidifier;

/**
* @param snapshotProvider data provider for the snapshots that are relevant for the node
*/
public MilestoneInSyncService(SnapshotProvider snapshotProvider, MilestoneSolidifier milestoneSolidifier) {
this.snapshotProvider = snapshotProvider;
this.milestoneSolidifier = milestoneSolidifier;
this.isInSync = new AtomicBoolean(false);
}

@Override
public boolean isInSync() {
if (!milestoneSolidifier.isInitialScanComplete()) {
return false;
}

int latestIndex = milestoneSolidifier.getLatestMilestoneIndex();
int latestSnapshot = snapshotProvider.getLatestSnapshot().getIndex();

// If we are out of sync, only a full sync will get us in
if (!isInSync.get() && latestIndex == latestSnapshot) {
isInSync.set(true);

// When we are in sync, only dropping below the buffer gets us out of sync
} else if (latestSnapshot < latestIndex - LOCAL_SNAPSHOT_SYNC_BUFFER) {
isInSync.set(false);
}

return isInSync.get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@

import com.iota.iri.conf.BaseIotaConfig;
import com.iota.iri.conf.SnapshotConfig;
import com.iota.iri.service.milestone.InSyncService;
import com.iota.iri.service.milestone.MilestoneSolidifier;
import com.iota.iri.service.snapshot.LocalSnapshotManager;
import com.iota.iri.service.snapshot.Snapshot;
import com.iota.iri.service.snapshot.SnapshotCondition;
import com.iota.iri.service.snapshot.SnapshotException;
import com.iota.iri.service.snapshot.SnapshotProvider;
import com.iota.iri.service.snapshot.SnapshotService;
import com.iota.iri.service.snapshot.*;
import com.iota.iri.service.transactionpruning.PruningCondition;
import com.iota.iri.service.transactionpruning.TransactionPruner;
import com.iota.iri.service.transactionpruning.TransactionPruningException;
Expand Down Expand Up @@ -71,11 +67,6 @@ public class LocalSnapshotManagerImpl implements LocalSnapshotManager {
*/
private final SnapshotConfig config;

/**
* If this node is currently seen as in sync
*/
private boolean isInSync;

/**
* Holds a reference to the {@link ThreadIdentifier} for the monitor thread.
*
Expand All @@ -87,19 +78,21 @@ public class LocalSnapshotManagerImpl implements LocalSnapshotManager {
private SnapshotCondition[] snapshotConditions;
private PruningCondition[] pruningConditions;

private InSyncService inSyncService;

/**
* @param snapshotProvider data provider for the snapshots that are relevant for the node
* @param snapshotService service instance of the snapshot package that gives us access to packages' business logic
* @param transactionPruner manager for the pruning jobs that allows us to clean up old transactions
* @param config important snapshot related configuration parameters
*/
public LocalSnapshotManagerImpl(SnapshotProvider snapshotProvider, SnapshotService snapshotService,
TransactionPruner transactionPruner, SnapshotConfig config) {
TransactionPruner transactionPruner, SnapshotConfig config, InSyncService inSyncService) {
this.snapshotProvider = snapshotProvider;
this.snapshotService = snapshotService;
this.transactionPruner = transactionPruner;
this.config = config;
this.isInSync = false;
this.inSyncService = inSyncService;
}

/**
Expand Down Expand Up @@ -143,7 +136,7 @@ void monitorThread(MilestoneSolidifier milestoneSolidifier) {
}

private Snapshot handleSnapshot(MilestoneSolidifier milestoneSolidifier) {
boolean isInSync = isInSync(milestoneSolidifier);
boolean isInSync = inSyncService.isInSync();
int lowestSnapshotIndex = calculateLowestSnapshotIndex(isInSync);
if (canTakeSnapshot(lowestSnapshotIndex, milestoneSolidifier)) {
try {
Expand Down Expand Up @@ -247,38 +240,6 @@ private boolean canPrune(int pruningMilestoneIndex) {
&& !transactionPruner.hasActiveJobFor(MilestonePrunerJob.class);
}

/**
* A node is defined in sync when the latest snapshot milestone index and the
* latest milestone index are equal. In order to prevent a bounce between in and
* out of sync, a buffer is added when a node became in sync.
*
* This will always return false if we are not done scanning milestone
* candidates during initialization.
*
* @param milestoneSolidifier tracker we use to determine milestones
* @return <code>true</code> if we are in sync, otherwise <code>false</code>
*/
@VisibleForTesting
boolean isInSync(MilestoneSolidifier milestoneSolidifier) {
if (!milestoneSolidifier.isInitialScanComplete()) {
return false;
}

int latestIndex = milestoneSolidifier.getLatestMilestoneIndex();
int latestSnapshot = snapshotProvider.getLatestSnapshot().getIndex();

// If we are out of sync, only a full sync will get us in
if (!isInSync && latestIndex == latestSnapshot) {
isInSync = true;

// When we are in sync, only dropping below the buffer gets us out of sync
} else if (latestSnapshot < latestIndex - LOCAL_SNAPSHOT_SYNC_BUFFER) {
isInSync = false;
}

return isInSync;
}

@Override
public void addSnapshotCondition(SnapshotCondition... conditions) {
if (this.snapshotConditions == null) {
Expand Down
Loading

0 comments on commit 6828cc4

Please sign in to comment.