Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CMLK-2214: Fix timestamp-router-action Kamelet #340

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ spec:
password: "{{password}}"
loginUrl: "{{loginUrl}}"
from:
uri: "{{local-salesforce}}:{{topicName}}"
uri: "{{local-salesforce}}:{{operation}}:{{topicName}}"
parameters:
notifyForFields: "{{notifyForFields}}"
updateTopic: "true"
Expand All @@ -134,7 +134,6 @@ spec:
notifyForOperationDelete: "{{notifyForOperationDelete}}"
notifyForOperationUndelete: "{{notifyForOperationUndelete}}"
sObjectQuery: "{{query}}"
operationName: "{{operation}}"
steps:
- marshal:
json: {}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ spec:
from:
uri: kamelet:source
steps:
- set-property:
- setProperty:
name: "topicFormat"
constant: "{{topicFormat}}"
- set-property:
- setProperty:
name: "timestampFormat"
constant: "{{timestampFormat}}"
- set-property:
- setProperty:
name: "timestampHeaderName"
constant: "{{timestampHeaderName}}"
- bean: "org.apache.camel.kamelets.utils.transform.TimestampRouter"
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,18 @@
*/
package org.apache.camel.kamelets.utils.transform;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;

import java.text.SimpleDateFormat;
import java.time.Instant;
import java.util.Date;
import java.util.TimeZone;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangeProperty;
import org.apache.camel.component.kafka.KafkaConstants;
import org.apache.camel.util.ObjectHelper;

