diff --git a/deploy/dev/rbac.yaml b/deploy/dev/rbac.yaml index 37a5ee8..9595182 100644 --- a/deploy/dev/rbac.yaml +++ b/deploy/dev/rbac.yaml @@ -1,13 +1,13 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: RoleBinding metadata: - name: hoptimator-controller + name: hoptimator-operator namespace: default subjects: - kind: ServiceAccount - name: default + name: hoptimator-operator namespace: default roleRef: kind: Role - name: hoptimator-controller + name: hoptimator-operator apiGroup: rbac.authorization.k8s.io diff --git a/deploy/hoptimator-operator-deployment.yaml b/deploy/hoptimator-operator-deployment.yaml index 83e786c..679bf58 100644 --- a/deploy/hoptimator-operator-deployment.yaml +++ b/deploy/hoptimator-operator-deployment.yaml @@ -14,6 +14,7 @@ spec: labels: app: hoptimator-operator spec: + serviceAccountName: hoptimator-operator containers: - name: hoptimator-operator image: docker.io/library/hoptimator diff --git a/deploy/rbac.yaml b/deploy/rbac.yaml index c3d2614..9d097a1 100644 --- a/deploy/rbac.yaml +++ b/deploy/rbac.yaml @@ -2,12 +2,15 @@ apiVersion: rbac.authorization.k8s.io/v1 kind: Role metadata: namespace: default - name: hoptimator-controller + name: hoptimator-operator rules: - apiGroups: ["hoptimator.linkedin.com"] resources: ["kafkatopics", "subscriptions"] - verbs: ["get", "watch", "list", "update", "create"] + verbs: ["get", "watch", "list", "create"] +- apiGroups: ["hoptimator.linkedin.com"] + resources: ["kafkatopics/status", "subscriptions/status"] + verbs: ["get", "patch"] - apiGroups: ["flink.apache.org"] resources: ["flinkdeployments"] - verbs: ["update", "create"] + verbs: ["get", "update", "create"] diff --git a/deploy/subscriptions.crd.yaml b/deploy/subscriptions.crd.yaml index b3bae54..87fcbaf 100644 --- a/deploy/subscriptions.crd.yaml +++ b/deploy/subscriptions.crd.yaml @@ -51,9 +51,21 @@ spec: message: description: Error or success message, for information only. type: string + sql: + description: The SQL being implemented by this pipeline. + type: string + resources: + description: The YAML generated to implement this pipeline. + type: array + items: + type: string subresources: status: {} additionalPrinterColumns: + - name: STATUS + type: string + description: Status message from the operator. + jsonPath: .status.message - name: DB type: string description: The database where the subscription is materialized. diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java index b6bdddb..51f9516 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopic.java @@ -31,7 +31,7 @@ * Kafka Topic */ @ApiModel(description = "Kafka Topic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java index ac89f64..cac3ee8 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicList.java @@ -32,7 +32,7 @@ * KafkaTopicList is a list of KafkaTopic */ @ApiModel(description = "KafkaTopicList is a list of KafkaTopic") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java index b278f9a..57c3eef 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpec.java @@ -33,7 +33,7 @@ * Desired Kafka topic configuration. */ @ApiModel(description = "Desired Kafka topic configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1KafkaTopicSpec { public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs"; @SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java index ff1a707..1e9ba00 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecClientConfigs.java @@ -28,7 +28,7 @@ /** * V1alpha1KafkaTopicSpecClientConfigs */ -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecClientConfigs { public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef"; @SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java index ecf8fab..03fdb91 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicSpecConfigMapRef.java @@ -28,7 +28,7 @@ * Reference to a ConfigMap to use for AdminClient configuration. */ @ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1KafkaTopicSpecConfigMapRef { public static final String SERIALIZED_NAME_NAME = "name"; @SerializedName(SERIALIZED_NAME_NAME) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java index 5b0ab4c..3113c19 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1KafkaTopicStatus.java @@ -28,7 +28,7 @@ * Current state of the topic. */ @ApiModel(description = "Current state of the topic.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1KafkaTopicStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java index 552a319..47ef54c 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1Subscription.java @@ -31,7 +31,7 @@ * Hoptimator Subscription */ @ApiModel(description = "Hoptimator Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java index 7f5185d..d81833f 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionList.java @@ -32,7 +32,7 @@ * SubscriptionList is a list of Subscription */ @ApiModel(description = "SubscriptionList is a list of Subscription") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject { public static final String SERIALIZED_NAME_API_VERSION = "apiVersion"; @SerializedName(SERIALIZED_NAME_API_VERSION) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java index 10f189a..265b040 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionSpec.java @@ -28,7 +28,7 @@ * Subscription spec */ @ApiModel(description = "Subscription spec") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1SubscriptionSpec { public static final String SERIALIZED_NAME_DATABASE = "database"; @SerializedName(SERIALIZED_NAME_DATABASE) diff --git a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java index 8f41d08..b13acec 100644 --- a/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java +++ b/hoptimator-models/src/main/java/com/linkedin/hoptimator/models/V1alpha1SubscriptionStatus.java @@ -23,12 +23,14 @@ import io.swagger.annotations.ApiModel; import io.swagger.annotations.ApiModelProperty; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; /** * Filled in by the operator. */ @ApiModel(description = "Filled in by the operator.") -@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]") +@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]") public class V1alpha1SubscriptionStatus { public static final String SERIALIZED_NAME_MESSAGE = "message"; @SerializedName(SERIALIZED_NAME_MESSAGE) @@ -38,6 +40,14 @@ public class V1alpha1SubscriptionStatus { @SerializedName(SERIALIZED_NAME_READY) private Boolean ready; + public static final String SERIALIZED_NAME_RESOURCES = "resources"; + @SerializedName(SERIALIZED_NAME_RESOURCES) + private List resources = null; + + public static final String SERIALIZED_NAME_SQL = "sql"; + @SerializedName(SERIALIZED_NAME_SQL) + private String sql; + public V1alpha1SubscriptionStatus message(String message) { @@ -85,6 +95,60 @@ public void setReady(Boolean ready) { } + public V1alpha1SubscriptionStatus resources(List resources) { + + this.resources = resources; + return this; + } + + public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) { + if (this.resources == null) { + this.resources = new ArrayList<>(); + } + this.resources.add(resourcesItem); + return this; + } + + /** + * The YAML generated to implement this pipeline. + * @return resources + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "The YAML generated to implement this pipeline.") + + public List getResources() { + return resources; + } + + + public void setResources(List resources) { + this.resources = resources; + } + + + public V1alpha1SubscriptionStatus sql(String sql) { + + this.sql = sql; + return this; + } + + /** + * The SQL being implemented by this pipeline. + * @return sql + **/ + @javax.annotation.Nullable + @ApiModelProperty(value = "The SQL being implemented by this pipeline.") + + public String getSql() { + return sql; + } + + + public void setSql(String sql) { + this.sql = sql; + } + + @Override public boolean equals(Object o) { if (this == o) { @@ -95,12 +159,14 @@ public boolean equals(Object o) { } V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o; return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) && - Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready); + Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) && + Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) && + Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql); } @Override public int hashCode() { - return Objects.hash(message, ready); + return Objects.hash(message, ready, resources, sql); } @@ -110,6 +176,8 @@ public String toString() { sb.append("class V1alpha1SubscriptionStatus {\n"); sb.append(" message: ").append(toIndentedString(message)).append("\n"); sb.append(" ready: ").append(toIndentedString(ready)).append("\n"); + sb.append(" resources: ").append(toIndentedString(resources)).append("\n"); + sb.append(" sql: ").append(toIndentedString(sql)).append("\n"); sb.append("}"); return sb.toString(); } diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java index 0005bd7..27fbd96 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/Operator.java @@ -122,54 +122,17 @@ public DynamicKubernetesApi apiFor(DynamicKubernetesObject obj) { return (GenericKubernetesApi) apiInfo(groupVersionKind).generic(apiClient); } - public boolean applyResource(Resource resource, V1OwnerReference ownerReference, - Resource.TemplateFactory templateFactory) { - String yaml = resource.render(templateFactory); - DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); - String namespace = obj.getMetadata().getNamespace(); - String name = obj.getMetadata().getName(); - KubernetesApiResponse existing = apiFor(obj).get(namespace, name); - if (existing.isSuccess()) { - String resourceVersion = existing.getObject().getMetadata().getResourceVersion(); - log.info("Updating existing downstream resource {}/{} {} as \n{}", - namespace, name, resourceVersion, yaml); - List owners = existing.getObject().getMetadata().getOwnerReferences(); - if (owners == null) { - owners = new ArrayList<>(); - } - if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) { - log.info("Existing downstream resource {}/{} is already owned by {}/{}.", - namespace, name, ownerReference.getKind(), ownerReference.getName()); - } else { - log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.", - namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size()); - owners.add(ownerReference); - } - obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion)); - KubernetesApiResponse response = apiFor(obj).update(obj); - if (!response.isSuccess()) { - log.error("Error updating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage()); - return false; - } - } else { - log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml); - obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference)); - KubernetesApiResponse response = apiFor(obj).create(obj); - if (!response.isSuccess()) { - log.error("Error creating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage()); - return false; - } - } - return true; - } - public Duration failureRetryDuration() { return Duration.ofMinutes(5); } - public Duration resyncPeriod() { + public Duration pendingRetryDuration() { return Duration.ofMinutes(1); } + + public Duration resyncPeriod() { + return Duration.ofMinutes(10); + } public static class ApiInfo { private final String kind; diff --git a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java index f09ca7d..918ca5c 100644 --- a/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java +++ b/hoptimator-operator/src/main/java/com/linkedin/hoptimator/operator/subscription/SubscriptionReconciler.java @@ -3,8 +3,8 @@ import com.linkedin.hoptimator.catalog.Resource; import com.linkedin.hoptimator.catalog.HopTable; import com.linkedin.hoptimator.models.V1alpha1Subscription; -import com.linkedin.hoptimator.models.V1alpha1SubscriptionStatus; import com.linkedin.hoptimator.models.V1alpha1SubscriptionSpec; +import com.linkedin.hoptimator.models.V1alpha1SubscriptionStatus; import com.linkedin.hoptimator.operator.Operator; import com.linkedin.hoptimator.operator.ConfigAssembler; import com.linkedin.hoptimator.operator.RequestEnvironment; @@ -27,6 +27,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Map; @@ -35,6 +36,7 @@ import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.CountDownLatch; +import java.util.stream.Collectors; public class SubscriptionReconciler implements Reconciler { private final static Logger log = LoggerFactory.getLogger(SubscriptionReconciler.class); @@ -56,38 +58,80 @@ public Result reconcile(Request request) { RequestEnvironment env = new RequestEnvironment(request); Resource.TemplateFactory templateFactory = new Resource.SimpleTemplateFactory(env); + Result result = new Result(true, operator.pendingRetryDuration()); try { V1alpha1Subscription object = operator.fetch(SUBSCRIPTION, namespace, name); - + if (object == null) { log.info("Object {}/{} deleted, skipping.", namespace, name); return new Result(false); } - - V1OwnerReference ownerReference = new V1OwnerReference(); - ownerReference.kind(object.getKind()); - ownerReference.name(object.getMetadata().getName()); - ownerReference.apiVersion(object.getApiVersion()); - ownerReference.uid(object.getMetadata().getUid()); - - Pipeline pipeline = pipeline(object); - boolean ready = pipeline.resources().stream() - .map(x -> operator.applyResource(x, ownerReference, templateFactory)).allMatch(x -> x); + + String kind = object.getKind(); V1alpha1SubscriptionStatus status = object.getStatus(); if (status == null) { status = new V1alpha1SubscriptionStatus(); + object.setStatus(status); } - status.setReady(ready); - operator.apiFor(SUBSCRIPTION).updateStatus(object, x -> object.getStatus()); + // We deploy in three phases: + // 1. Plan a pipeline, and write the plan to Status. + // 2. Deploy the pipeline per plan. + // 3. Verify readiness of the entire pipeline. + // Each phase should be a separate reconcilation loop to avoid races. + // TODO: We should disown orphaned resources when the pipeline changes. + if (status.getSql() == null || !status.getSql().equals(object.getSpec().getSql())) { + // Phase 1 + log.info("Planning a new pipeline for {}/{} with SQL `{}`...", kind, name, object.getSpec().getSql()); + + Pipeline pipeline = pipeline(object); + status.setResources(pipeline.resources().stream() + .map(x -> x.render(templateFactory)) + .collect(Collectors.toList())); + + status.setSql(object.getSpec().getSql()); + status.setReady(null); // null indicates that pipeline needs to be deployed + status.setMessage("Planned."); + } else if (status.getReady() == null && status.getResources() != null) { + // Phase 2 + log.info("Deploying pipeline for {}/{}...", kind, name); + + boolean deployed = status.getResources().stream() + .allMatch(x -> apply(x, object)); + + if (deployed) { + status.setReady(false); + status.setMessage("Deployed."); + } else { + return new Result(true, operator.failureRetryDuration()); + } + } else { + log.info("Checking status of pipeline for {}/{}...", kind, name); + + boolean ready = status.getResources().stream() + .allMatch(x -> checkStatus(x)); + + if (ready) { + status.setReady(true); + status.setMessage("Ready."); + log.info("{}/{} is ready.", kind, name); + result = new Result(false); + } else { + status.setReady(false); + status.setMessage("Deployed."); + log.info("Pipeline for {}/{} is NOT ready.", kind, name); + } + } + + operator.apiFor(SUBSCRIPTION).updateStatus(object, x -> object.getStatus()) + .onFailure((x, y) -> log.error("Failed to update status of {}/{}: {}.", kind, name, y.getMessage())); } catch (Exception e) { log.error("Encountered exception while reconciling Subscription {}/{}", namespace, name, e); return new Result(true, operator.failureRetryDuration()); } - log.info("Done reconciling {}/{}", namespace, name); - return new Result(false); + return result; } Pipeline pipeline(V1alpha1Subscription object) throws Exception { @@ -106,6 +150,102 @@ Pipeline pipeline(V1alpha1Subscription object) throws Exception { return impl.pipeline(sink); } + private boolean apply(String yaml, V1alpha1Subscription owner) { + V1OwnerReference ownerReference = new V1OwnerReference(); + ownerReference.kind(owner.getKind()); + ownerReference.name(owner.getMetadata().getName()); + ownerReference.apiVersion(owner.getApiVersion()); + ownerReference.uid(owner.getMetadata().getUid()); + + DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); + String namespace = obj.getMetadata().getNamespace(); + String name = obj.getMetadata().getName(); + KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); + if (existing.isSuccess()) { + String resourceVersion = existing.getObject().getMetadata().getResourceVersion(); + log.info("Updating existing downstream resource {}/{} {} as \n{}", + namespace, name, resourceVersion, yaml); + List owners = existing.getObject().getMetadata().getOwnerReferences(); + if (owners == null) { + owners = new ArrayList<>(); + } + if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) { + log.info("Existing downstream resource {}/{} is already owned by {}/{}.", + namespace, name, ownerReference.getKind(), ownerReference.getName()); + } else { + log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.", + namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size()); + owners.add(ownerReference); + } + obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion)); + KubernetesApiResponse response = operator.apiFor(obj).update(obj); + if (!response.isSuccess()) { + log.error("Error updating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage()); + return false; + } + } else { + log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml); + obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference)); + KubernetesApiResponse response = operator.apiFor(obj).create(obj); + if (!response.isSuccess()) { + log.error("Error creating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage()); + return false; + } + } + return true; + } + + private boolean checkStatus(String yaml) { + DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml); + String namespace = obj.getMetadata().getNamespace(); + String name = obj.getMetadata().getName(); + String kind = obj.getKind(); + try { + KubernetesApiResponse existing = operator.apiFor(obj).get(namespace, name); + existing.onFailure((code, status) -> log.warn("Failed to fetch {}/{}: {}.", kind, name, status.getMessage())); + if (!existing.isSuccess()) { + return false; + } + if (isReady(existing.getObject())) { + log.info("{}/{} is ready.", kind, name); + return true; + } else { + log.info("{}/{} is NOT ready.", kind, name); + return false; + } + } catch (Exception e) { + return false; + } + } + + private static boolean isReady(DynamicKubernetesObject obj) { + // We make a best effort to guess the status of the dynamic object. By default, it's ready. + if (obj == null || obj.getRaw() == null) { + return false; + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("ready").getAsBoolean(); + } catch (Exception e) { + log.debug("Exception looking for .status.ready. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("state").getAsString() + .matches("(?i)READY|RUNNING|FINISHED"); + } catch (Exception e) { + log.debug("Exception looking for .status.state. Swallowing.", e); + } + try { + return obj.getRaw().get("status").getAsJsonObject().get("jobStatus").getAsJsonObject() + .get("state").getAsString().matches("(?i)READY|RUNNING|FINISHED"); + } catch (Exception e) { + log.debug("Exception looking for .status.jobStatus.state. Swallowing.", e); + } + // TODO: Look for common Conditions + log.warn("Resource {}/{}/{} considered ready by default.", obj.getMetadata().getNamespace(), + obj.getKind(), obj.getMetadata().getName()); + return true; + } + public static Controller controller(Operator operator, HoptimatorPlanner.Factory plannerFactory) { Reconciler reconciler = new SubscriptionReconciler(operator, plannerFactory); return ControllerBuilder.defaultBuilder(operator.informerFactory())