Skip to content

Commit

Permalink
FAB-6200 Java serialize channels.
Browse files Browse the repository at this point in the history
PS2 Check for shutdown channel.
PS3 Minor cleanup no need for thread local.
PS4 Remove unnecessary added code.
PS5 Remove added commented code.
PS6 Minor testcode change.
PS7 Refactor cleaner to serialize on client itself.
PS8 Refactor samplestore to use above.
PS9 Typo fix.
PS10 ""
PS11 make sure stream closed.
PS12 Make sure client is updated with channel.
PS13 Move checks on both channels.
PS14 sync on channels
PS15 Do query peer channels on both again.

Change-Id: I0e379d5dab626a4fd71cb66298612f5611215b78
Signed-off-by: rickr <cr22rc@gmail.com>
  • Loading branch information
cr22rc committed Sep 19, 2017
1 parent 24390b4 commit 9282be9
Show file tree
Hide file tree
Showing 11 changed files with 326 additions and 83 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ The SDK acts on behave of a particular User which is defined by the embedding ap

Note, the SDK does ***not*** provide a means of persistence
for the application defined channels and user artifacts on the client. This is left for the embedding application to best manage.
Channels may be serialized via Java serialization in the context of a client.
Channels deserialized are not in an initialized state.

The SDK also provides a client for Hyperledger's certificate authority. The SDK is however not dependent on this
particular implementation of a certificate authority. Other Certificate authority's maybe used by implementing the
Expand Down
150 changes: 116 additions & 34 deletions src/main/java/org/hyperledger/fabric/sdk/Channel.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,15 @@