public class TimestampRouter {

public void process(@ExchangeProperty("topicFormat") String topicFormat, @ExchangeProperty("timestampFormat") String timestampFormat, @ExchangeProperty("timestampHeaderName") String timestampHeaderName, Exchange ex) {
Expand All @@ -38,15 +38,15 @@ public void process(@ExchangeProperty("topicFormat") String topicFormat, @Exchan
final SimpleDateFormat fmt = new SimpleDateFormat(timestampFormat);
fmt.setTimeZone(TimeZone.getTimeZone("UTC"));

long timestamp;
Long timestamp = null;
String topicName = ex.getMessage().getHeader(KafkaConstants.TOPIC, String.class);
Object rawTimestamp = ex.getMessage().getHeader(timestampHeaderName);
if (rawTimestamp instanceof Long) {
timestamp = (Long) rawTimestamp;
} else if (rawTimestamp instanceof Instant) {
timestamp = ((Instant) rawTimestamp).toEpochMilli();
} else {
timestamp = (Long) rawTimestamp;
} else if (ObjectHelper.isNotEmpty(rawTimestamp)) {
timestamp = Long.parseLong(rawTimestamp.toString());
}
if (ObjectHelper.isNotEmpty(timestamp)) {
final String formattedTimestamp = fmt.format(new Date(timestamp));
Expand Down
43 changes: 43 additions & 0 deletions test/kafka/timestamp-router-pipe.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
apiVersion: camel.apache.org/v1
kind: Pipe
metadata:
name: timestamp-router-pipe
spec:
source:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: webhook-source
properties:
subpath: messages
steps:
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: timestamp-router-action
properties:
topicFormat: $[topic]_$[timestamp]
timestampFormat: YYYY-MM-dd
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: set-body-action
properties:
value: $simple{header[message]}
- ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: log-action
properties:
showHeaders: true
sink:
ref:
kind: Kamelet
apiVersion: camel.apache.org/v1
name: kafka-sink
properties:
bootstrapServers: ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port}
user: ${user}
password: ${password}
topic: dummy
securityProtocol: ${securityProtocol}
37 changes: 37 additions & 0 deletions test/kafka/timestamp-router.feature
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
Feature: Kafka Timestamp Router

Background:
Given variable user is ""
Given variable password is ""
Given variables
| bootstrap.server.host | my-cluster-kafka-bootstrap |
| bootstrap.server.port | 9092 |
| securityProtocol | PLAINTEXT |
| topicName | my-topic |
| timestamp | yaks:unixTimestamp()000 |
| topic | ${topicName}_yaks:currentDate('YYYY-MM-dd') |
| message | Camel K rocks! |
Given Kafka topic: ${topic}
Given Kafka topic partition: 0

Scenario: Create Pipe
Given Camel K resource polling configuration
| maxAttempts | 200 |
| delayBetweenAttempts | 2000 |
When load Pipe timestamp-router-pipe.yaml
Then Camel K integration timestamp-router-pipe should be running
Then Camel K integration timestamp-router-pipe should print Routes startup

Scenario: Receive message on Kafka topic and verify sink output
Given Kafka connection
| url | ${bootstrap.server.host}.${YAKS_NAMESPACE}:${bootstrap.server.port} |
Given URL: yaks:resolveURL('timestamp-router-pipe',8080)
Given HTTP request query parameter kafka.TOPIC="${topicName}"
Given HTTP request query parameter kafka.TIMESTAMP="${timestamp}"
Given HTTP request query parameter message="yaks:urlEncode(${message})"
When send GET /messages
And receive HTTP 200 OK
Then receive Kafka message with body: ${message}

Scenario: Remove resources
Given delete Pipe timestamp-router-pipe
39 changes: 39 additions & 0 deletions test/kafka/webhook-source.kamelet.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
apiVersion: camel.apache.org/v1
kind: Kamelet
metadata:
name: webhook-source
annotations:
camel.apache.org/kamelet.support.level: "Stable"
camel.apache.org/catalog.version: "2.3.0"
camel.apache.org/kamelet.icon: "data:image/svg+xml;base64,PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9Ijk0NCAtNzcxIDI1MDAgMjMzNCI+PHBhdGggZD0iTTIxMTEuNCAyMTAuNWMtMTAzLjYgMTc0LjItMjAyLjkgMzQyLjktMzA0LjIgNTEwLjMtMjYgNDMtMzguOSA3OC0xOC4xIDEzMi42IDU3LjQgMTUwLjktMjMuNiAyOTcuOC0xNzUuOCAzMzcuNi0xNDMuNSAzNy42LTI4My40LTU2LjctMzExLjgtMjEwLjQtMjUuMi0xMzYgODAuMy0yNjkuMyAyMzAuMy0yOTAuNiAxMi42LTEuOCAyNS40LTIgNDYuNS0zLjZsMjI4LjEtMzgyLjVjLTE0My41LTE0Mi43LTIyOC45LTMwOS40LTIxMC01MTYgMTMuNC0xNDYuMSA3MC44LTI3Mi4zIDE3NS44LTM3NS44IDIwMS0xOTguMSA1MDcuOC0yMzAuMiA3NDQuNS03OC4xIDIyNy40IDE0Ni4xIDMzMS41IDQzMC42IDI0Mi44IDY3NC4xLTY2LjktMTguMS0xMzQuMy0zNi40LTIwOC40LTU2LjUgMjcuOS0xMzUuNCA3LjMtMjU3LTg0LjEtMzYxLjEtNjAuMy02OC44LTEzNy44LTEwNC44LTIyNS44LTExOC4xLTE3Ni41LTI2LjctMzQ5LjggODYuNy00MDEuMiAyNTkuOS01OC4zIDE5Ni42IDMwIDM1Ny4yIDI3MS40IDQ3OC4yeiIgZmlsbD0iI2M3M2E2MyIvPjxwYXRoIGQ9Ik0yNDA3LjMgNC41YzczIDEyOC44IDE0Ny4xIDI1OS41IDIyMC42IDM4OSAzNzEuMy0xMTQuOSA2NTEuMiA5MC43IDc1MS43IDMxMC43IDEyMS4zIDI2NS44IDM4LjQgNTgwLjYtMTk5LjkgNzQ0LjYtMjQ0LjUgMTY4LjMtNTUzLjggMTM5LjYtNzcwLjQtNzYuNyA1NS4yLTQ2LjIgMTEwLjctOTIuNiAxNzAtMTQyLjIgMjE0IDEzOC42IDQwMS4xIDEzMi4xIDU0MC4xLTMyLjEgMTE4LjUtMTQwIDExNS45LTM0OC44LTYtNDg1LjgtMTQwLjctMTU4LjItMzI5LjItMTYzLTU1Ny0xMS4yLTk0LjUtMTY3LjctMTkwLjYtMzM0LTI4Mi4yLTUwMi44LTMwLjktNTYuOS02NC45LTg5LjktMTM0LjUtMTAyLTExNi4xLTIwLjEtMTkxLjEtMTE5LjktMTk1LjYtMjMxLjYtNC40LTExMC41IDYwLjctMjEwLjQgMTYyLjQtMjQ5LjMgMTAwLjgtMzguNiAyMTkuMS03LjQgMjg2LjkgNzguMyA1NS40IDcwIDczIDE0OC44IDQzLjkgMjM1LjItOC4xIDI0LjEtMTguNiA0Ny40LTMwIDc1Ljl6IiBmaWxsPSIjNGI0YjRiIi8+PHBhdGggZD0iTTI1ODEuOCAxMDU3LjJoLTQ0N2MtNDIuOCAxNzYuMi0xMzUuNCAzMTguNS0yOTQuOCA0MDguOS0xMjQgNzAuMy0yNTcuNSA5NC4yLTM5OS43IDcxLjItMjYxLjgtNDIuMi00NzUuOS0yNzcuOS00OTQuNy01NDMuMy0yMS4zLTMwMC42IDE4NS4zLTU2Ny44IDQ2MC43LTYyNy45IDE5IDY5LjEgMzguMiAxMzguOCA1Ny4yIDIwNy43LTI1Mi43IDEyOC45LTM0MC4yIDI5MS40LTI2OS40IDQ5NC41IDYyLjMgMTc4LjggMjM5LjEgMjc2LjcgNDMxLjEgMjM4LjggMTk2LjEtMzguNyAyOTUtMjAxLjcgMjgyLjktNDYzLjIgMTg1LjkgMCAzNzItMS45IDU1Ny45LjkgNzIuNiAxLjEgMTI4LjYtNi40IDE4My4zLTcwLjQgOTAtMTA1LjMgMjU1LjgtOTUuOCAzNTIuNyAzLjcgOTkuMSAxMDEuNyA5NC40IDI2NS4yLTEwLjUgMzYyLjYtMTAxLjIgOTMuOS0yNjEgODguOS0zNTUuNy0xMi4zLTE5LjUtMjAuOC0zNC44LTQ1LjUtNTQtNzEuMnoiIGZpbGw9IiM0YTRhNGEiLz48L3N2Zz4="
camel.apache.org/provider: "Apache Software Foundation"
camel.apache.org/kamelet.group: "Webhook"
camel.apache.org/kamelet.namespace: "Cloud"
labels:
camel.apache.org/kamelet.type: "source"
spec:
definition:
title: "Webhook Source"
description: |-
Creates an HTTP endpoint that can be used as a bridge to forward data to the Kamelet sink.

The "subpath" parameter of the Webhook source allows to customize the subpath where the integration will respond to HTTP requests.
It's common to use a non-guessable ID for that parameter.

When the "subpath" parameter is set to "webhook" (default), the integration will accept requests at the "https://integration-external-url/webhook" endpoint.
type: object
properties:
subpath:
title: Subpath
description: |
The subpath where the webhook is registered
type: string
default: "webhook"
dependencies:
- "camel:platform-http"
- "camel:kamelet"
template:
from:
uri: "platform-http:///{{subpath}}"
steps:
- to: "kamelet:sink"
2 changes: 2 additions & 0 deletions test/kafka/yaks-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ config:
resources:
- kafka-source-pipe.yaml
- kafka-sink-pipe.yaml
- timestamp-router-pipe.yaml
dump:
enabled: true
failedOnly: true
Expand All @@ -56,3 +57,4 @@ pre:
kubectl apply -f ../../kafka-source.kamelet.yaml -n $YAKS_NAMESPACE
kubectl delete kamelet kafka-sink -n $YAKS_NAMESPACE
kubectl apply -f ../../kafka-sink.kamelet.yaml -n $YAKS_NAMESPACE
kubectl apply -f webhook-source.kamelet.yaml -n $YAKS_NAMESPACE
6 changes: 3 additions & 3 deletions timestamp-router-action.kamelet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ spec:
from:
uri: kamelet:source
steps:
- set-property:
- setProperty:
name: "topicFormat"
constant: "{{topicFormat}}"
- set-property:
- setProperty:
name: "timestampFormat"
constant: "{{timestampFormat}}"
- set-property:
- setProperty:
name: "timestampHeaderName"
constant: "{{timestampHeaderName}}"
- bean: "org.apache.camel.kamelets.utils.transform.TimestampRouter"
Loading