diff --git a/deploy/samples/subscriptions.yaml b/deploy/samples/subscriptions.yaml index 010ebe9..21346f1 100644 --- a/deploy/samples/subscriptions.yaml +++ b/deploy/samples/subscriptions.yaml @@ -3,8 +3,19 @@ apiVersion: hoptimator.linkedin.com/v1alpha1 kind: Subscription metadata: name: products +spec: + sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand" + database: RAWKAFKA + +--- + +apiVersion: hoptimator.linkedin.com/v1alpha1 +kind: Subscription +metadata: + name: products-with-hints spec: sql: SELECT "quantity", "product_id" AS KEY FROM INVENTORY."products_on_hand" database: RAWKAFKA hints: - numPartitions: "2" + kafka.numPartitions: "7" + diff --git a/etc/integration-tests.sql b/etc/integration-tests.sql index d107fd8..31f2cfe 100644 --- a/etc/integration-tests.sql +++ b/etc/integration-tests.sql @@ -16,6 +16,9 @@ SELECT * FROM INVENTORY."products_on_hand" LIMIT 1; -- MySQL CDC -> Kafka SELECT * FROM RAWKAFKA."products" LIMIT 1; +-- Same, but with hints: +SELECT * FROM RAWKAFKA."products-with-hints" LIMIT 1; + -- test insert into command !insert into RAWKAFKA."test-sink" SELECT AGE AS PAYLOAD, NAME AS KEY FROM DATAGEN.PERSON SELECT * FROM RAWKAFKA."test-sink" LIMIT 5; diff --git a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template index 0e8b35e..fa0cb26 100644 --- a/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template +++ b/hoptimator-kafka-adapter/src/main/resources/KafkaTopic.yaml.template @@ -5,6 +5,6 @@ metadata: namespace: {{pipeline.namespace}} spec: topicName: {{topicName}} - numPartitions: {{numPartitions:null}} + numPartitions: {{kafka.numPartitions:null}} clientOverrides: {{clientOverrides}} diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index 2b82ca2..e9bde8a 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -93,7 +93,7 @@ public Result reconcile(Request request) { // For sink resources, also expose hints. Resource.TemplateFactory sinkTemplateFactory = new Resource.SimpleTemplateFactory(subEnv - .orElse(new Resource.SimpleEnvironment(object.getSpec().getHints()))); + .orElse(new Resource.SimpleEnvironment(map(object.getSpec().getHints())))); // Render resources related to all source tables. List upstreamResources = pipeline.upstreamResources().stream() @@ -284,5 +284,13 @@ public static Controller controller(Operator operator, HoptimatorPlanner.Factory .watch(x -> ControllerBuilder.controllerWatchBuilder(V1alpha1Subscription.class, x).build()) .build(); } + + private static Map map(Map m) { + if (m == null) { + return Collections.emptyMap(); + } else { + return m; + } + } }