Skip to content

Commit

Permalink
Context local data SPI.
Browse files Browse the repository at this point in the history
  • Loading branch information
vietj committed Dec 21, 2023
1 parent 74a8cc6 commit 1f8c781
Show file tree
Hide file tree
Showing 17 changed files with 302 additions and 69 deletions.
22 changes: 22 additions & 0 deletions src/main/java/io/vertx/core/Context.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.vertx.codegen.annotations.VertxGen;
import io.vertx.core.impl.VertxThread;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.context.ContextKey;

import java.util.Collections;
import java.util.List;
Expand Down Expand Up @@ -244,6 +245,27 @@ default List<String> processArgs() {
*/
boolean removeLocal(Object key);

/**
* Get some local data from the context.
*
* @param key the key of the data
* @param <T> the type of the data
* @return the data
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
<T> T getLocal(ContextKey<T> key);

/**
* Put some local data in the context.
* <p>
* This can be used to share data between different handlers that share a context
*
* @param key the key of the data
* @param value the data
*/
@GenIgnore(GenIgnore.PERMITTED_TYPE)
<T> void putLocal(ContextKey<T> key, T value);

/**
* @return The Vertx instance that created the context
*/
Expand Down
51 changes: 51 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextBase.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.ContextKey;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

/**
* Base class for context.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class ContextBase {

private static final VarHandle LOCALS_UPDATER = MethodHandles.arrayElementVarHandle(Object[].class);

final Object[] locals;

ContextBase(Object[] locals) {
this.locals = locals;
}

public final <T> T getLocal(ContextKey<T> key) {
ContextKeyImpl<T> internalKey = (ContextKeyImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException();
}
Object res = LOCALS_UPDATER.getVolatile(locals, index);
return (T) res;
}

public final <T> void putLocal(ContextKey<T> key, T value) {
ContextKeyImpl<T> internalKey = (ContextKeyImpl<T>) key;
int index = internalKey.index;
if (index >= locals.length) {
throw new IllegalArgumentException();
}
LOCALS_UPDATER.setVolatile(locals, index, value);
}
}
9 changes: 7 additions & 2 deletions src/main/java/io/vertx/core/impl/ContextImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@
import io.vertx.core.impl.logging.Logger;
import io.vertx.core.impl.logging.LoggerFactory;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.context.ContextKey;
import io.vertx.core.spi.metrics.PoolMetrics;
import io.vertx.core.spi.tracing.VertxTracer;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.util.concurrent.*;

/**
Expand All @@ -28,7 +31,7 @@
* @author <a href="http://tfox.org">Tim Fox</a>
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public final class ContextImpl implements ContextInternal {
public final class ContextImpl extends ContextBase implements ContextInternal {

private static final Logger log = LoggerFactory.getLogger(ContextImpl.class);

Expand All @@ -52,6 +55,7 @@ public final class ContextImpl implements ContextInternal {
final TaskQueue orderedTasks;

protected ContextImpl(VertxInternal vertx,
Object[] locals,
ThreadingModel threadingModel,
EventLoop eventLoop,
EventExecutor executor,
Expand All @@ -61,6 +65,7 @@ protected ContextImpl(VertxInternal vertx,
Deployment deployment,
CloseFuture closeFuture,
ClassLoader tccl) {
super(locals);
this.threadingModel = threadingModel;
this.deployment = deployment;
this.config = deployment != null ? deployment.config() : new JsonObject();
Expand Down Expand Up @@ -306,6 +311,6 @@ protected <T> void emit(ContextInternal ctx, T argument, Handler<T> task) {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(this);
return new DuplicatedContext(this, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
}
}
5 changes: 5 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextInternal.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.vertx.core.impl.future.PromiseImpl;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.impl.future.SucceededFuture;
import io.vertx.core.spi.context.ContextKey;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
Expand Down Expand Up @@ -306,17 +307,20 @@ default boolean remove(Object key) {
*/
ConcurrentMap<Object, Object> localContextData();

@Deprecated
@SuppressWarnings("unchecked")
@Override
default <T> T getLocal(Object key) {
return (T) localContextData().get(key);
}

@Deprecated
@Override
default void putLocal(Object key, Object value) {
localContextData().put(key, value);
}

@Deprecated
@Override
default boolean removeLocal(Object key) {
return localContextData().remove(key) != null;
Expand Down Expand Up @@ -445,4 +449,5 @@ default ContextInternal unwrap() {
default boolean isDuplicate() {
return false;
}

}
24 changes: 24 additions & 0 deletions src/main/java/io/vertx/core/impl/ContextKeyImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import io.vertx.core.spi.context.ContextKey;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public class ContextKeyImpl<T> implements ContextKey<T> {

final int index = KeySeq.next();

public ContextKeyImpl() {
}
}
8 changes: 5 additions & 3 deletions src/main/java/io/vertx/core/impl/DuplicatedContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.vertx.core.Handler;
import io.vertx.core.ThreadingModel;
import io.vertx.core.json.JsonObject;
import io.vertx.core.spi.context.ContextKey;
import io.vertx.core.spi.tracing.VertxTracer;

import java.util.concurrent.Callable;
Expand All @@ -34,12 +35,13 @@
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class DuplicatedContext implements ContextInternal {
final class DuplicatedContext extends ContextBase implements ContextInternal {

protected final ContextImpl delegate;
private ConcurrentMap<Object, Object> localData;

DuplicatedContext(ContextImpl delegate) {
DuplicatedContext(ContextImpl delegate, Object[] locals) {
super(locals);
this.delegate = delegate;
}

Expand Down Expand Up @@ -176,7 +178,7 @@ public boolean isWorkerContext() {

@Override
public ContextInternal duplicate() {
return new DuplicatedContext(delegate);
return new DuplicatedContext(delegate, locals.length == 0 ? VertxImpl.EMPTY_CONTEXT_LOCALS : new Object[locals.length]);
}

@Override
Expand Down
36 changes: 36 additions & 0 deletions src/main/java/io/vertx/core/impl/KeySeq.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.impl;

import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
class KeySeq {

private static final AtomicInteger seq = new AtomicInteger();

/**
* Hook for testing purposes
*/
static void reset() {
seq.set((0));
}

static int get() {
return seq.get();
}

static int next() {
return seq.getAndIncrement();
}
}
17 changes: 14 additions & 3 deletions src/main/java/io/vertx/core/impl/VertxImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {

private static final Logger log = LoggerFactory.getLogger(VertxImpl.class);

static final Object[] EMPTY_CONTEXT_LOCALS = new Object[0];
private static final String CLUSTER_MAP_NAME = "__vertx.haInfo";
private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio";
private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50);
Expand Down Expand Up @@ -140,6 +141,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
private final FileResolver fileResolver;
private final Map<ServerID, HttpServerImpl> sharedHttpServers = new HashMap<>();
private final Map<ServerID, NetServerImpl> sharedNetServers = new HashMap<>();
private final int contextLocals;
final WorkerPool workerPool;
final WorkerPool internalWorkerPool;
final WorkerPool virtualThreaWorkerPool;
Expand Down Expand Up @@ -193,6 +195,7 @@ public class VertxImpl implements VertxInternal, MetricsProvider {
ExecutorService internalWorkerExec = executorServiceFactory.createExecutor(internalWorkerThreadFactory, internalBlockingPoolSize, internalBlockingPoolSize);
PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", internalBlockingPoolSize) : null;

contextLocals = KeySeq.get();
closeFuture = new CloseFuture(log);
maxEventLoopExecTime = maxEventLoopExecuteTime;
maxEventLoopExecTimeUnit = maxEventLoopExecuteTimeUnit;
Expand Down Expand Up @@ -540,8 +543,16 @@ public boolean cancelTimer(long id) {
}
}

private Object[] createContextLocals() {
if (contextLocals == 0) {
return EMPTY_CONTEXT_LOCALS;
} else {
return new Object[contextLocals];
}
}

private ContextImpl createEventLoopContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
return new ContextImpl(this, ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, createContextLocals(), ThreadingModel.EVENT_LOOP, eventLoop, new EventLoopExecutor(eventLoop), internalWorkerPool, workerPool != null ? workerPool : this.workerPool, new TaskQueue(), deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand All @@ -562,7 +573,7 @@ public ContextImpl createEventLoopContext() {
private ContextImpl createWorkerContext(EventLoop eventLoop, CloseFuture closeFuture, WorkerPool workerPool, Deployment deployment, ClassLoader tccl) {
TaskQueue orderedTasks = new TaskQueue();
WorkerPool wp = workerPool != null ? workerPool : this.workerPool;
return new ContextImpl(this, ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, createContextLocals(), ThreadingModel.WORKER, eventLoop, new WorkerExecutor(wp, orderedTasks), internalWorkerPool, wp, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand All @@ -585,7 +596,7 @@ private ContextImpl createVirtualThreadContext(EventLoop eventLoop, CloseFuture
throw new IllegalStateException("This Java runtime does not support virtual threads");
}
TaskQueue orderedTasks = new TaskQueue();
return new ContextImpl(this, ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
return new ContextImpl(this, createContextLocals(), ThreadingModel.VIRTUAL_THREAD, eventLoop, new WorkerExecutor(virtualThreaWorkerPool, orderedTasks), internalWorkerPool, virtualThreaWorkerPool, orderedTasks, deployment, closeFuture, disableTCCL ? null : tccl);
}

@Override
Expand Down
34 changes: 34 additions & 0 deletions src/main/java/io/vertx/core/spi/context/ContextKey.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright (c) 2011-2023 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.spi.context;

import io.vertx.core.impl.ContextKeyImpl;

/**
* A context key to address local context data.
*
* @author <a href="mailto:julien@julienviet.com">Julien Viet</a>
*/
public interface ContextKey<T> {

/**
* Register a key.
*
* Keys should be registered prior creating a {@link io.vertx.core.Vertx} instance as a static field, once registered
* a key cannot be unregistered.
*
* @param type the type of context data
* @return the context key
*/
static <T> ContextKey<T> registerKey(Class<T> type) {
return new ContextKeyImpl<>();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ public static ContextInternal create(Vertx vertx) {
VertxImpl impl = (VertxImpl) vertx;
return new ContextImpl(
impl,
new Object[0],
ThreadingModel.WORKER,
impl.getEventLoopGroup().next(),
EXECUTOR,
Expand Down
7 changes: 5 additions & 2 deletions src/test/java/io/vertx/core/ContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import io.netty.channel.EventLoop;
import io.vertx.core.impl.*;
import io.vertx.core.impl.future.PromiseInternal;
import io.vertx.core.spi.context.ContextKey;
import io.vertx.test.core.VertxTestBase;
import org.junit.Assume;
import org.junit.Test;
Expand All @@ -38,6 +39,8 @@
*/
public class ContextTest extends VertxTestBase {

private static final ContextKey<Object> CONTEXT_KEY = ContextKey.registerKey(Object.class);

private ExecutorService workerExecutor;

private ContextInternal createWorkerContext() {
Expand Down Expand Up @@ -444,9 +447,9 @@ private void checkDuplicate(ContextInternal ctx, ContextInternal duplicated) thr
Object shared = new Object();
Object local = new Object();
ctx.put("key", shared);
ctx.putLocal("key", local);
ctx.putLocal(CONTEXT_KEY, local);
assertSame(shared, duplicated.get("key"));
assertNull(duplicated.getLocal("key"));
assertNull(duplicated.getLocal(CONTEXT_KEY));
assertTrue(duplicated.remove("key"));
assertNull(ctx.get("key"));

Expand Down
Loading

0 comments on commit 1f8c781

Please sign in to comment.