Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Report subscription status #38

Merged
merged 3 commits into from
Jun 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions deploy/dev/rbac.yaml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: hoptimator-controller
name: hoptimator-operator
namespace: default
subjects:
- kind: ServiceAccount
name: default
name: hoptimator-operator
namespace: default
roleRef:
kind: Role
name: hoptimator-controller
name: hoptimator-operator
apiGroup: rbac.authorization.k8s.io
1 change: 1 addition & 0 deletions deploy/hoptimator-operator-deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ spec:
labels:
app: hoptimator-operator
spec:
serviceAccountName: hoptimator-operator
containers:
- name: hoptimator-operator
image: docker.io/library/hoptimator
Expand Down
9 changes: 6 additions & 3 deletions deploy/rbac.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
namespace: default
name: hoptimator-controller
name: hoptimator-operator
rules:
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics", "subscriptions"]
verbs: ["get", "watch", "list", "update", "create"]
verbs: ["get", "watch", "list", "create"]
- apiGroups: ["hoptimator.linkedin.com"]
resources: ["kafkatopics/status", "subscriptions/status"]
verbs: ["get", "patch"]
- apiGroups: ["flink.apache.org"]
resources: ["flinkdeployments"]
verbs: ["update", "create"]
verbs: ["get", "update", "create"]

12 changes: 12 additions & 0 deletions deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,21 @@ spec:
message:
description: Error or success message, for information only.
type: string
sql:
description: The SQL being implemented by this pipeline.
type: string
resources:
ryannedolan marked this conversation as resolved.
Show resolved Hide resolved
description: The YAML generated to implement this pipeline.
type: array
items:
type: string
subresources:
status: {}
additionalPrinterColumns:
- name: STATUS
type: string
description: Status message from the operator.
jsonPath: .status.message
- name: DB
type: string
description: The database where the subscription is materialized.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Subscription spec
*/
@ApiModel(description = "Subscription spec")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1SubscriptionSpec {
public static final String SERIALIZED_NAME_DATABASE = "database";
@SerializedName(SERIALIZED_NAME_DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,14 @@
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
* Filled in by the operator.
*/
@ApiModel(description = "Filled in by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-04-28T19:46:31.976Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2023-06-29T22:51:02.615Z[Etc/UTC]")
public class V1alpha1SubscriptionStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand All @@ -38,6 +40,14 @@ public class V1alpha1SubscriptionStatus {
@SerializedName(SERIALIZED_NAME_READY)
private Boolean ready;

public static final String SERIALIZED_NAME_RESOURCES = "resources";
@SerializedName(SERIALIZED_NAME_RESOURCES)
private List<String> resources = null;

public static final String SERIALIZED_NAME_SQL = "sql";
@SerializedName(SERIALIZED_NAME_SQL)
private String sql;


public V1alpha1SubscriptionStatus message(String message) {

Expand Down Expand Up @@ -85,6 +95,60 @@ public void setReady(Boolean ready) {
}


public V1alpha1SubscriptionStatus resources(List<String> resources) {

this.resources = resources;
return this;
}

public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) {
if (this.resources == null) {
this.resources = new ArrayList<>();
}
this.resources.add(resourcesItem);
return this;
}

/**
* The YAML generated to implement this pipeline.
* @return resources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The YAML generated to implement this pipeline.")

public List<String> getResources() {
return resources;
}


public void setResources(List<String> resources) {
this.resources = resources;
}


public V1alpha1SubscriptionStatus sql(String sql) {

this.sql = sql;
return this;
}

/**
* The SQL being implemented by this pipeline.
* @return sql
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The SQL being implemented by this pipeline.")

public String getSql() {
return sql;
}


public void setSql(String sql) {
this.sql = sql;
}


@Override
public boolean equals(Object o) {
if (this == o) {
Expand All @@ -95,12 +159,14 @@ public boolean equals(Object o) {
}
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
return Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready);
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
Objects.equals(this.sql, v1alpha1SubscriptionStatus.sql);
}

@Override
public int hashCode() {
return Objects.hash(message, ready);
return Objects.hash(message, ready, resources, sql);
}


Expand All @@ -110,6 +176,8 @@ public String toString() {
sb.append("class V1alpha1SubscriptionStatus {\n");
sb.append(" message: ").append(toIndentedString(message)).append("\n");
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");
sb.append(" sql: ").append(toIndentedString(sql)).append("\n");
sb.append("}");
return sb.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,54 +122,17 @@ public DynamicKubernetesApi apiFor(DynamicKubernetesObject obj) {
return (GenericKubernetesApi<T, L>) apiInfo(groupVersionKind).generic(apiClient);
}

public boolean applyResource(Resource resource, V1OwnerReference ownerReference,
Resource.TemplateFactory templateFactory) {
String yaml = resource.render(templateFactory);
DynamicKubernetesObject obj = Dynamics.newFromYaml(yaml);
String namespace = obj.getMetadata().getNamespace();
String name = obj.getMetadata().getName();
KubernetesApiResponse<DynamicKubernetesObject> existing = apiFor(obj).get(namespace, name);
if (existing.isSuccess()) {
String resourceVersion = existing.getObject().getMetadata().getResourceVersion();
log.info("Updating existing downstream resource {}/{} {} as \n{}",
namespace, name, resourceVersion, yaml);
List<V1OwnerReference> owners = existing.getObject().getMetadata().getOwnerReferences();
if (owners == null) {
owners = new ArrayList<>();
}
if (owners.stream().anyMatch(x -> x.getUid().equals(ownerReference.getUid()))) {
log.info("Existing downstream resource {}/{} is already owned by {}/{}.",
namespace, name, ownerReference.getKind(), ownerReference.getName());
} else {
log.info("Existing downstream resource {}/{} will be owned by {}/{} and {} others.",
namespace, name, ownerReference.getKind(), ownerReference.getName(), owners.size());
owners.add(ownerReference);
}
obj.setMetadata(obj.getMetadata().ownerReferences(owners).resourceVersion(resourceVersion));
KubernetesApiResponse<DynamicKubernetesObject> response = apiFor(obj).update(obj);
if (!response.isSuccess()) {
log.error("Error updating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage());
return false;
}
} else {
log.info("Creating downstream resource {}/{} as \n{}", namespace, name, yaml);
obj.setMetadata(obj.getMetadata().addOwnerReferencesItem(ownerReference));
KubernetesApiResponse<DynamicKubernetesObject> response = apiFor(obj).create(obj);
if (!response.isSuccess()) {
log.error("Error creating downstream resource {}/{}: {}.", namespace, name, response.getStatus().getMessage());
return false;
}
}
return true;
}

public Duration failureRetryDuration() {
return Duration.ofMinutes(5);
}

public Duration resyncPeriod() {
public Duration pendingRetryDuration() {
return Duration.ofMinutes(1);
}

public Duration resyncPeriod() {
return Duration.ofMinutes(10);
}

public static class ApiInfo<T extends KubernetesObject, L extends KubernetesListObject> {
private final String kind;
Expand Down
Loading