Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gather attributes of downstream resources #61

Merged
merged 1 commit into from
Feb 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion deploy/subscriptions.crd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,23 @@ spec:
type: object
additionalProperties:
type: string
attributes:
description: Physical attributes of the job and sink/output table.
type: object
additionalProperties:
type: string
resources:
description: The YAML generated to implement this pipeline.
description: The yaml generated to implement this pipeline.
type: array
items:
type: string
jobResources:
description: The yaml generated to implement the job.
type: array
items:
type: string
downstreamResources:
description: The yaml generated to implement the sink/output table.
type: array
items:
type: string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import com.linkedin.hoptimator.operator.Operator;
import com.linkedin.hoptimator.operator.ConfigAssembler;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopic;
import com.linkedin.hoptimator.models.V1alpha1KafkaTopicStatus;

import io.kubernetes.client.extended.controller.reconciler.Reconciler;
import io.kubernetes.client.extended.controller.reconciler.Request;
Expand Down Expand Up @@ -52,6 +53,10 @@ public Result reconcile(Request request) {
return new Result(false);
}

if (object.getStatus() == null) {
object.setStatus(new V1alpha1KafkaTopicStatus());
}

String topicName = object.getSpec().getTopicName();
Integer desiredPartitions = object.getSpec().getNumPartitions();
Integer desiredReplicationFactor = object.getSpec().getReplicationFactor();
Expand All @@ -72,22 +77,29 @@ public Result reconcile(Request request) {

log.info("Found existing topic {}", topicName);
int actualPartitions = topicDescription.partitions().size();
object.getStatus().setNumPartitions(actualPartitions);
if (desiredPartitions != null && desiredPartitions > actualPartitions) {
log.info("Desired partitions {} > actual partitions {}. Creating additional partitions.",
desiredPartitions, actualPartitions);
admin.createPartitions(Collections.singletonMap(topicName, NewPartitions.increaseTo(desiredPartitions))).all().get();
object.getStatus().setNumPartitions(desiredPartitions);
}
} catch(ExecutionException e) {
if (e.getCause() instanceof UnknownTopicOrPartitionException ) {
log.info("No existing topic {}. Will create it.", topicName);
admin.createTopics(Collections.singleton(new NewTopic(topicName, Optional.ofNullable(desiredPartitions),
Optional.ofNullable(desiredReplicationFactor).map(x -> x.shortValue())))).all().get();
object.getStatus().setNumPartitions(desiredPartitions);
} else {
throw e;
}
} finally {
admin.close();
}

