Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

runtime: add a netty4-http based knative component #234

Merged
merged 1 commit into from
Nov 22, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
108 changes: 108 additions & 0 deletions runtime/camel-knative-http/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--

Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.

-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.camel.k</groupId>
<artifactId>camel-k-runtime-parent</artifactId>
<version>0.0.6-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>camel-knative-http</artifactId>

<dependencies>

<!-- ****************************** -->
<!-- -->
<!-- RUNTIME -->
<!-- -->
<!-- ****************************** -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
</dependency>

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-netty4-http</artifactId>
</dependency>

<!-- ****************************** -->
<!-- -->
<!-- TESTS -->
<!-- -->
<!-- ****************************** -->

<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-test</artifactId>
<exclusions>
<exclusion>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<version>${junit-jupiter.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j2.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j2.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;

public final class KnativeHttp {
private KnativeHttp() {
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.camel.component.knative.http;

import java.net.URI;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;

import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.DefaultHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.util.Attribute;
import io.netty.util.AttributeKey;
import org.apache.camel.Exchange;
import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory;
import org.apache.camel.component.netty4.http.NettyHttpComponent;
import org.apache.camel.component.netty4.http.NettyHttpConsumer;
import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler;
import org.apache.camel.http.common.CamelServlet;
import org.apache.camel.support.RestConsumerContextPathMatcher;
import org.apache.camel.util.ObjectHelper;
import org.apache.camel.util.ServiceHelper;
import org.apache.camel.util.URISupport;
import org.apache.camel.util.UnsafeUriCharactersEncoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;

public class KnativeHttpComponent extends NettyHttpComponent {
private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>();

@Override
public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) {
return handlers.computeIfAbsent(port, Handler::new);
}

@Override
protected void doStop() throws Exception {
super.doStop();

ServiceHelper.stopService(handlers.values());
handlers.clear();
}

@ChannelHandler.Sharable
private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory {
private static final Logger LOG = LoggerFactory.getLogger(Handler.class);
private static final AttributeKey<HttpServerChannelHandler> SERVER_HANDLER_KEY = AttributeKey.valueOf("serverHandler");

private final Set<HttpServerChannelHandler> consumers;
private final int port;
private final String token;
private final int len;

public Handler(int port) {
this.consumers = new CopyOnWriteArraySet<>();
this.port = port;
this.token = ":" + port;
this.len = token.length();
}

public void init(int port) {
}

public void addConsumer(NettyHttpConsumer consumer) {
consumers.add(new HttpServerChannelHandler(consumer));
}

public void removeConsumer(NettyHttpConsumer consumer) {
consumers.removeIf(h -> h.getConsumer() == consumer);
}

public int consumers() {
return consumers.size();
}

public int getPort() {
return port;
}

public ChannelHandler getChannelHandler() {
return this;
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
// store request, as this channel handler is created per pipeline
HttpRequest request = (HttpRequest) msg;

LOG.debug("Message received: {}", request);

HttpServerChannelHandler handler = getHandler(request, request.method().name());
if (handler != null) {
Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY);
// store handler as attachment
attr.set(handler);
if (msg instanceof HttpContent) {
// need to hold the reference of content
HttpContent httpContent = (HttpContent) msg;
httpContent.content().retain();
}
handler.channelRead(ctx, request);
} else {
// okay we cannot process this requires so return either 404 or 405.
// to know if its 405 then we need to check if any other HTTP method would have a consumer for the "same" request
boolean hasAnyMethod = CamelServlet.METHODS.stream().anyMatch(m -> isHttpMethodAllowed(request, m));
HttpResponse response = null;
if (hasAnyMethod) {
//method match error, return 405
response = new DefaultHttpResponse(HTTP_1_1, METHOD_NOT_ALLOWED);
} else {
// this resource is not found, return 404
response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
}
response.headers().set(Exchange.CONTENT_TYPE, "text/plain");
response.headers().set(Exchange.CONTENT_LENGTH, 0);
ctx.writeAndFlush(response);
ctx.close();
}
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY);
HttpServerChannelHandler handler = attr.get();
if (handler != null) {
handler.exceptionCaught(ctx, cause);
} else {
if (cause instanceof ClosedChannelException) {
// The channel is closed so we do nothing here
LOG.debug("Channel already closed. Ignoring this exception.");
return;
} else {
// we cannot throw the exception here
LOG.warn("HttpServerChannelHandler is not found as attachment to handle exception, send 404 back to the client.", cause);
// Now we just send 404 back to the client
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND);
response.headers().set(Exchange.CONTENT_TYPE, "text/plain");
response.headers().set(Exchange.CONTENT_LENGTH, 0);
ctx.writeAndFlush(response);
ctx.close();
}
}
}

private boolean isHttpMethodAllowed(HttpRequest request, String method) {
return getHandler(request, method) != null;
}

@SuppressWarnings("unchecked")
private HttpServerChannelHandler getHandler(HttpRequest request, String method) {
HttpServerChannelHandler answer = null;

// need to strip out host and port etc, as we only need the context-path for matching
if (method == null) {
return null;
}

String path = request.uri();
int idx = path.indexOf(token);
if (idx > -1) {
path = path.substring(idx + len);
}
// use the path as key to find the consumer handler to use
path = pathAsKey(path);

/*
List<RestConsumerContextPathMatcher.ConsumerPath> paths = new ArrayList<>();
for (final HttpServerChannelHandler handler : consumers) {
paths.add(new HttpRestConsumerPath(handler));
}

RestConsumerContextPathMatcher.ConsumerPath<HttpServerChannelHandler> best = RestConsumerContextPathMatcher.matchBestPath(method, path, paths);
if (best != null) {
answer = best.getConsumer();
}
*/

// fallback to regular matching
if (answer == null) {
for (final HttpServerChannelHandler handler : consumers) {
try {
final NettyHttpConsumer consumer = handler.getConsumer();
final HttpHeaders headers = request.headers();
final String uri = consumer.getEndpoint().getEndpointUri();
final Map<String, Object> params = URISupport.parseParameters(URI.create(uri));

if (params.containsKey("filter.headerName") && params.containsKey("filter.headerValue")) {
final String filterKey = (String) params.get("filter.headerName");
final String filterVal = (String) params.get("filter.headerValue");
final String headerVal = headers.get(filterKey);

if (ObjectHelper.isEmpty(headerVal)) {
continue;
}
if (!ObjectHelper.equal(filterVal, headerVal)) {
continue;
}
}

String consumerPath = consumer.getConfiguration().getPath();
boolean matchOnUriPrefix = consumer.getEndpoint().getConfiguration().isMatchOnUriPrefix();
// Just make sure the we get the right consumer path first
if (RestConsumerContextPathMatcher.matchPath(path, consumerPath, matchOnUriPrefix)) {
answer = handler;
break;
}
} catch (Exception e) {
throw ObjectHelper.wrapRuntimeCamelException(e);
}
}
}

return answer;
}

private static String pathAsKey(String path) {
// cater for default path
if (path == null || path.equals("/")) {
path = "";
}

// strip out query parameters
int idx = path.indexOf('?');
if (idx > -1) {
path = path.substring(0, idx);
}

// strip of ending /
if (path.endsWith("/")) {
path = path.substring(0, path.length() - 1);
}

return UnsafeUriCharactersEncoder.encodeHttpURI(path);
}

}
}
Loading