-
Notifications
You must be signed in to change notification settings - Fork 352
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
runtime: add a netty4-http based knative component
- Loading branch information
1 parent
c104443
commit c9aadd1
Showing
14 changed files
with
711 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,108 @@ | ||
<?xml version="1.0" encoding="UTF-8"?> | ||
<!-- | ||
Licensed to the Apache Software Foundation (ASF) under one or more | ||
contributor license agreements. See the NOTICE file distributed with | ||
this work for additional information regarding copyright ownership. | ||
The ASF licenses this file to You under the Apache License, Version 2.0 | ||
(the "License"); you may not use this file except in compliance with | ||
the License. You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
--> | ||
<project xmlns="http://maven.apache.org/POM/4.0.0" | ||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | ||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | ||
<parent> | ||
<groupId>org.apache.camel.k</groupId> | ||
<artifactId>camel-k-runtime-parent</artifactId> | ||
<version>0.0.6-SNAPSHOT</version> | ||
</parent> | ||
<modelVersion>4.0.0</modelVersion> | ||
|
||
<artifactId>camel-knative-http</artifactId> | ||
|
||
<dependencies> | ||
|
||
<!-- ****************************** --> | ||
<!-- --> | ||
<!-- RUNTIME --> | ||
<!-- --> | ||
<!-- ****************************** --> | ||
|
||
<dependency> | ||
<groupId>org.slf4j</groupId> | ||
<artifactId>slf4j-api</artifactId> | ||
<version>${slf4j.version}</version> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-core</artifactId> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-netty4-http</artifactId> | ||
</dependency> | ||
|
||
<!-- ****************************** --> | ||
<!-- --> | ||
<!-- TESTS --> | ||
<!-- --> | ||
<!-- ****************************** --> | ||
|
||
<dependency> | ||
<groupId>org.apache.camel</groupId> | ||
<artifactId>camel-test</artifactId> | ||
<exclusions> | ||
<exclusion> | ||
<groupId>junit</groupId> | ||
<artifactId>junit</artifactId> | ||
</exclusion> | ||
</exclusions> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-api</artifactId> | ||
<version>${junit-jupiter.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.junit.jupiter</groupId> | ||
<artifactId>junit-jupiter-engine</artifactId> | ||
<version>${junit-jupiter.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.assertj</groupId> | ||
<artifactId>assertj-core</artifactId> | ||
<version>${assertj.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
<dependency> | ||
<groupId>org.apache.logging.log4j</groupId> | ||
<artifactId>log4j-core</artifactId> | ||
<version>${log4j2.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
<dependency> | ||
<groupId>org.apache.logging.log4j</groupId> | ||
<artifactId>log4j-slf4j-impl</artifactId> | ||
<version>${log4j2.version}</version> | ||
<scope>test</scope> | ||
</dependency> | ||
|
||
</dependencies> | ||
|
||
</project> |
22 changes: 22 additions & 0 deletions
22
...camel-knative-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttp.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.component.knative.http; | ||
|
||
public final class KnativeHttp { | ||
private KnativeHttp() { | ||
} | ||
} |
263 changes: 263 additions & 0 deletions
263
...tive-http/src/main/java/org/apache/camel/component/knative/http/KnativeHttpComponent.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,263 @@ | ||
/** | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package org.apache.camel.component.knative.http; | ||
|
||
import java.net.URI; | ||
import java.nio.channels.ClosedChannelException; | ||
import java.util.Map; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.CopyOnWriteArraySet; | ||
|
||
import io.netty.channel.ChannelHandler; | ||
import io.netty.channel.ChannelHandlerContext; | ||
import io.netty.channel.SimpleChannelInboundHandler; | ||
import io.netty.handler.codec.http.DefaultHttpResponse; | ||
import io.netty.handler.codec.http.HttpContent; | ||
import io.netty.handler.codec.http.HttpHeaders; | ||
import io.netty.handler.codec.http.HttpRequest; | ||
import io.netty.handler.codec.http.HttpResponse; | ||
import io.netty.util.Attribute; | ||
import io.netty.util.AttributeKey; | ||
import org.apache.camel.Exchange; | ||
import org.apache.camel.component.netty4.http.HttpServerConsumerChannelFactory; | ||
import org.apache.camel.component.netty4.http.NettyHttpComponent; | ||
import org.apache.camel.component.netty4.http.NettyHttpConsumer; | ||
import org.apache.camel.component.netty4.http.handlers.HttpServerChannelHandler; | ||
import org.apache.camel.http.common.CamelServlet; | ||
import org.apache.camel.support.RestConsumerContextPathMatcher; | ||
import org.apache.camel.util.ObjectHelper; | ||
import org.apache.camel.util.ServiceHelper; | ||
import org.apache.camel.util.URISupport; | ||
import org.apache.camel.util.UnsafeUriCharactersEncoder; | ||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED; | ||
import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND; | ||
import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1; | ||
|
||
public class KnativeHttpComponent extends NettyHttpComponent { | ||
private final Map<Integer, HttpServerConsumerChannelFactory> handlers = new ConcurrentHashMap<>(); | ||
|
||
@Override | ||
public synchronized HttpServerConsumerChannelFactory getMultiplexChannelHandler(int port) { | ||
return handlers.computeIfAbsent(port, Handler::new); | ||
} | ||
|
||
@Override | ||
protected void doStop() throws Exception { | ||
super.doStop(); | ||
|
||
ServiceHelper.stopService(handlers.values()); | ||
handlers.clear(); | ||
} | ||
|
||
@ChannelHandler.Sharable | ||
private static class Handler extends SimpleChannelInboundHandler<Object> implements HttpServerConsumerChannelFactory { | ||
private static final Logger LOG = LoggerFactory.getLogger(Handler.class); | ||
private static final AttributeKey<HttpServerChannelHandler> SERVER_HANDLER_KEY = AttributeKey.valueOf("serverHandler"); | ||
|
||
private final Set<HttpServerChannelHandler> consumers; | ||
private final int port; | ||
private final String token; | ||
private final int len; | ||
|
||
public Handler(int port) { | ||
this.consumers = new CopyOnWriteArraySet<>(); | ||
this.port = port; | ||
this.token = ":" + port; | ||
this.len = token.length(); | ||
} | ||
|
||
public void init(int port) { | ||
} | ||
|
||
public void addConsumer(NettyHttpConsumer consumer) { | ||
consumers.add(new HttpServerChannelHandler(consumer)); | ||
} | ||
|
||
public void removeConsumer(NettyHttpConsumer consumer) { | ||
consumers.removeIf(h -> h.getConsumer() == consumer); | ||
} | ||
|
||
public int consumers() { | ||
return consumers.size(); | ||
} | ||
|
||
public int getPort() { | ||
return port; | ||
} | ||
|
||
public ChannelHandler getChannelHandler() { | ||
return this; | ||
} | ||
|
||
@Override | ||
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { | ||
// store request, as this channel handler is created per pipeline | ||
HttpRequest request = (HttpRequest) msg; | ||
|
||
LOG.debug("Message received: {}", request); | ||
|
||
HttpServerChannelHandler handler = getHandler(request, request.method().name()); | ||
if (handler != null) { | ||
Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY); | ||
// store handler as attachment | ||
attr.set(handler); | ||
if (msg instanceof HttpContent) { | ||
// need to hold the reference of content | ||
HttpContent httpContent = (HttpContent) msg; | ||
httpContent.content().retain(); | ||
} | ||
handler.channelRead(ctx, request); | ||
} else { | ||
// okay we cannot process this requires so return either 404 or 405. | ||
// to know if its 405 then we need to check if any other HTTP method would have a consumer for the "same" request | ||
boolean hasAnyMethod = CamelServlet.METHODS.stream().anyMatch(m -> isHttpMethodAllowed(request, m)); | ||
HttpResponse response = null; | ||
if (hasAnyMethod) { | ||
//method match error, return 405 | ||
response = new DefaultHttpResponse(HTTP_1_1, METHOD_NOT_ALLOWED); | ||
} else { | ||
// this resource is not found, return 404 | ||
response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND); | ||
} | ||
response.headers().set(Exchange.CONTENT_TYPE, "text/plain"); | ||
response.headers().set(Exchange.CONTENT_LENGTH, 0); | ||
ctx.writeAndFlush(response); | ||
ctx.close(); | ||
} | ||
} | ||
|
||
@Override | ||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { | ||
Attribute<HttpServerChannelHandler> attr = ctx.channel().attr(SERVER_HANDLER_KEY); | ||
HttpServerChannelHandler handler = attr.get(); | ||
if (handler != null) { | ||
handler.exceptionCaught(ctx, cause); | ||
} else { | ||
if (cause instanceof ClosedChannelException) { | ||
// The channel is closed so we do nothing here | ||
LOG.debug("Channel already closed. Ignoring this exception."); | ||
return; | ||
} else { | ||
// we cannot throw the exception here | ||
LOG.warn("HttpServerChannelHandler is not found as attachment to handle exception, send 404 back to the client.", cause); | ||
// Now we just send 404 back to the client | ||
HttpResponse response = new DefaultHttpResponse(HTTP_1_1, NOT_FOUND); | ||
response.headers().set(Exchange.CONTENT_TYPE, "text/plain"); | ||
response.headers().set(Exchange.CONTENT_LENGTH, 0); | ||
ctx.writeAndFlush(response); | ||
ctx.close(); | ||
} | ||
} | ||
} | ||
|
||
private boolean isHttpMethodAllowed(HttpRequest request, String method) { | ||
return getHandler(request, method) != null; | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
private HttpServerChannelHandler getHandler(HttpRequest request, String method) { | ||
HttpServerChannelHandler answer = null; | ||
|
||
// need to strip out host and port etc, as we only need the context-path for matching | ||
if (method == null) { | ||
return null; | ||
} | ||
|
||
String path = request.uri(); | ||
int idx = path.indexOf(token); | ||
if (idx > -1) { | ||
path = path.substring(idx + len); | ||
} | ||
// use the path as key to find the consumer handler to use | ||
path = pathAsKey(path); | ||
|
||
/* | ||
List<RestConsumerContextPathMatcher.ConsumerPath> paths = new ArrayList<>(); | ||
for (final HttpServerChannelHandler handler : consumers) { | ||
paths.add(new HttpRestConsumerPath(handler)); | ||
} | ||
RestConsumerContextPathMatcher.ConsumerPath<HttpServerChannelHandler> best = RestConsumerContextPathMatcher.matchBestPath(method, path, paths); | ||
if (best != null) { | ||
answer = best.getConsumer(); | ||
} | ||
*/ | ||
|
||
// fallback to regular matching | ||
if (answer == null) { | ||
for (final HttpServerChannelHandler handler : consumers) { | ||
try { | ||
final NettyHttpConsumer consumer = handler.getConsumer(); | ||
final HttpHeaders headers = request.headers(); | ||
final String uri = consumer.getEndpoint().getEndpointUri(); | ||
final Map<String, Object> params = URISupport.parseParameters(URI.create(uri)); | ||
|
||
if (params.containsKey("filter.headerName") && params.containsKey("filter.headerValue")) { | ||
final String filterKey = (String) params.get("filter.headerName"); | ||
final String filterVal = (String) params.get("filter.headerValue"); | ||
final String headerVal = headers.get(filterKey); | ||
|
||
if (ObjectHelper.isEmpty(headerVal)) { | ||
continue; | ||
} | ||
if (!ObjectHelper.equal(filterVal, headerVal)) { | ||
continue; | ||
} | ||
} | ||
|
||
String consumerPath = consumer.getConfiguration().getPath(); | ||
boolean matchOnUriPrefix = consumer.getEndpoint().getConfiguration().isMatchOnUriPrefix(); | ||
// Just make sure the we get the right consumer path first | ||
if (RestConsumerContextPathMatcher.matchPath(path, consumerPath, matchOnUriPrefix)) { | ||
answer = handler; | ||
break; | ||
} | ||
} catch (Exception e) { | ||
throw ObjectHelper.wrapRuntimeCamelException(e); | ||
} | ||
} | ||
} | ||
|
||
return answer; | ||
} | ||
|
||
private static String pathAsKey(String path) { | ||
// cater for default path | ||
if (path == null || path.equals("/")) { | ||
path = ""; | ||
} | ||
|
||
// strip out query parameters | ||
int idx = path.indexOf('?'); | ||
if (idx > -1) { | ||
path = path.substring(0, idx); | ||
} | ||
|
||
// strip of ending / | ||
if (path.endsWith("/")) { | ||
path = path.substring(0, path.length() - 1); | ||
} | ||
|
||
return UnsafeUriCharactersEncoder.encodeHttpURI(path); | ||
} | ||
|
||
} | ||
} |
Oops, something went wrong.