operator.apiFor(KAFKATOPIC).updateStatus(object, x -> object.getStatus())
.onFailure((x, y) -> log.error("Failed to update status of KafkaTopic {}/{}: {}.", namespace, name,
y.getMessage()));
} catch (Exception e) {
log.error("Encountered exception while reconciling KafkaTopic {}/{}", namespace, name, e);
return new Result(true, operator.failureRetryDuration());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Access control rule (colloquially, an Acl)
*/
@ApiModel(description = "Access control rule (colloquially, an Acl)")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1Acl implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* AclList is a list of Acl
*/
@ApiModel(description = "AclList is a list of Acl")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A set of related ACL rules.
*/
@ApiModel(description = "A set of related ACL rules.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclSpec {
/**
* The resource access method.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* The resource being controlled.
*/
@ApiModel(description = "The resource being controlled.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclSpecResource {
public static final String SERIALIZED_NAME_KIND = "kind";
@SerializedName(SERIALIZED_NAME_KIND)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Status, as set by the operator.
*/
@ApiModel(description = "Status, as set by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1AclStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Kafka Topic
*/
@ApiModel(description = "Kafka Topic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopic implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* KafkaTopicList is a list of KafkaTopic
*/
@ApiModel(description = "KafkaTopicList is a list of KafkaTopic")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* Desired Kafka topic configuration.
*/
@ApiModel(description = "Desired Kafka topic configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpec {
public static final String SERIALIZED_NAME_CLIENT_CONFIGS = "clientConfigs";
@SerializedName(SERIALIZED_NAME_CLIENT_CONFIGS)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
/**
* V1alpha1KafkaTopicSpecClientConfigs
*/
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecClientConfigs {
public static final String SERIALIZED_NAME_CONFIG_MAP_REF = "configMapRef";
@SerializedName(SERIALIZED_NAME_CONFIG_MAP_REF)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Reference to a ConfigMap to use for AdminClient configuration.
*/
@ApiModel(description = "Reference to a ConfigMap to use for AdminClient configuration.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicSpecConfigMapRef {
public static final String SERIALIZED_NAME_NAME = "name";
@SerializedName(SERIALIZED_NAME_NAME)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
* Current state of the topic.
*/
@ApiModel(description = "Current state of the topic.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1KafkaTopicStatus {
public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Hoptimator Subscription
*/
@ApiModel(description = "Hoptimator Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1Subscription implements io.kubernetes.client.common.KubernetesObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
* SubscriptionList is a list of Subscription
*/
@ApiModel(description = "SubscriptionList is a list of Subscription")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1SubscriptionList implements io.kubernetes.client.common.KubernetesListObject {
public static final String SERIALIZED_NAME_API_VERSION = "apiVersion";
@SerializedName(SERIALIZED_NAME_API_VERSION)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* Subscription spec
*/
@ApiModel(description = "Subscription spec")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1SubscriptionSpec {
public static final String SERIALIZED_NAME_DATABASE = "database";
@SerializedName(SERIALIZED_NAME_DATABASE)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,16 @@
* Filled in by the operator.
*/
@ApiModel(description = "Filled in by the operator.")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-05T21:19:50.466Z[Etc/UTC]")
@javax.annotation.Generated(value = "org.openapitools.codegen.languages.JavaClientCodegen", date = "2024-02-09T06:53:38.470Z[Etc/UTC]")
public class V1alpha1SubscriptionStatus {
public static final String SERIALIZED_NAME_ATTRIBUTES = "attributes";
@SerializedName(SERIALIZED_NAME_ATTRIBUTES)
private Map<String, String> attributes = null;

public static final String SERIALIZED_NAME_DOWNSTREAM_RESOURCES = "downstreamResources";
@SerializedName(SERIALIZED_NAME_DOWNSTREAM_RESOURCES)
private List<String> downstreamResources = null;

public static final String SERIALIZED_NAME_FAILED = "failed";
@SerializedName(SERIALIZED_NAME_FAILED)
private Boolean failed;
Expand All @@ -42,6 +50,10 @@ public class V1alpha1SubscriptionStatus {
@SerializedName(SERIALIZED_NAME_HINTS)
private Map<String, String> hints = null;

public static final String SERIALIZED_NAME_JOB_RESOURCES = "jobResources";
@SerializedName(SERIALIZED_NAME_JOB_RESOURCES)
private List<String> jobResources = null;

public static final String SERIALIZED_NAME_MESSAGE = "message";
@SerializedName(SERIALIZED_NAME_MESSAGE)
private String message;
Expand All @@ -59,6 +71,68 @@ public class V1alpha1SubscriptionStatus {
private String sql;


public V1alpha1SubscriptionStatus attributes(Map<String, String> attributes) {

this.attributes = attributes;
return this;
}

public V1alpha1SubscriptionStatus putAttributesItem(String key, String attributesItem) {
if (this.attributes == null) {
this.attributes = new HashMap<>();
}
this.attributes.put(key, attributesItem);
return this;
}

/**
* Physical attributes of the job and sink/output table.
* @return attributes
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "Physical attributes of the job and sink/output table.")

public Map<String, String> getAttributes() {
return attributes;
}


public void setAttributes(Map<String, String> attributes) {
this.attributes = attributes;
}


public V1alpha1SubscriptionStatus downstreamResources(List<String> downstreamResources) {

this.downstreamResources = downstreamResources;
return this;
}

public V1alpha1SubscriptionStatus addDownstreamResourcesItem(String downstreamResourcesItem) {
if (this.downstreamResources == null) {
this.downstreamResources = new ArrayList<>();
}
this.downstreamResources.add(downstreamResourcesItem);
return this;
}

/**
* The yaml generated to implement the sink/output table.
* @return downstreamResources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The yaml generated to implement the sink/output table.")

public List<String> getDownstreamResources() {
return downstreamResources;
}


public void setDownstreamResources(List<String> downstreamResources) {
this.downstreamResources = downstreamResources;
}


public V1alpha1SubscriptionStatus failed(Boolean failed) {

this.failed = failed;
Expand Down Expand Up @@ -113,6 +187,37 @@ public void setHints(Map<String, String> hints) {
}


public V1alpha1SubscriptionStatus jobResources(List<String> jobResources) {

this.jobResources = jobResources;
return this;
}

public V1alpha1SubscriptionStatus addJobResourcesItem(String jobResourcesItem) {
if (this.jobResources == null) {
this.jobResources = new ArrayList<>();
}
this.jobResources.add(jobResourcesItem);
return this;
}

/**
* The yaml generated to implement the job.
* @return jobResources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The yaml generated to implement the job.")

public List<String> getJobResources() {
return jobResources;
}


public void setJobResources(List<String> jobResources) {
this.jobResources = jobResources;
}


public V1alpha1SubscriptionStatus message(String message) {

this.message = message;
Expand Down Expand Up @@ -174,11 +279,11 @@ public V1alpha1SubscriptionStatus addResourcesItem(String resourcesItem) {
}

/**
* The YAML generated to implement this pipeline.
* The yaml generated to implement this pipeline.
* @return resources
**/
@javax.annotation.Nullable
@ApiModelProperty(value = "The YAML generated to implement this pipeline.")
@ApiModelProperty(value = "The yaml generated to implement this pipeline.")

public List<String> getResources() {
return resources;
Expand Down Expand Up @@ -222,8 +327,11 @@ public boolean equals(Object o) {
return false;
}
V1alpha1SubscriptionStatus v1alpha1SubscriptionStatus = (V1alpha1SubscriptionStatus) o;
return Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
return Objects.equals(this.attributes, v1alpha1SubscriptionStatus.attributes) &&
Objects.equals(this.downstreamResources, v1alpha1SubscriptionStatus.downstreamResources) &&
Objects.equals(this.failed, v1alpha1SubscriptionStatus.failed) &&
Objects.equals(this.hints, v1alpha1SubscriptionStatus.hints) &&
Objects.equals(this.jobResources, v1alpha1SubscriptionStatus.jobResources) &&
Objects.equals(this.message, v1alpha1SubscriptionStatus.message) &&
Objects.equals(this.ready, v1alpha1SubscriptionStatus.ready) &&
Objects.equals(this.resources, v1alpha1SubscriptionStatus.resources) &&
Expand All @@ -232,16 +340,19 @@ public boolean equals(Object o) {

@Override
public int hashCode() {
return Objects.hash(failed, hints, message, ready, resources, sql);
return Objects.hash(attributes, downstreamResources, failed, hints, jobResources, message, ready, resources, sql);
}


@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("class V1alpha1SubscriptionStatus {\n");
sb.append(" attributes: ").append(toIndentedString(attributes)).append("\n");
sb.append(" downstreamResources: ").append(toIndentedString(downstreamResources)).append("\n");
sb.append(" failed: ").append(toIndentedString(failed)).append("\n");
sb.append(" hints: ").append(toIndentedString(hints)).append("\n");
sb.append(" jobResources: ").append(toIndentedString(jobResources)).append("\n");
sb.append(" message: ").append(toIndentedString(message)).append("\n");
sb.append(" ready: ").append(toIndentedString(ready)).append("\n");
sb.append(" resources: ").append(toIndentedString(resources)).append("\n");
Expand Down
Loading
Loading