Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade netty-channel-fsm to 1.0.0 #1363

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2019 the Eclipse Milo Authors
* Copyright (c) 2025 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -15,8 +15,8 @@
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;

import com.digitalpetri.strictmachine.Fsm;
import com.digitalpetri.strictmachine.FsmContext;
import com.digitalpetri.fsm.Fsm;
import com.digitalpetri.fsm.FsmContext;
import org.eclipse.milo.opcua.sdk.client.OpcUaSession;
import org.eclipse.milo.opcua.sdk.client.SessionActivityListener;
import org.eclipse.milo.opcua.stack.client.UaStackClient;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2024 the Eclipse Milo Authors
* Copyright (c) 2025 the Eclipse Milo Authors
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
Expand All @@ -16,18 +16,21 @@
import java.security.cert.CertificateEncodingException;
import java.security.cert.X509Certificate;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Stream;

import com.digitalpetri.fsm.Fsm;
import com.digitalpetri.fsm.FsmContext;
import com.digitalpetri.fsm.dsl.ActionContext;
import com.digitalpetri.fsm.dsl.FsmBuilder;
import com.digitalpetri.netty.fsm.ChannelFsm;
import com.digitalpetri.strictmachine.Fsm;
import com.digitalpetri.strictmachine.FsmContext;
import com.digitalpetri.strictmachine.dsl.ActionContext;
import com.digitalpetri.strictmachine.dsl.FsmBuilder;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Streams;
import com.google.common.primitives.Bytes;
Expand Down Expand Up @@ -83,6 +86,8 @@
import org.eclipse.milo.opcua.stack.core.util.Unit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.MDC;
import org.slf4j.MDC.MDCCloseable;

import static java.util.concurrent.CompletableFuture.completedFuture;
import static org.eclipse.milo.opcua.sdk.client.session.SessionFsm.KEY_CLOSE_FUTURE;
Expand All @@ -103,15 +108,24 @@ public class SessionFsmFactory {

private static final Logger LOGGER = LoggerFactory.getLogger(SessionFsm.LOGGER_NAME);

private static final AtomicLong INSTANCE_ID = new AtomicLong();

private static final int MAX_WAIT_SECONDS = 16;

private SessionFsmFactory() {
}

public static SessionFsm newSessionFsm(OpcUaClient client) {
Long instanceId = INSTANCE_ID.incrementAndGet();

Map<String, String> mdc = new HashMap<>();
mdc.put("instance-id", String.valueOf(instanceId));

FsmBuilder<State, Event> builder = new FsmBuilder<>(
SessionFsm.LOGGER_NAME,
mdc,
client.getConfig().getExecutor(),
SessionFsm.LOGGER_NAME
instanceId
);

configureSessionFsm(builder, client);
Expand Down Expand Up @@ -281,12 +295,18 @@ private static void configureCreatingState(FsmBuilder<State, Event> fb, OpcUaCli
//noinspection Duplicates
createSession(ctx, client).whenComplete((csr, ex) -> {
if (csr != null) {
LOGGER.debug("[{}] CreateSession succeeded: {}", ctx.getInstanceId(), csr.getSessionId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("CreateSession succeeded: {}", csr.getSessionId());
}
ctx.fireEvent(new Event.CreateSessionSuccess(csr));
} else {
LOGGER.debug("[{}] CreateSession failed: {}", ctx.getInstanceId(), ex.getMessage(), ex);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("CreateSession failed: {}", ex.getMessage(), ex);
}
ctx.fireEvent(new Event.CreateSessionFailure(ex));
}
});
Expand All @@ -299,12 +319,18 @@ private static void configureCreatingState(FsmBuilder<State, Event> fb, OpcUaCli
//noinspection Duplicates
createSession(ctx, client).whenComplete((csr, ex) -> {
if (csr != null) {
LOGGER.debug("[{}] CreateSession succeeded: {}", ctx.getInstanceId(), csr.getSessionId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("CreateSession succeeded: {}", csr.getSessionId());
}
ctx.fireEvent(new Event.CreateSessionSuccess(csr));
} else {
LOGGER.debug("[{}] CreateSession failed: {}", ctx.getInstanceId(), ex.getMessage(), ex);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("CreateSession failed: {}", ex.getMessage(), ex);
}
ctx.fireEvent(new Event.CreateSessionFailure(ex));
}
});
Expand Down Expand Up @@ -353,12 +379,18 @@ private static void configureActivatingState(FsmBuilder<State, Event> fb, OpcUaC

activateSession(ctx, client, event.response).whenComplete((session, ex) -> {
if (session != null) {
LOGGER.debug("[{}] Session activated: {}", ctx.getInstanceId(), session);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Session activated: {}", session);
}
ctx.fireEvent(new Event.ActivateSessionSuccess(session));
} else {
LOGGER.debug("[{}] ActivateSession failed: {}", ctx.getInstanceId(), ex.getMessage(), ex);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("ActivateSession failed: {}", ex.getMessage(), ex);
}
ctx.fireEvent(new Event.ActivateSessionFailure(ex));
}
});
Expand Down Expand Up @@ -407,14 +439,18 @@ private static void configureTransferringState(FsmBuilder<State, Event> fb, OpcU

transferSubscriptions(ctx, client, event.session).whenComplete((u, ex) -> {
if (u != null) {
LOGGER.debug("[{}] TransferSubscriptions succeeded", ctx.getInstanceId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("TransferSubscriptions succeeded");
}
ctx.fireEvent(new Event.TransferSubscriptionsSuccess(event.session));
} else {
LOGGER.debug(
"[{}] TransferSubscriptions failed: {}",
ctx.getInstanceId(), ex.getMessage(), ex);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("TransferSubscriptions failed: {}", ex.getMessage(), ex);
}
ctx.fireEvent(new Event.TransferSubscriptionsFailure(ex));
}
});
Expand Down Expand Up @@ -465,12 +501,18 @@ private static void configureInitializingState(FsmBuilder<State, Event> fb, OpcU

initialize(ctx, client, session).whenComplete((u, ex) -> {
if (u != null) {
LOGGER.debug("[{}] Initialization succeeded: {}", ctx.getInstanceId(), session);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Initialization succeeded: {}", session);
}
ctx.fireEvent(new Event.InitializeSuccess(session));
} else {
LOGGER.warn("[{}] Initialization failed: {}", ctx.getInstanceId(), session, ex);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.warn("Initialization failed: {}", session, ex);
}
ctx.fireEvent(new Event.InitializeFailure(ex));
}
});
Expand Down Expand Up @@ -633,7 +675,11 @@ public void onStateTransition(
Object value = results[0].getValue().getValue();
if (value instanceof Integer) {
ServerState state = ServerState.from((Integer) value);
LOGGER.debug("[{}] ServerState: {}", ctx.getInstanceId(), state);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("ServerState: {}", state);
}
}
}

