Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ensureCustomSerialization to ensure that headers are serialized correctly with multiple transport hops #4741

Merged
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,12 @@ public class SecurityInterceptorTests {
private Connection connection3;
private DiscoveryNode otherRemoteNode;
private Connection connection4;
private DiscoveryNode remoteNodeWithCustomSerialization;
private Connection connection5;

private AsyncSender sender;
private AsyncSender serializedSender;
private AsyncSender jdkSerializedSender;
private AsyncSender customSerializedSender;
private AtomicReference<CountDownLatch> senderLatch = new AtomicReference<>(new CountDownLatch(1));

@Before
Expand Down Expand Up @@ -199,7 +202,14 @@ public void setup() {
otherRemoteNode = new DiscoveryNode("remote-node2", new TransportAddress(remoteAddress, 9876), remoteNodeVersion);
connection4 = transportService.getConnection(otherRemoteNode);

serializedSender = new AsyncSender() {
remoteNodeWithCustomSerialization = new DiscoveryNode(
peternied marked this conversation as resolved.
Show resolved Hide resolved
"remote-node-with-custom-serialization",
new TransportAddress(localAddress, 7456),
Version.V_2_12_0
);
connection5 = transportService.getConnection(remoteNodeWithCustomSerialization);

jdkSerializedSender = new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Connection connection,
Expand All @@ -209,11 +219,27 @@ public <T extends TransportResponse> void sendRequest(
TransportResponseHandler<T> handler
) {
String serializedUserHeader = threadPool.getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
System.out.println("serializedUserHeader: " + serializedUserHeader);
assertThat(serializedUserHeader, is(Base64Helper.serializeObject(user, true)));
senderLatch.get().countDown();
}
};

customSerializedSender = new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler<T> handler
) {
String serializedUserHeader = threadPool.getThreadContext().getHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER);
assertThat(serializedUserHeader, is(Base64Helper.serializeObject(user, false)));
senderLatch.get().countDown();
}
};

sender = new AsyncSender() {
@Override
public <T extends TransportResponse> void sendRequest(
Expand Down Expand Up @@ -265,6 +291,27 @@ final void completableRequestDecorate(
senderLatch.set(new CountDownLatch(1));
}

@SuppressWarnings({ "rawtypes", "unchecked" })
final void completableRequestDecorateWithPreviouslyPopulatedHeaders(
AsyncSender sender,
Connection connection,
String action,
TransportRequest request,
TransportRequestOptions options,
TransportResponseHandler handler,
DiscoveryNode localNode
) {
securityInterceptor.sendRequestDecorate(sender, connection, action, request, options, handler, localNode);
try {
senderLatch.get().await(1, TimeUnit.SECONDS);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}

// Reset the latch so another request can be processed
senderLatch.set(new CountDownLatch(1));
}

@Test
public void testSendRequestDecorateLocalConnection() {

Expand All @@ -278,16 +325,30 @@ public void testSendRequestDecorateLocalConnection() {
public void testSendRequestDecorateRemoteConnection() {

// this is a remote request
completableRequestDecorate(serializedSender, connection3, action, request, options, handler, localNode);
completableRequestDecorate(jdkSerializedSender, connection3, action, request, options, handler, localNode);
// this is a remote request where the transport address is different
completableRequestDecorate(serializedSender, connection4, action, request, options, handler, localNode);
completableRequestDecorate(jdkSerializedSender, connection4, action, request, options, handler, localNode);
}

@Test
public void testSendRequestDecorateRemoteConnectionUsesCustomSerialization() {
threadPool.getThreadContext().putHeader(ConfigConstants.OPENDISTRO_SECURITY_USER_HEADER, Base64Helper.serializeObject(user));
completableRequestDecorateWithPreviouslyPopulatedHeaders(
customSerializedSender,
connection5,
action,
request,
options,
handler,
localNode
);
}

@Test
public void testSendNoOriginNodeCausesSerialization() {

// this is a request where the local node is null; have to use the remote connection since the serialization will fail
completableRequestDecorate(serializedSender, connection3, action, request, options, handler, null);
completableRequestDecorate(jdkSerializedSender, connection3, action, request, options, handler, null);
}

@Test
Expand All @@ -296,7 +357,7 @@ public void testSendNoConnectionShouldThrowNPE() {
// The completable version swallows the NPE so have to call actual method
assertThrows(
java.lang.NullPointerException.class,
() -> securityInterceptor.sendRequestDecorate(serializedSender, null, action, request, options, handler, localNode)
() -> securityInterceptor.sendRequestDecorate(jdkSerializedSender, null, action, request, options, handler, localNode)
);
}

Expand Down Expand Up @@ -328,7 +389,7 @@ public void testCustomRemoteAddressCausesSerialization() {
ConfigConstants.OPENDISTRO_SECURITY_REMOTE_ADDRESS,
String.valueOf(new TransportAddress(new InetSocketAddress("8.8.8.8", 80)))
);
completableRequestDecorate(serializedSender, connection3, action, request, options, handler, localNode);
completableRequestDecorate(jdkSerializedSender, connection3, action, request, options, handler, localNode);
}

@Test
Expand All @@ -351,7 +412,7 @@ public void testFakeHeaderIsIgnored() {
// this is a local request
completableRequestDecorate(sender, connection1, action, request, options, handler, localNode);
// this is a remote request
completableRequestDecorate(serializedSender, connection3, action, request, options, handler, localNode);
completableRequestDecorate(jdkSerializedSender, connection3, action, request, options, handler, localNode);
}

@Test
Expand All @@ -363,7 +424,7 @@ public void testNullHeaderIsIgnored() {
// this is a local request
completableRequestDecorate(sender, connection1, action, request, options, handler, localNode);
// this is a remote request
completableRequestDecorate(serializedSender, connection3, action, request, options, handler, localNode);
completableRequestDecorate(jdkSerializedSender, connection3, action, request, options, handler, localNode);
}

@Test
Expand Down
Loading