package org.hyperledger.fabric.sdk;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
Expand All @@ -30,7 +39,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -112,7 +120,8 @@
* The class representing a channel with which the client SDK interacts.
* <p>
*/
public class Channel {
public class Channel implements Serializable {
private static final long serialVersionUID = -3266164166893832538L;
private static final Log logger = LogFactory.getLog(Channel.class);
private static final boolean IS_DEBUG_LEVEL = logger.isDebugEnabled();
private static final boolean IS_TRACE_LEVEL = logger.isTraceEnabled();
Expand All @@ -129,7 +138,7 @@ public class Channel {
private final String name;

// The peers on this channel to which the client can connect
private final Collection<Peer> peers = new Vector<>();
final Collection<Peer> peers = new Vector<>();

// Temporary variables to control how long to wait for deploy and invoke to complete before
// emitting events. This will be removed when the SDK is able to receive events from the
Expand All @@ -141,10 +150,26 @@ public class Channel {

// The crypto primitives object
// private CryptoSuite cryptoSuite;
private final Collection<Orderer> orderers = new LinkedList<>();
HFClient client;
private boolean initialized = false;
private boolean shutdown = false;
final Collection<Orderer> orderers = new LinkedList<>();
transient HFClient client;
private transient boolean initialized = false;
private transient boolean shutdown = false;

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {

in.defaultReadObject();
initialized = false;
shutdown = false;
msps = new HashMap<>();
txListeners = new LinkedHashMap<>();
channelEventQue = new ChannelEventQue();
blockListeners = new LinkedHashMap<>();

for (EventHub eventHub : getEventHubs()) {
eventHub.setEventQue(channelEventQue);
}

}

/**
* Get all Event Hubs on this channel.
Expand All @@ -155,9 +180,8 @@ public Collection<EventHub> getEventHubs() {
return Collections.unmodifiableCollection(eventHubs);
}

private final Collection<EventHub> eventHubs = new LinkedList<>();
private ExecutorService executorService;
private Block genesisBlock;
final Collection<EventHub> eventHubs = new LinkedList<>();
private transient Block genesisBlock;
private final boolean systemChannel;

private Channel(String name, HFClient hfClient, Orderer orderer, ChannelConfiguration channelConfiguration, byte[][] signers) throws InvalidArgumentException, TransactionException {
Expand Down Expand Up @@ -461,8 +485,6 @@ private Channel(String name, HFClient client, final boolean systemChannel) throw
}
this.name = name;
this.client = client;
this.executorService = client.getExecutorService();

logger.debug(format("Creating channel: %s, client context %s", isSystemChannel() ? "SYSTEM_CHANNEL" : name, client.getUserContext().getName()));

}
Expand Down Expand Up @@ -731,10 +753,10 @@ public Channel initialize() throws InvalidArgumentException, TransactionExceptio
* @throws InvalidArgumentException
* @throws CryptoException
*/
private void loadCACertificates() throws InvalidArgumentException, CryptoException {
protected void loadCACertificates() throws InvalidArgumentException, CryptoException {
logger.debug(format("Channel %s loadCACertificates", name));

if (msps == null) {
if (msps == null || msps.isEmpty()) {
throw new InvalidArgumentException("Unable to load CA certificates. Channel " + name + " does not have any MSPs.");
}

Expand Down Expand Up @@ -822,7 +844,7 @@ private Block getGenesisBlock(Orderer orderer) throws TransactionException {
return genesisBlock;
}

private Map<String, MSP> msps = new HashMap<>();
private transient Map<String, MSP> msps = new HashMap<>();

boolean isSystemChannel() {
return systemChannel;
Expand Down Expand Up @@ -2585,7 +2607,7 @@ public boolean unRegisterBlockListener(String handle) throws InvalidArgumentExce
* A queue each eventing hub will write events to.
*/

private final ChannelEventQue channelEventQue = new ChannelEventQue();
private transient ChannelEventQue channelEventQue = new ChannelEventQue();

class ChannelEventQue {

Expand Down Expand Up @@ -2659,15 +2681,15 @@ BlockEvent getNextEvent() throws EventHubException {
* Runs processing events from event hubs.
*/

Thread eventQueueThread = null;
transient Thread eventQueueThread = null;

private void startEventQue() {

if (eventQueueThread != null) {
return;
}

executorService.execute(() -> {
client.getExecutorService().execute(() -> {
eventQueueThread = Thread.currentThread();

while (!shutdown) {
Expand Down Expand Up @@ -2700,7 +2722,7 @@ private void startEventQue() {

for (BL l : blcopy) {
try {
executorService.execute(() -> l.listener.received(blockEvent));
client.getExecutorService().execute(() -> l.listener.received(blockEvent));
} catch (Throwable e) { //Don't let one register stop rest.
logger.error("Error trying to call block listener on channel " + blockEvent.getChannelId(), e);
}
Expand All @@ -2717,7 +2739,7 @@ private void startEventQue() {

private static final String BLOCK_LISTENER_TAG = "BLOCK_LISTENER_HANDLE";

private final LinkedHashMap<String, BL> blockListeners = new LinkedHashMap<>();
private transient LinkedHashMap<String, BL> blockListeners = new LinkedHashMap<>();

class BL {

Expand Down Expand Up @@ -2791,7 +2813,7 @@ private String registerTransactionListenerProcessor() throws InvalidArgumentExce
});
}

private final LinkedHashMap<String, LinkedList<TL>> txListeners = new LinkedHashMap<>();
private transient LinkedHashMap<String, LinkedList<TL>> txListeners = new LinkedHashMap<>();

private class TL {
final String txID;
Expand Down Expand Up @@ -2841,9 +2863,9 @@ void fire(BlockEvent.TransactionEvent transactionEvent) {
}

if (transactionEvent.isValid()) {
executorService.execute(() -> future.complete(transactionEvent));
client.getExecutorService().execute(() -> future.complete(transactionEvent));
} else {
executorService.execute(() -> future.completeExceptionally(
client.getExecutorService().execute(() -> future.completeExceptionally(
new TransactionEventException(format("Received invalid transaction event. Transaction ID %s status %s",
transactionEvent.getTransactionID(),
transactionEvent.getValidationCode()),
Expand Down Expand Up @@ -2905,7 +2927,7 @@ boolean isMatch(ChaincodeEvent chaincodeEvent) {

void fire(BlockEvent blockEvent, ChaincodeEvent ce) {

executorService.execute(() -> chaincodeEventListener.received(handle, blockEvent, ce));
client.getExecutorService().execute(() -> chaincodeEventListener.received(handle, blockEvent, ce));

}
}
Expand Down Expand Up @@ -2949,7 +2971,7 @@ public String registerChaincodeEventListener(Pattern chaincodeId, Pattern eventN

}

private String blh = null;
private transient String blh = null;

/**
* Unregister an existing chaincode event listener.
Expand Down Expand Up @@ -3063,16 +3085,22 @@ public synchronized void shutdown(boolean force) {

initialized = false;
shutdown = true;
if (chainCodeListeners != null) {
chainCodeListeners.clear();

executorService = null;
}

chainCodeListeners.clear();
if (blockListeners != null) {
blockListeners.clear();
}

blockListeners.clear();
if (client != null) {
client.removeChannel(this);
}

client.removeChannel(this);
client = null;

for (EventHub eh : getEventHubs()) {
for (EventHub eh : eventHubs) {

try {
eh.shutdown();
Expand All @@ -3098,12 +3126,66 @@ public synchronized void shutdown(boolean force) {

orderers.clear();

if (eventQueueThread != null) {
eventQueueThread.interrupt();
if (null != eventQueueThread) {

if (eventQueueThread != null) {
eventQueueThread.interrupt();
}
eventQueueThread = null;
}
}

/**
* Serialize channel to a file using Java serialization.
* Deserialized channel will NOT be in an initialized state.
*
* @param file file
* @throws IOException
* @throws InvalidArgumentException
*/

public void serializeChannel(File file) throws IOException, InvalidArgumentException {

if (null == file) {
throw new InvalidArgumentException("File parameter may not be null");
}

Files.write(Paths.get(file.getAbsolutePath()), serializeChannel(),
StandardOpenOption.CREATE, StandardOpenOption.TRUNCATE_EXISTING, StandardOpenOption.WRITE);

}

/**
* Serialize channel to a byte array using Java serialization.
* Deserialized channel will NOT be in an initialized state.
*
* @throws InvalidArgumentException
* @throws IOException
*/
public byte[] serializeChannel() throws IOException, InvalidArgumentException {

if (isShutdown()) {
throw new InvalidArgumentException(format("Channel %s has been shutdown.", getName()));
}

ObjectOutputStream out = null;

try {
ByteArrayOutputStream bai = new ByteArrayOutputStream();
out = new ObjectOutputStream(bai);
out.writeObject(this);
out.flush();
return bai.toByteArray();
} finally {
if (null != out) {
try {
out.close();
} catch (IOException e) {
logger.error(e); // best effort.
}
}
}
eventQueueThread = null;

client = null;
}

@Override
Expand Down
29 changes: 15 additions & 14 deletions src/main/java/org/hyperledger/fabric/sdk/EventHub.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.hyperledger.fabric.sdk;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -46,27 +47,29 @@
* Feeds Channel event queues with events
*/

public class EventHub {
public class EventHub implements Serializable {
private static final long serialVersionUID = 2882609588201108148L;
private static final Log logger = LogFactory.getLog(EventHub.class);
private static final Config config = Config.getConfig();
private static final long EVENTHUB_CONNECTION_WAIT_TIME = config.getEventHubConnectionWaitTime();
private final ExecutorService executorService;

private final transient ExecutorService executorService;

private final String url;
private final String name;
private final Properties properties;
private ManagedChannel managedChannel;
private boolean connected = false;
private EventsGrpc.EventsStub events;
private StreamObserver<PeerEvents.SignedEvent> sender;
private transient ManagedChannel managedChannel;
private transient boolean connected = false;
private transient EventsGrpc.EventsStub events;
private transient StreamObserver<PeerEvents.SignedEvent> sender;
/**
* Event queue for all events from eventhubs in the channel
*/
private Channel.ChannelEventQue eventQue;
private long connectedTime = 0L; // 0 := never connected
private boolean shutdown = false;
private transient Channel.ChannelEventQue eventQue;
private transient long connectedTime = 0L; // 0 := never connected
private transient boolean shutdown = false;
private Channel channel;
private TransactionContext transactionContext;
private transient TransactionContext transactionContext;

/**
* Get disconnected time.
Expand Down Expand Up @@ -170,7 +173,7 @@ boolean connect() throws EventHubException {

}

private StreamObserver<PeerEvents.Event> eventStream = null; // Saved here to avoid potential garbage collection
private transient StreamObserver<PeerEvents.Event> eventStream = null; // Saved here to avoid potential garbage collection

synchronized boolean connect(final TransactionContext transactionContext) throws EventHubException {
if (connected) {
Expand Down Expand Up @@ -290,8 +293,6 @@ public void onCompleted() {
logger.error(e);
}



if (!threw.isEmpty()) {
eventStream = null;
connected = false;
Expand Down Expand Up @@ -393,7 +394,7 @@ public interface EventHubDisconnected {
* Default reconnect event hub implementation. Applications are free to replace
*/

protected EventHubDisconnected disconnectedHandler = new EventHub.EventHubDisconnected() {
protected transient EventHubDisconnected disconnectedHandler = new EventHub.EventHubDisconnected() {
@Override
public synchronized void disconnected(final EventHub eventHub) throws EventHubException {
logger.info(format("Detected disconnect %s", eventHub.toString()));
Expand Down
Loading

0 comments on commit 9282be9

Please sign in to comment.