Expand All @@ -652,10 +698,14 @@ public void onStateTransition(
long keepAliveFailuresAllowed = client.getConfig().getKeepAliveFailuresAllowed().longValue();

if (keepAliveFailureCount > keepAliveFailuresAllowed) {
LOGGER.warn(
"[{}] Keep Alive failureCount={} exceeds failuresAllowed={}",
ctx.getInstanceId(), keepAliveFailureCount, keepAliveFailuresAllowed
);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.warn(
"Keep Alive failureCount={} exceeds failuresAllowed={}",
keepAliveFailureCount, keepAliveFailuresAllowed
);
}

ctx.fireEvent(new Event.KeepAliveFailure());

Expand All @@ -671,10 +721,11 @@ public void onStateTransition(
}
}
} else {
LOGGER.debug(
"[{}] Keep Alive failureCount={}",
ctx.getInstanceId(), keepAliveFailureCount, ex
);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Keep Alive failureCount={}", keepAliveFailureCount, ex);
}
}
}
});
Expand Down Expand Up @@ -713,9 +764,17 @@ private static void configureClosingState(FsmBuilder<State, Event> fb, OpcUaClie

closeSession(ctx, client, session).whenComplete((u, ex) -> {
if (u != null) {
LOGGER.debug("[{}] Session closed: {}", ctx.getInstanceId(), session);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Session closed: {}", session);
}
} else {
LOGGER.debug("[{}] CloseSession failed: {}", ctx.getInstanceId(), ex.getMessage(), ex);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("CloseSession failed: {}", ex.getMessage(), ex);
}
}

ctx.fireEvent(new Event.CloseSessionSuccess());
Expand Down Expand Up @@ -801,7 +860,11 @@ private static CompletableFuture<Unit> closeSession(

CloseSessionRequest request = new CloseSessionRequest(requestHeader, true);

LOGGER.debug("[{}] Sending CloseSessionRequest...", ctx.getInstanceId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Sending CloseSessionRequest...");
}

stackClient.sendRequest(request).whenCompleteAsync(
(csr, ex2) -> {
Expand Down Expand Up @@ -867,7 +930,11 @@ private static CompletableFuture<CreateSessionResponse> createSession(
client.getConfig().getMaxResponseMessageSize()
);

LOGGER.debug("[{}] Sending CreateSessionRequest...", ctx.getInstanceId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Sending CreateSessionRequest...");
}

return stackClient.sendRequest(request)
.thenApply(CreateSessionResponse.class::cast)
Expand Down Expand Up @@ -955,7 +1022,11 @@ private static CompletableFuture<OpcUaSession> activateSession(
userTokenSignature
);

LOGGER.debug("[{}] Sending ActivateSessionRequest...", ctx.getInstanceId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Sending ActivateSessionRequest...");
}

return stackClient.sendRequest(request)
.thenApply(ActivateSessionResponse.class::cast)
Expand Down Expand Up @@ -1009,17 +1080,23 @@ private static CompletableFuture<Unit> transferSubscriptions(
true
);

LOGGER.debug("[{}] Sending TransferSubscriptionsRequest...", ctx.getInstanceId());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("Sending TransferSubscriptionsRequest...");
}

stackClient.sendRequest(request)
.thenApply(TransferSubscriptionsResponse.class::cast)
.whenComplete((tsr, ex) -> {
if (tsr != null) {
List<TransferResult> results = l(tsr.getResults());

LOGGER.debug(
"[{}] TransferSubscriptions supported: {}",
ctx.getInstanceId(), tsr.getResponseHeader().getServiceResult());
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("TransferSubscriptions supported: {}", tsr.getResponseHeader().getServiceResult());
}

if (LOGGER.isDebugEnabled()) {
try {
Expand All @@ -1037,11 +1114,17 @@ private static CompletableFuture<Unit> transferSubscriptions(
.map(sa -> sa[0]).orElse(s.toString()))
).toArray(String[]::new);

LOGGER.debug(
"[{}] TransferSubscriptions results: {}",
ctx.getInstanceId(), Arrays.toString(ss));
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("TransferSubscriptions results: {}", Arrays.toString(ss));
}
} catch (Throwable t) {
LOGGER.error("[{}] error logging TransferSubscription results", ctx.getInstanceId(), t);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.error("error logging TransferSubscription results", t);
}
}
}

Expand All @@ -1066,7 +1149,11 @@ private static CompletableFuture<Unit> transferSubscriptions(
.map(UaException::getStatusCode)
.orElse(StatusCode.BAD);

LOGGER.debug("[{}] TransferSubscriptions not supported: {}", ctx.getInstanceId(), statusCode);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", ctx.getUserContext().toString())) {

LOGGER.debug("TransferSubscriptions not supported: {}", statusCode);
}

client.getConfig().getExecutor().execute(() -> {
// transferFailed() will remove the subscription, but that is okay
Expand Down Expand Up @@ -1216,7 +1303,11 @@ public void onServiceFault(ServiceFault serviceFault) {
StatusCode serviceResult = serviceFault.getResponseHeader().getServiceResult();

if (SESSION_ERROR.or(SECURE_CHANNEL_ERROR).test(serviceResult)) {
logger.debug("[{}] ServiceFault: {}", fsm.getFromContext(FsmContext::getInstanceId), serviceResult);
try (MDCCloseable ignored =
MDC.putCloseable("instance-id", fsm.getFromContext(ctx -> ctx.getUserContext().toString()))) {

logger.debug("ServiceFault: {}", serviceResult);
}

fsm.fireEvent(new Event.ServiceFault(serviceResult));
}
Expand Down
4 changes: 2 additions & 2 deletions opc-ua-stack/stack-client/pom.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Copyright (c) 2022 the Eclipse Milo Authors
~ Copyright (c) 2025 the Eclipse Milo Authors
~
~ This program and the accompanying materials are made
~ available under the terms of the Eclipse Public License 2.0
Expand Down Expand Up @@ -33,7 +33,7 @@
<dependency>
<groupId>com.digitalpetri.netty</groupId>
<artifactId>netty-channel-fsm</artifactId>
<version>0.9</version>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
Expand Down
Loading
Loading