From 748a016bc5bb5c85eff655be32c15cc225f1160b Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Thu, 13 Feb 2025 23:10:33 +0530 Subject: [PATCH 01/21] feat: Add models for workflow Signed-off-by: Karanjot Singh --- atlan/model/structs/workflow.go | 204 ++++++++++++++++++++++++++++++++ 1 file changed, 204 insertions(+) create mode 100644 atlan/model/structs/workflow.go diff --git a/atlan/model/structs/workflow.go b/atlan/model/structs/workflow.go new file mode 100644 index 0000000..3029ae1 --- /dev/null +++ b/atlan/model/structs/workflow.go @@ -0,0 +1,204 @@ +package structs + +// PackageParameter represents package-related parameters. +type PackageParameter struct { + Parameter string `json:"parameter"` + Type string `json:"type"` + Body map[string]interface{} `json:"body"` +} + +// WorkflowMetadata captures metadata about a workflow. +type WorkflowMetadata struct { + Annotations map[string]string `json:"annotations,omitempty"` + CreationTimestamp *string `json:"creationTimestamp,omitempty"` + GenerateName *string `json:"generateName,omitempty"` + Generation *int `json:"generation,omitempty"` + Labels map[string]string `json:"labels,omitempty"` + ManagedFields []interface{} `json:"managedFields,omitempty"` + Name *string `json:"name,omitempty"` + Namespace *string `json:"namespace,omitempty"` + ResourceVersion *string `json:"resourceVersion,omitempty"` + UID *string `json:"uid,omitempty"` +} + +// WorkflowTemplateRef references a specific workflow template. +type WorkflowTemplateRef struct { + Name string `json:"name"` + Template string `json:"template"` + ClusterScope bool `json:"clusterScope"` +} + +// NameValuePair represents a name-value pair. +type NameValuePair struct { + Name string `json:"name"` + Value interface{} `json:"value"` +} + +// WorkflowParameters holds parameters for workflow operations. +type WorkflowParameters struct { + Parameters []NameValuePair `json:"parameters"` +} + +// WorkflowTask represents a task in the workflow. +type WorkflowTask struct { + Name string `json:"name"` + Arguments WorkflowParameters `json:"arguments"` + TemplateRef WorkflowTemplateRef `json:"templateRef"` +} + +// WorkflowDAG represents a directed acyclic graph (DAG) of workflow tasks. +type WorkflowDAG struct { + Tasks []WorkflowTask `json:"tasks"` +} + +// WorkflowTemplate captures the structure of a workflow template. +type WorkflowTemplate struct { + Name string `json:"name"` + Inputs interface{} `json:"inputs,omitempty"` + Outputs interface{} `json:"outputs,omitempty"` + Metadata interface{} `json:"metadata,omitempty"` + DAG WorkflowDAG `json:"dag"` +} + +// WorkflowSpec defines the specification of a workflow. +type WorkflowSpec struct { + Entrypoint *string `json:"entrypoint,omitempty"` + Arguments interface{} `json:"arguments,omitempty"` + Templates []WorkflowTemplate `json:"templates,omitempty"` + WorkflowTemplateRef map[string]string `json:"workflowTemplateRef,omitempty"` + WorkflowMetadata *WorkflowMetadata `json:"workflowMetadata,omitempty"` +} + +// Workflow represents the primary workflow object. +type Workflow struct { + Metadata *WorkflowMetadata `json:"metadata"` + Spec *WorkflowSpec `json:"spec"` + Payload []PackageParameter `json:"payload,omitempty"` +} + +// WorkflowSearchResultStatus captures the status of a workflow search result. +type WorkflowSearchResultStatus struct { + ArtifactGCStatus map[string]interface{} `json:"artifactGCStatus,omitempty"` + ArtifactRepositoryRef interface{} `json:"artifactRepositoryRef,omitempty"` + CompressedNodes *string `json:"compressedNodes,omitempty"` + EstimatedDuration *int `json:"estimatedDuration,omitempty"` + Conditions []interface{} `json:"conditions,omitempty"` + Message *string `json:"message,omitempty"` + FinishedAt *string `json:"finishedAt,omitempty"` + Nodes interface{} `json:"nodes,omitempty"` + Outputs *WorkflowParameters `json:"outputs,omitempty"` + Phase *string `json:"phase,omitempty"` + Progress *string `json:"progress,omitempty"` + ResourcesDuration map[string]int `json:"resourcesDuration,omitempty"` + StartedAt *string `json:"startedAt,omitempty"` + StoredTemplates interface{} `json:"storedTemplates,omitempty"` + StoredWorkflowTemplateSpec interface{} `json:"storedWorkflowTemplateSpec,omitempty"` + Synchronization map[string]interface{} `json:"synchronization,omitempty"` +} + +// WorkflowSearchResultDetail contains detailed information about a workflow search result. +type WorkflowSearchResultDetail struct { + APIVersion string `json:"apiVersion"` + Kind string `json:"kind"` + Metadata WorkflowMetadata `json:"metadata"` + Spec WorkflowSpec `json:"spec"` + Status *WorkflowSearchResultStatus `json:"status,omitempty"` +} + +// WorkflowSearchResult captures the search result for a workflow. +type WorkflowSearchResult struct { + Index string `json:"_index"` + Type string `json:"_type"` + ID string `json:"_id"` + SeqNo interface{} `json:"_seq_no"` + PrimaryTerm interface{} `json:"_primary_term"` + Sort []interface{} `json:"sort"` + Source WorkflowSearchResultDetail `json:"_source"` +} + +// Status returns the workflow phase if available. +func (w *WorkflowSearchResult) Status() *string { + if w.Source.Status != nil { + return w.Source.Status.Phase + } + return nil +} + +// ToWorkflow converts a WorkflowSearchResult into a Workflow object. +func (w *WorkflowSearchResult) ToWorkflow() *Workflow { + return &Workflow{ + Spec: &w.Source.Spec, + Metadata: &w.Source.Metadata, + } +} + +// WorkflowSearchHits contains hits from a workflow search response. +type WorkflowSearchHits struct { + Total map[string]interface{} `json:"total"` + Hits []WorkflowSearchResult `json:"hits,omitempty"` +} + +// WorkflowSearchResponse represents the response from a workflow search. +type WorkflowSearchResponse struct { + Took *int `json:"took,omitempty"` + Hits WorkflowSearchHits `json:"hits"` + Shards map[string]interface{} `json:"_shards"` +} + +// ReRunRequest captures a request to rerun a workflow. +type ReRunRequest struct { + Namespace string `json:"namespace"` + ResourceKind string `json:"resourceKind"` + ResourceName string `json:"resourceName"` +} + +// WorkflowResponse represents a generic workflow response. +type WorkflowResponse struct { + Metadata *WorkflowMetadata `json:"metadata"` + Spec *WorkflowSpec `json:"spec"` + Payload []interface{} `json:"payload,omitempty"` +} + +// WorkflowRunResponse extends WorkflowResponse with status information. +type WorkflowRunResponse struct { + WorkflowResponse + Status WorkflowSearchResultStatus `json:"status"` +} + +// ScheduleQueriesSearchRequest defines a request for searching schedule queries. +type ScheduleQueriesSearchRequest struct { + StartDate string `json:"startDate"` + EndDate string `json:"endDate"` +} + +// WorkflowSchedule defines a workflow schedule. +type WorkflowSchedule struct { + Timezone string `json:"timezone"` + CronSchedule string `json:"cronSchedule"` +} + +// WorkflowScheduleSpec specifies details for a workflow schedule. +type WorkflowScheduleSpec struct { + Schedule *string `json:"schedule,omitempty"` + Timezone *string `json:"timezone,omitempty"` + WorkflowSpec *WorkflowSpec `json:"workflowSpec,omitempty"` + ConcurrencyPolicy *string `json:"concurrencyPolicy,omitempty"` + StartingDeadlineSeconds *int `json:"startingDeadlineSeconds,omitempty"` + SuccessfulJobsHistoryLimit *int `json:"successfulJobsHistoryLimit,omitempty"` + FailedJobsHistoryLimit *int `json:"failedJobsHistoryLimit,omitempty"` +} + +// WorkflowScheduleStatus captures the status of a workflow schedule. +type WorkflowScheduleStatus struct { + Active interface{} `json:"active,omitempty"` + Conditions interface{} `json:"conditions,omitempty"` + LastScheduledTime *string `json:"lastScheduledTime,omitempty"` +} + +// WorkflowScheduleResponse captures the response for a workflow schedule. +type WorkflowScheduleResponse struct { + Metadata *WorkflowMetadata `json:"metadata,omitempty"` + Spec *WorkflowScheduleSpec `json:"spec,omitempty"` + Status *WorkflowScheduleStatus `json:"status,omitempty"` + WorkflowMetadata *WorkflowMetadata `json:"workflowMetadata,omitempty"` +} From 8d5571bb90c953ae984731b0794e494278cd73c3 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Thu, 13 Feb 2025 23:13:23 +0530 Subject: [PATCH 02/21] feat: Add custom marshalling for WorkflowSearchRequest Signed-off-by: Karanjot Singh --- atlan/model/search.go | 49 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/atlan/model/search.go b/atlan/model/search.go index 3e7bb53..d4036c0 100644 --- a/atlan/model/search.go +++ b/atlan/model/search.go @@ -407,6 +407,55 @@ func (s *SortItem) ToJSON() map[string]interface{} { return map[string]interface{}{s.Field: sortField} } +/* +I don’t remember why we made a separate `ToJSON()` method for above search queries instead of using +the custom `MarshalJSON()` method, which would automatically be used when preparing a request. +I won’t change these methods for now, but I’ll make a separate JIRA ticket to refactor them and check if anything breaks. +*/ + +// WorkflowSearchRequest captures the request structure for workflow search. (Added here in order to avoid circular imports) +type WorkflowSearchRequest struct { + From int `json:"from"` + Size int `json:"size"` + TrackTotalHits bool `json:"track_total_hits"` + PostFilter Query `json:"post_filter,omitempty"` + Query Query `json:"query,omitempty"` + Sort []SortItem `json:"sort"` +} + +// MarshalJSON marshals the WorkflowSearchRequest to JSON. +func (w WorkflowSearchRequest) MarshalJSON() ([]byte, error) { + type Alias WorkflowSearchRequest + alias := Alias(w) + + var queryJSON map[string]interface{} + if w.Query != nil { + queryJSON = w.Query.ToJSON() + } + + var postFilterJSON map[string]interface{} + if w.PostFilter != nil { + postFilterJSON = w.PostFilter.ToJSON() + } + + var sortJSON []map[string]interface{} + for _, s := range w.Sort { + sortJSON = append(sortJSON, s.ToJSON()) + } + + return json.Marshal(&struct { + Query map[string]interface{} `json:"query,omitempty"` + PostFilter map[string]interface{} `json:"post_filter,omitempty"` + Sort []map[string]interface{} `json:"sort,omitempty"` + Alias + }{ + Query: queryJSON, + PostFilter: postFilterJSON, + Sort: sortJSON, + Alias: alias, + }) +} + // SearchRequest represents a search request in the Atlas search DSL. type SearchRequest struct { Attributes []string `json:"attributes,omitempty"` From cf849329b7d68795b32ea31a4df7fae8bdcc42fe Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Thu, 13 Feb 2025 23:18:00 +0530 Subject: [PATCH 03/21] feat: Add WorkflowClient and Methods related to running, retrieving and updating workflows Signed-off-by: Karanjot Singh --- atlan/assets/client.go | 19 +- atlan/assets/constants.go | 101 +++++++++ atlan/assets/workflow_client.go | 353 ++++++++++++++++++++++++++++++++ 3 files changed, 464 insertions(+), 9 deletions(-) create mode 100644 atlan/assets/workflow_client.go diff --git a/atlan/assets/client.go b/atlan/assets/client.go index 3787786..d8c69f8 100644 --- a/atlan/assets/client.go +++ b/atlan/assets/client.go @@ -19,15 +19,16 @@ import ( // AtlanClient defines the Atlan API client structure. type AtlanClient struct { - Session *http.Client - host string - ApiKey string - requestParams map[string]interface{} - logger logger.Logger - RoleClient *RoleClient - GroupClient *GroupClient - UserClient *UserClient - TokenClient *TokenClient + Session *http.Client + host string + ApiKey string + requestParams map[string]interface{} + logger logger.Logger + RoleClient *RoleClient + GroupClient *GroupClient + UserClient *UserClient + TokenClient *TokenClient + WorkflowClient *WorkflowClient SearchAssets } diff --git a/atlan/assets/constants.go b/atlan/assets/constants.go index 1ca6b60..0eb034f 100644 --- a/atlan/assets/constants.go +++ b/atlan/assets/constants.go @@ -34,6 +34,17 @@ const ( // Tokens API TOKENS_API = "apikeys" + + // Workflows API + WORKFLOW_API = "workflows" + WORKFLOW_INDEX_API = "workflows/indexsearch" + WORKFLOW_INDEX_RUN_API = "runs/indexsearch" + SCHEDULE_QUERY_WORKFLOWS_SEARCH_API = "runs/cron/scheduleQueriesBetweenDuration" + SCHEDULE_QUERY_WORKFLOWS_MISSED_API = "runs/cron/missedScheduleQueriesBetweenDuration" + WORKFLOW_OWNER_RERUN_API = "workflows/triggerAsOwner" + WORKFLOW_RERUN_API = "workflows/submit" + WORKFLOW_RUN_API = "workflows?submit=true" + WORKFLOW_SCHEDULE_RUN = "runs" ) // API defines the structure of an API call. @@ -319,6 +330,96 @@ var ( Status: http.StatusOK, Endpoint: HeraclesEndpoint, } + + // Workflows + + SCHEDULE_QUERY_WORKFLOWS_SEARCH = API{ + Path: SCHEDULE_QUERY_WORKFLOWS_SEARCH_API, + Method: http.MethodGet, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + SCHEDULE_QUERY_WORKFLOWS_MISSED = API{ + Path: SCHEDULE_QUERY_WORKFLOWS_MISSED_API, + Method: http.MethodGet, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + WORKFLOW_INDEX_SEARCH = API{ + Path: WORKFLOW_INDEX_API, + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + WORKFLOW_INDEX_RUN_SEARCH = API{ + Path: WORKFLOW_INDEX_RUN_API, + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + // triggers a workflow using the current user's credentials + + WORKFLOW_RERUN = API{ + Path: WORKFLOW_RUN_API, + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + // triggers a workflow using the workflow owner's credentials + + WORKFLOW_OWNER_RERUN = API{ + Path: WORKFLOW_OWNER_RERUN_API, + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + WORKFLOW_UPDATE = API{ + Path: WORKFLOW_API + "/%s", + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + WORKFLOW_ARCHIVE = API{ + Path: WORKFLOW_API + "/%s/archive", + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + GET_ALL_SCHEDULE_RUNS = API{ + Path: WORKFLOW_SCHEDULE_RUN + "/cron", + Method: http.MethodGet, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + GET_SCHEDULE_RUN = API{ + Path: WORKFLOW_SCHEDULE_RUN + "/cron/%s", + Method: http.MethodGet, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + STOP_WORKFLOW_RUN = API{ + Path: WORKFLOW_SCHEDULE_RUN + "/%s/stop", + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } + + WORKFLOW_CHANGE_OWNER = API{ + Path: WORKFLOW_API + "/%s/changeownership", + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } ) // Constants for the Atlas search DSL diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go new file mode 100644 index 0000000..d8d4df9 --- /dev/null +++ b/atlan/assets/workflow_client.go @@ -0,0 +1,353 @@ +package assets + +import ( + "encoding/json" + "fmt" + "github.com/atlanhq/atlan-go/atlan" + "github.com/atlanhq/atlan-go/atlan/model" + "github.com/atlanhq/atlan-go/atlan/model/structs" + "time" +) + +const ( + workflowRunSchedule = "orchestration.atlan.com/schedule" + workflowRunTimezone = "orchestration.atlan.com/timezone" +) + +type WorkflowClient struct { + *AtlanClient +} + +// FindByType searches for workflows by their type prefix. +func (w *WorkflowClient) FindByType(prefix atlan.WorkflowPackage, maxResults int) ([]structs.WorkflowSearchResult, error) { + + var query model.Query = &model.BoolQuery{ + Filter: []model.Query{ + &model.NestedQuery{ + Path: "metadata", + Query: &model.PrefixQuery{ + Field: "metadata.name.keyword", + Value: prefix.Name, + }, + }, + }, + } + + nestedPath := "metadata" + + // Add sorting based on creation timestamp in descending order. + sortItems := []model.SortItem{ + { + Order: atlan.SortOrderDescending, + Field: "metadata.creationTimestamp", + NestedPath: &nestedPath, + }, + } + + request := model.WorkflowSearchRequest{ + Query: query, + Size: maxResults, + Sort: sortItems, + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&WORKFLOW_INDEX_SEARCH, nil, &request) + if err != nil { + return nil, err + } + var response structs.WorkflowSearchResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return response.Hits.Hits, nil +} + +// FindByID searches for a workflow by its ID. +func (w *WorkflowClient) FindByID(id string) (*structs.WorkflowSearchResult, error) { + var query model.Query = &model.BoolQuery{ + Filter: []model.Query{ + &model.NestedQuery{ + Path: "metadata", + Query: &model.TermQuery{ + Field: "metadata.name.keyword", + Value: id, + }, + }, + }, + } + + request := model.WorkflowSearchRequest{ + Query: query, + Size: 1, + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&WORKFLOW_INDEX_SEARCH, nil, &request) + if err != nil { + return nil, err + } + + var response structs.WorkflowSearchResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + if len(response.Hits.Hits) > 0 { + return &response.Hits.Hits[0], nil + } + return nil, nil +} + +// FindRunByID searches for a workflow run by its ID. +func (w *WorkflowClient) FindRunByID(id string) (*structs.WorkflowSearchResult, error) { + var query model.Query = &model.BoolQuery{ + Filter: []model.Query{ + &model.TermQuery{ + Field: "_id", + Value: id, + }, + }, + } + + response, err := w.findRuns(query, 0, 1) + if err != nil { + return nil, err + } + + if len(response.Hits.Hits) > 0 { + return &response.Hits.Hits[0], nil + } + return nil, nil +} + +// findRuns retrieves existing workflow runs. +func (w *WorkflowClient) findRuns(query model.Query, from, size int) (*structs.WorkflowSearchResponse, error) { + request := model.WorkflowSearchRequest{ + Query: query, + From: from, + Size: size, + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&WORKFLOW_INDEX_RUN_SEARCH, nil, &request) + if err != nil { + return nil, err + } + + var response structs.WorkflowSearchResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + return &response, nil +} + +// FindLatestRun retrieves the latest run of a given workflow. +func (w *WorkflowClient) FindLatestRun(workflowName string) (*structs.WorkflowSearchResult, error) { + var query model.Query = &model.BoolQuery{ + Filter: []model.Query{ + &model.NestedQuery{ + Path: "spec", + Query: &model.TermQuery{ + Field: "spec.workflowTemplateRef.name.keyword", + Value: workflowName, + }, + }, + }, + } + + response, err := w.findRuns(query, 0, 1) + if err != nil { + return nil, err + } + + if len(response.Hits.Hits) > 0 { + return &response.Hits.Hits[0], nil + } + return nil, nil +} + +// FindCurrentRun retrieves the most current, still-running workflow. +func (w *WorkflowClient) FindCurrentRun(workflowName string) (*structs.WorkflowSearchResult, error) { + var query model.Query = &model.BoolQuery{ + Filter: []model.Query{ + &model.NestedQuery{ + Path: "spec", + Query: &model.TermQuery{ + Field: "spec.workflowTemplateRef.name.keyword", + Value: workflowName, + }, + }, + }, + } + + response, err := w.findRuns(query, 0, 50) + if err != nil { + return nil, err + } + + for _, result := range response.Hits.Hits { + if *result.Source.Status.Phase == atlan.AtlanWorkflowPhasePending.Name || *result.Source.Status.Phase == atlan.AtlanWorkflowPhaseRunning.Name { + return &result, nil + } + } + return nil, nil +} + +// GetRuns retrieves all workflow runs filtered by workflow name and phase. +func (w *WorkflowClient) GetRuns(workflowName string, workflowPhase atlan.AtlanWorkflowPhase, from, size int) ([]structs.WorkflowSearchResult, error) { + var query model.Query = &model.BoolQuery{ + Must: []model.Query{ + &model.NestedQuery{ + Path: "spec", + Query: &model.TermQuery{ + Field: "spec.workflowTemplateRef.name.keyword", + Value: workflowName, + }, + }, + }, + Filter: []model.Query{ + &model.TermQuery{ + Field: "status.phase.keyword", + Value: workflowPhase.Name, + }, + }, + } + + response, err := w.findRuns(query, from, size) + if err != nil { + return nil, err + } + + return response.Hits.Hits, nil +} + +// Stop stops a running workflow. +func (w *WorkflowClient) Stop(workflowRunID string) (*structs.WorkflowRunResponse, error) { + + api := &STOP_WORKFLOW_RUN + api.Path = fmt.Sprintf("runs/%s/stop", workflowRunID) + + rawJSON, err := DefaultAtlanClient.CallAPI(api, nil, "") + if err != nil { + return nil, err + } + + var response structs.WorkflowRunResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + return &response, nil +} + +// Delete archives (deletes) the provided workflow. +func (w *WorkflowClient) Delete(workflowName string) error { + api := &WORKFLOW_ARCHIVE + api.Path = fmt.Sprintf("workflows/%s/archive", workflowName) + _, err := DefaultAtlanClient.CallAPI(api, nil, "") + return err +} + +// handleWorkflowTypes determines the workflow details based on its type. +func (w *WorkflowClient) handleWorkflowTypes(workflow interface{}) (*structs.WorkflowSearchResultDetail, error) { + switch wf := workflow.(type) { + case atlan.WorkflowPackage: + results, err := w.FindByType(wf, 1) // Fetching at most 1 result + if err != nil { + return nil, err + } + if len(results) == 0 { + return nil, fmt.Errorf("no prior run available for workflow: %v", wf) + } + return &results[0].Source, nil + + case structs.WorkflowSearchResult: + return &wf.Source, nil + + case *structs.WorkflowSearchResultDetail: + return wf, nil + + default: + return nil, fmt.Errorf("invalid workflow type provided") + } +} + +// Rerun executes the workflow immediately if it has been run before. +// If idempotent is true, it only reruns if not already running. +func (w *WorkflowClient) Rerun(workflow interface{}, idempotent bool) (*structs.WorkflowRunResponse, error) { + detail, err := w.handleWorkflowTypes(workflow) + if err != nil { + return nil, err + } + + if idempotent && *detail.Metadata.Name != "" { + // Wait before checking the current workflow run status + time.Sleep(10 * time.Second) + + currentRun, err := w.FindCurrentRun(*detail.Metadata.Name) + if err == nil && currentRun != nil && currentRun.Source.Status != nil { + return &structs.WorkflowRunResponse{ + WorkflowResponse: structs.WorkflowResponse{ + Metadata: ¤tRun.Source.Metadata, + Spec: ¤tRun.Source.Spec, + }, + Status: *currentRun.Source.Status, + }, nil + } + } + + request := structs.ReRunRequest{ + Namespace: *detail.Metadata.Namespace, + ResourceName: *detail.Metadata.Name, + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&WORKFLOW_RERUN, nil, &request) + if err != nil { + return nil, err + } + + var response structs.WorkflowRunResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + return &response, nil +} + +// Update modifies the configuration of an existing workflow. +func (w *WorkflowClient) Update(workflow *structs.Workflow) (*structs.WorkflowResponse, error) { + + api := &WORKFLOW_UPDATE + api.Path = fmt.Sprintf("workflows/%s", *workflow.Metadata.Name) + + rawJSON, err := DefaultAtlanClient.CallAPI(api, nil, workflow) + if err != nil { + return nil, err + } + + var response structs.WorkflowResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + return &response, nil +} + +// UpdateOwner assigns a new owner to the workflow. +func (w *WorkflowClient) UpdateOwner(workflowName, username string) (*structs.WorkflowResponse, error) { + + api := &WORKFLOW_CHANGE_OWNER + api.Path = fmt.Sprintf("workflows/%s/changeownership", workflowName) + + queryParams := map[string]string{"username": username} + + rawJSON, err := DefaultAtlanClient.CallAPI(api, queryParams, nil) + if err != nil { + return nil, err + } + + var response structs.WorkflowResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + return &response, nil +} From 214afd7baf83804dae77701a9d4307eff04c8bf2 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Thu, 13 Feb 2025 23:20:12 +0530 Subject: [PATCH 04/21] chore: Include main.go for examples related to using workflows Signed-off-by: Karanjot Singh --- main.go | 98 ++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 93 insertions(+), 5 deletions(-) diff --git a/main.go b/main.go index a51acb6..eced7d2 100644 --- a/main.go +++ b/main.go @@ -1,15 +1,103 @@ package main -import ( - _ "github.com/atlanhq/atlan-go/atlan" - "github.com/atlanhq/atlan-go/atlan/assets" - _ "github.com/atlanhq/atlan-go/atlan/model/structs" -) +import "github.com/atlanhq/atlan-go/atlan/assets" func main() { ctx := assets.NewContext() ctx.EnableLogging("debug") + /* + // Update a workflow + result, _ := ctx.WorkflowClient.FindByID("csa-admin-export-1739443119") + + workflowTask := result.Source.Spec.Templates[0].DAG.Tasks[0] + workflowParams := workflowTask.Arguments.Parameters + + fmt.Println(workflowTask) + fmt.Println(workflowParams) + + for _, option := range workflowParams { + if option.Name == "enable-lineage" { + option.Value = true + fmt.Println(option) + } + } + + response, err := ctx.WorkflowClient.Update(result.ToWorkflow()) + if err != nil { + fmt.Println(err) + } + fmt.Println(response) + */ + /* + // Delete a Workflow + _ = ctx.WorkflowClient.Delete("csa-admin-export-1739368706") + */ + /* + // Stop a running workflow + runs, err := ctx.WorkflowClient.GetRuns("csa-admin-export-1739368706", atlan.AtlanWorkflowPhaseRunning, 0, 100) + if err != nil { + fmt.Println(err) + } + response, err := ctx.WorkflowClient.Stop(runs[0].ID) + if err != nil { + fmt.Println(err) + } + fmt.Println(response) + + */ + /* + // Retrieve runs by their phase: + result, err := ctx.WorkflowClient.GetRuns("csa-admin-export-1739368706", atlan.AtlanWorkflowPhaseSuccess, 0, 100) + if err != nil { + fmt.Println(err) + } + fmt.Println(result) + */ + /* + // Retrieve an existing workflow latest run: + result, err := ctx.WorkflowClient.FindCurrentRun("csa-admin-export-1739368706") + if err != nil { + fmt.Println(err) + } + fmt.Println(result) + */ + /* + // Retrieve an existing workflow latest run: + result, err := ctx.WorkflowClient.FindLatestRun("csa-admin-export-1739172254") + if err != nil { + fmt.Println(err) + } + fmt.Println(*result.Source.Metadata.CreationTimestamp) + + */ + /* + // Retrieve an existing workflow run by its ID: + result, err := ctx.WorkflowClient.FindRunByID("csa-admin-export-1739172254-skdzt") + if err != nil { + fmt.Println(err) + } + fmt.Println(*result.Source.Metadata.CreationTimestamp) + */ + /* + // Retrieve an existing workflow by its ID: + result, err := ctx.WorkflowClient.FindByID("csa-admin-export-1739172254\n\n") + if err != nil { + fmt.Println(err) + } + fmt.Println(result) + + + */ + /* + // Retrieve existing workflows by its type: + result, err := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 5) + if err != nil { + fmt.Println(err) + } + fmt.Println(result) + + */ /* // Find the GUID of a specific policy in a persona PurposeName := "Test-go-sdk-Purpose" From dadb26626f5a1dbf3814650f2d46e68447f23ab0 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Thu, 13 Feb 2025 23:28:16 +0530 Subject: [PATCH 05/21] chore: fix linting errors Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index d8d4df9..328436c 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -3,10 +3,11 @@ package assets import ( "encoding/json" "fmt" + "time" + "github.com/atlanhq/atlan-go/atlan" "github.com/atlanhq/atlan-go/atlan/model" "github.com/atlanhq/atlan-go/atlan/model/structs" - "time" ) const ( @@ -20,7 +21,6 @@ type WorkflowClient struct { // FindByType searches for workflows by their type prefix. func (w *WorkflowClient) FindByType(prefix atlan.WorkflowPackage, maxResults int) ([]structs.WorkflowSearchResult, error) { - var query model.Query = &model.BoolQuery{ Filter: []model.Query{ &model.NestedQuery{ @@ -221,7 +221,6 @@ func (w *WorkflowClient) GetRuns(workflowName string, workflowPhase atlan.AtlanW // Stop stops a running workflow. func (w *WorkflowClient) Stop(workflowRunID string) (*structs.WorkflowRunResponse, error) { - api := &STOP_WORKFLOW_RUN api.Path = fmt.Sprintf("runs/%s/stop", workflowRunID) @@ -314,7 +313,6 @@ func (w *WorkflowClient) Rerun(workflow interface{}, idempotent bool) (*structs. // Update modifies the configuration of an existing workflow. func (w *WorkflowClient) Update(workflow *structs.Workflow) (*structs.WorkflowResponse, error) { - api := &WORKFLOW_UPDATE api.Path = fmt.Sprintf("workflows/%s", *workflow.Metadata.Name) @@ -333,7 +331,6 @@ func (w *WorkflowClient) Update(workflow *structs.Workflow) (*structs.WorkflowRe // UpdateOwner assigns a new owner to the workflow. func (w *WorkflowClient) UpdateOwner(workflowName, username string) (*structs.WorkflowResponse, error) { - api := &WORKFLOW_CHANGE_OWNER api.Path = fmt.Sprintf("workflows/%s/changeownership", workflowName) From 70035acd14e78185a2b4bc2da15b59c1c20dcec7 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Fri, 14 Feb 2025 03:53:44 +0530 Subject: [PATCH 06/21] feat: Add workflow as an Asset(used in searching) Signed-off-by: Karanjot Singh --- atlan/assets/asset.go | 61 +++++++++++++++++++++++++++++++++ atlan/model/structs/workflow.go | 58 ++++++++++++++++++++++--------- 2 files changed, 102 insertions(+), 17 deletions(-) diff --git a/atlan/assets/asset.go b/atlan/assets/asset.go index 52856fd..ddfc240 100644 --- a/atlan/assets/asset.go +++ b/atlan/assets/asset.go @@ -316,6 +316,18 @@ type PurposeFields struct { PURPOSE_CLASSIFICATIONS *KeywordField } +type WorkflowFields struct { + AssetFields + WORKFLOW_TEMPLATE_GUID *KeywordField + WORKFLOW_TYPE *KeywordField + WORKFLOW_CONFIG *TextField + WORKFLOW_STATUS *KeywordField + WORKFLOW_RUN_EXPIRES_IN *TextField + WORKFLOW_CREATED_BY *KeywordField + WORKFLOW_UPDATED_BY *KeywordField + WORKFLOW_DELETED_AT *NumericField +} + // NewSearchTable returns a new AtlasTable object for Searching func NewSearchTable() *AtlasTableFields { return &AtlasTableFields{ @@ -1022,6 +1034,55 @@ func NewPurposeFields() *PurposeFields { } } +// NewWorkflowFields initializes and returns a WorkflowFields struct. +func NewWorkflowFields() *WorkflowFields { + return &WorkflowFields{ + AssetFields: AssetFields{ + AttributesFields: AttributesFields{ + TYPENAME: NewKeywordTextField("typeName", "__typeName.keyword", "__typeName"), + GUID: NewKeywordField("guid", "__guid"), + CREATED_BY: NewKeywordField("createdBy", "__createdBy"), + UPDATED_BY: NewKeywordField("updatedBy", "__modifiedBy"), + STATUS: NewKeywordField("status", "__state"), + ATLAN_TAGS: NewKeywordTextField("classificationNames", "__traitNames", "__classificationsText"), + PROPOGATED_ATLAN_TAGS: NewKeywordTextField("classificationNames", "__propagatedTraitNames", "__classificationsText"), + ASSIGNED_TERMS: NewKeywordTextField("meanings", "__meanings", "__meaningsText"), + SUPERTYPE_NAMES: NewKeywordTextField("typeName", "__superTypeNames.keyword", "__superTypeNames"), + CREATE_TIME: NewNumericField("createTime", "__timestamp"), + UPDATE_TIME: NewNumericField("updateTime", "__modificationTimestamp"), + QUALIFIED_NAME: NewKeywordTextField("qualifiedName", "qualifiedName", "qualifiedName.text"), + }, + NAME: NewKeywordTextStemmedField("name", "name.keyword", "name", "name"), + DISPLAY_NAME: NewKeywordTextField("displayName", "displayName.keyword", "displayName"), + DESCRIPTION: NewKeywordTextField("description", "description", "description.text"), + USER_DESCRIPTION: NewKeywordTextField("userDescription", "userDescription", "userDescription.text"), + TENET_ID: NewKeywordField("tenetId", "tenetId"), + CERTIFICATE_STATUS: NewKeywordTextField("certificateStatus", "certificateStatus", "certificateStatus.text"), + CERTIFICATE_STATUS_MESSAGE: NewKeywordField("certificateStatusMessage", "certificateStatusMessage"), + CERTIFICATE_UPDATED_BY: NewNumericField("certificateUpdatedBy", "certificateUpdatedBy"), + ANNOUNCEMENT_TITLE: NewKeywordField("announcementTitle", "announcementTitle"), + ANNOUNCEMENT_MESSAGE: NewKeywordTextField("announcementMessage", "announcementMessage", "announcementMessage.text"), + ANNOUNCEMENT_TYPE: NewKeywordField("announcementType", "announcementType"), + ANNOUNCEMENT_UPDATED_AT: NewNumericField("announcementUpdatedAt", "announcementUpdatedAt"), + ANNOUNCEMENT_UPDATED_BY: NewKeywordField("announcementUpdatedBy", "announcementUpdatedBy"), + OWNER_USERS: NewKeywordTextField("ownerUsers", "ownerUsers", "ownerUsers.text"), + ADMIN_USERS: NewKeywordField("adminUsers", "adminUsers"), + VIEWER_USERS: NewKeywordField("viewerUsers", "viewerUsers"), + VIEWER_GROUPS: NewKeywordField("viewerGroups", "viewerGroups"), + CONNECTOR_NAME: NewKeywordTextField("connectorName", "connectorName", "connectorName.text"), + CONNECTION_QUALIFIED_NAME: NewKeywordTextField("connectionQualifiedName", "connectionQualifiedName", "connectionQualifiedName.text"), + }, + WORKFLOW_TEMPLATE_GUID: NewKeywordField("workflowTemplateGuid", "workflowTemplateGuid"), + WORKFLOW_TYPE: NewKeywordField("workflowType", "workflowType"), + WORKFLOW_CONFIG: NewTextField("workflowConfig", "workflowConfig"), + WORKFLOW_STATUS: NewKeywordField("workflowStatus", "workflowStatus"), + WORKFLOW_RUN_EXPIRES_IN: NewTextField("workflowRunExpiresIn", "workflowRunExpiresIn"), + WORKFLOW_CREATED_BY: NewKeywordField("workflowCreatedBy", "workflowCreatedBy"), + WORKFLOW_UPDATED_BY: NewKeywordField("workflowUpdatedBy", "workflowUpdatedBy"), + WORKFLOW_DELETED_AT: NewNumericField("workflowDeletedAt", "workflowDeletedAt"), + } +} + // Methods on assets // GetbyGuid retrieves an asset by guid diff --git a/atlan/model/structs/workflow.go b/atlan/model/structs/workflow.go index 3029ae1..9e6eadf 100644 --- a/atlan/model/structs/workflow.go +++ b/atlan/model/structs/workflow.go @@ -1,5 +1,11 @@ package structs +import ( + "time" + + "github.com/atlanhq/atlan-go/atlan" +) + // PackageParameter represents package-related parameters. type PackageParameter struct { Parameter string `json:"parameter"` @@ -76,24 +82,42 @@ type Workflow struct { Payload []PackageParameter `json:"payload,omitempty"` } +// WorkflowAsset defines the Asset structure for a workflow. +type WorkflowAsset struct { + Asset + WorkflowAttributes *WorkflowAttributes `json:"workflowAttributes"` +} + +// WorkflowAttributes captures attributes of a workflow. +type WorkflowAttributes struct { + WorkflowTemplateGuid *string `json:"workflowTemplateGuid,omitempty"` + WorkflowType *string `json:"workflowType,omitempty"` + WorkflowConfig *string `json:"workflowConfig,omitempty"` + WorkflowStatus *string `json:"workflowStatus,omitempty"` + WorkflowRunExpiresIn *string `json:"workflowRunExpiresIn,omitempty"` + WorkflowCreatedBy *string `json:"workflowCreatedBy,omitempty"` + WorkflowUpdatedBy *string `json:"workflowUpdatedBy,omitempty"` + WorkflowDeletedAt *time.Time `json:"workflowDeletedAt,omitempty"` +} + // WorkflowSearchResultStatus captures the status of a workflow search result. type WorkflowSearchResultStatus struct { - ArtifactGCStatus map[string]interface{} `json:"artifactGCStatus,omitempty"` - ArtifactRepositoryRef interface{} `json:"artifactRepositoryRef,omitempty"` - CompressedNodes *string `json:"compressedNodes,omitempty"` - EstimatedDuration *int `json:"estimatedDuration,omitempty"` - Conditions []interface{} `json:"conditions,omitempty"` - Message *string `json:"message,omitempty"` - FinishedAt *string `json:"finishedAt,omitempty"` - Nodes interface{} `json:"nodes,omitempty"` - Outputs *WorkflowParameters `json:"outputs,omitempty"` - Phase *string `json:"phase,omitempty"` - Progress *string `json:"progress,omitempty"` - ResourcesDuration map[string]int `json:"resourcesDuration,omitempty"` - StartedAt *string `json:"startedAt,omitempty"` - StoredTemplates interface{} `json:"storedTemplates,omitempty"` - StoredWorkflowTemplateSpec interface{} `json:"storedWorkflowTemplateSpec,omitempty"` - Synchronization map[string]interface{} `json:"synchronization,omitempty"` + ArtifactGCStatus map[string]interface{} `json:"artifactGCStatus,omitempty"` + ArtifactRepositoryRef interface{} `json:"artifactRepositoryRef,omitempty"` + CompressedNodes *string `json:"compressedNodes,omitempty"` + EstimatedDuration *int `json:"estimatedDuration,omitempty"` + Conditions []interface{} `json:"conditions,omitempty"` + Message *string `json:"message,omitempty"` + FinishedAt *string `json:"finishedAt,omitempty"` + Nodes interface{} `json:"nodes,omitempty"` + Outputs *WorkflowParameters `json:"outputs,omitempty"` + Phase *atlan.AtlanWorkflowPhase `json:"phase,omitempty"` + Progress *string `json:"progress,omitempty"` + ResourcesDuration map[string]int `json:"resourcesDuration,omitempty"` + StartedAt *string `json:"startedAt,omitempty"` + StoredTemplates interface{} `json:"storedTemplates,omitempty"` + StoredWorkflowTemplateSpec interface{} `json:"storedWorkflowTemplateSpec,omitempty"` + Synchronization map[string]interface{} `json:"synchronization,omitempty"` } // WorkflowSearchResultDetail contains detailed information about a workflow search result. @@ -117,7 +141,7 @@ type WorkflowSearchResult struct { } // Status returns the workflow phase if available. -func (w *WorkflowSearchResult) Status() *string { +func (w *WorkflowSearchResult) Status() *atlan.AtlanWorkflowPhase { if w.Source.Status != nil { return w.Source.Status.Phase } From 007d3e5d8c06e9fe2f5e2647c1083f10c135cfab Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Fri, 14 Feb 2025 03:55:11 +0530 Subject: [PATCH 07/21] feat: Add methods related to workflow schedules such as Retrieving, Adding, Removing schedules Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 205 +++++++++++++++++++++++++++++++- 1 file changed, 204 insertions(+), 1 deletion(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index 328436c..3989298 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -3,6 +3,7 @@ package assets import ( "encoding/json" "fmt" + "log" "time" "github.com/atlanhq/atlan-go/atlan" @@ -13,6 +14,7 @@ import ( const ( workflowRunSchedule = "orchestration.atlan.com/schedule" workflowRunTimezone = "orchestration.atlan.com/timezone" + MonitorSleepSeconds = 5 ) type WorkflowClient struct { @@ -184,7 +186,7 @@ func (w *WorkflowClient) FindCurrentRun(workflowName string) (*structs.WorkflowS } for _, result := range response.Hits.Hits { - if *result.Source.Status.Phase == atlan.AtlanWorkflowPhasePending.Name || *result.Source.Status.Phase == atlan.AtlanWorkflowPhaseRunning.Name { + if *result.Source.Status.Phase == atlan.AtlanWorkflowPhasePending || *result.Source.Status.Phase == atlan.AtlanWorkflowPhaseRunning { return &result, nil } } @@ -348,3 +350,204 @@ func (w *WorkflowClient) UpdateOwner(workflowName, username string) (*structs.Wo return &response, nil } + +// Methods related to workflow schedules + +// Monitor the status of the workflow's run. +func (w *WorkflowClient) Monitor(workflowResponse *structs.WorkflowResponse, logger *log.Logger) (*atlan.AtlanWorkflowPhase, error) { + if workflowResponse.Metadata == nil || *workflowResponse.Metadata.Name == "" { + if logger != nil { + logger.Println("Skipping workflow monitoring — nothing to monitor.") + } + return nil, nil + } + + name := workflowResponse.Metadata.Name + var status *atlan.AtlanWorkflowPhase + + for status == nil || (*status != atlan.AtlanWorkflowPhaseSuccess && *status != atlan.AtlanWorkflowPhaseError && *status != atlan.AtlanWorkflowPhaseFailed) { + time.Sleep(MonitorSleepSeconds * time.Second) + runDetails, _ := w.FindLatestRun(*name) + if runDetails != nil { + status = runDetails.Status() + } + if logger != nil { + logger.Printf("Workflow status: %s\n", status) + } + } + if logger != nil { + logger.Printf("Workflow completion status: %s\n", status) + } + return status, nil +} + +func (w *WorkflowClient) addSchedule(workflow *structs.WorkflowSearchResultDetail, schedule *structs.WorkflowSchedule) { + if workflow.Metadata.Annotations == nil { + workflow.Metadata.Annotations = make(map[string]string) + } + workflow.Metadata.Annotations[workflowRunSchedule] = schedule.CronSchedule + workflow.Metadata.Annotations[workflowRunTimezone] = schedule.Timezone +} + +func (w *WorkflowClient) AddSchedule(workflow interface{}, schedule *structs.WorkflowSchedule) (*structs.WorkflowResponse, error) { + workflowToUpdate, err := w.handleWorkflowTypes(workflow) + if err != nil { + return nil, err + } + + w.addSchedule(workflowToUpdate, schedule) + + api := &WORKFLOW_UPDATE + api.Path = fmt.Sprintf("workflows/%s", *workflowToUpdate.Metadata.Name) + + rawJSON, err := DefaultAtlanClient.CallAPI(api, nil, workflowToUpdate) + if err != nil { + return nil, err + } + + var response structs.WorkflowResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return &response, nil +} + +func (w *WorkflowClient) RemoveSchedule(workflow interface{}) (*structs.WorkflowResponse, error) { + workflowToUpdate, err := w.handleWorkflowTypes(workflow) + if err != nil { + return nil, err + } + + if workflowToUpdate.Metadata.Annotations != nil { + delete(workflowToUpdate.Metadata.Annotations, workflowRunSchedule) + } + + api := &WORKFLOW_UPDATE + api.Path = fmt.Sprintf("workflows/%s", *workflowToUpdate.Metadata.Name) + + rawJSON, err := DefaultAtlanClient.CallAPI(api, nil, workflowToUpdate) + if err != nil { + return nil, err + } + + var response structs.WorkflowResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return &response, nil +} + +func (w *WorkflowClient) GetAllScheduledRuns() ([]structs.WorkflowScheduleResponse, error) { + rawJSON, err := DefaultAtlanClient.CallAPI(&GET_ALL_SCHEDULE_RUNS, nil, nil) + if err != nil { + return nil, err + } + + var response []structs.WorkflowScheduleResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return response, nil +} + +func (w *WorkflowClient) GetScheduledRun(workflowName string) (*structs.WorkflowScheduleResponse, error) { + api := &GET_SCHEDULE_RUN + api.Path = fmt.Sprintf("schedules/%s/cron", workflowName) + + rawJSON, err := DefaultAtlanClient.CallAPI(api, nil, nil) + if err != nil { + return nil, err + } + + var response structs.WorkflowScheduleResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return &response, nil +} + +func (w *WorkflowClient) FindScheduleQuery(savedQueryID string, maxResults int) ([]structs.WorkflowSearchResult, error) { + if maxResults <= 0 { + maxResults = 10 + } + + var query model.Query = &model.BoolQuery{ + Filter: []model.Query{ + &model.NestedQuery{ + Path: "metadata", + Query: &model.PrefixQuery{ + Field: "metadata.name.keyword", + Value: fmt.Sprintf("asq-%s", savedQueryID), + }, + }, + &model.NestedQuery{ + Path: "metadata", + Query: &model.TermQuery{ + Field: "metadata.annotations.package.argoproj.io/name.keyword", + Value: "@atlan/schedule-query", + }, + }, + }, + } + + request := model.WorkflowSearchRequest{ + Query: query, + Size: maxResults, + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&WORKFLOW_INDEX_SEARCH, nil, request) + if err != nil { + return nil, err + } + + var response structs.WorkflowSearchResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + + if response.Hits.Hits != nil { + return response.Hits.Hits, nil + } + return nil, nil +} + +func (w *WorkflowClient) ReRunScheduleQuery(scheduleQueryID string) (*structs.WorkflowRunResponse, error) { + request := structs.ReRunRequest{ + Namespace: "default", + ResourceName: scheduleQueryID, + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&WORKFLOW_OWNER_RERUN, nil, &request) + if err != nil { + return nil, err + } + + var response structs.WorkflowRunResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return &response, nil +} + +func (w *WorkflowClient) FindScheduleQueryBetween(request structs.ScheduleQueriesSearchRequest, missed bool) ([]structs.WorkflowRunResponse, error) { + queryParams := map[string]string{ + "startDate": request.StartDate, + "endDate": request.EndDate, + } + + searchAPI := SCHEDULE_QUERY_WORKFLOWS_SEARCH + if missed { + searchAPI = SCHEDULE_QUERY_WORKFLOWS_MISSED + } + + rawJSON, err := DefaultAtlanClient.CallAPI(&searchAPI, queryParams, nil) + if err != nil { + return nil, err + } + + var response []structs.WorkflowRunResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { + return nil, err + } + return response, nil +} From fec6dee8e67e48401c1bc4ed5843d83b932676f0 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Mon, 17 Feb 2025 23:27:14 +0530 Subject: [PATCH 08/21] feat: add method for running a workflow with optional schedule Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 27 +++++++++++++++++++++++++++ atlan/model/structs/workflow.go | 7 ++++--- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index 3989298..c060967 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -2,6 +2,7 @@ package assets import ( "encoding/json" + "errors" "fmt" "log" "time" @@ -551,3 +552,29 @@ func (w *WorkflowClient) FindScheduleQueryBetween(request structs.ScheduleQuerie } return response, nil } + +// Run executes an Atlan workflow with an optional schedule. +func (w *WorkflowClient) Run(workflow interface{}, schedule *structs.WorkflowSchedule) (*structs.WorkflowResponse, error) { + if workflow == nil { + return nil, errors.New("workflow cannot be nil") + } + + if schedule != nil { + _, err := w.AddSchedule(workflow, schedule) + if err != nil { + return nil, err + } + } + + responseData, err := DefaultAtlanClient.CallAPI(&WORKFLOW_RUN, nil, workflow) + if err != nil { + return nil, fmt.Errorf("error executing workflow: %w", err) + } + + var workflowResponse structs.WorkflowResponse + if err := json.Unmarshal(responseData, &workflowResponse); err != nil { + return nil, fmt.Errorf("failed to parse workflow response: %w", err) + } + + return &workflowResponse, nil +} diff --git a/atlan/model/structs/workflow.go b/atlan/model/structs/workflow.go index 9e6eadf..3c662e6 100644 --- a/atlan/model/structs/workflow.go +++ b/atlan/model/structs/workflow.go @@ -1,6 +1,7 @@ package structs import ( + "encoding/json" "time" "github.com/atlanhq/atlan-go/atlan" @@ -8,9 +9,9 @@ import ( // PackageParameter represents package-related parameters. type PackageParameter struct { - Parameter string `json:"parameter"` - Type string `json:"type"` - Body map[string]interface{} `json:"body"` + Parameter string `json:"parameter"` + Type string `json:"type"` + Body json.RawMessage `json:"body"` } // WorkflowMetadata captures metadata about a workflow. From 6fd67f9cfdb76d41990f9832286ec8a7be54cd3c Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Mon, 17 Feb 2025 23:28:41 +0530 Subject: [PATCH 09/21] feat: Implement abstract methods and structures for overriding by custom packages Signed-off-by: Karanjot Singh --- atlan/assets/abstract_package.go | 98 ++++++++++++++++++++++++++++++++ atlan/assets/constants.go | 7 +++ 2 files changed, 105 insertions(+) create mode 100644 atlan/assets/abstract_package.go diff --git a/atlan/assets/abstract_package.go b/atlan/assets/abstract_package.go new file mode 100644 index 0000000..83566ba --- /dev/null +++ b/atlan/assets/abstract_package.go @@ -0,0 +1,98 @@ +package assets + +import ( + "github.com/atlanhq/atlan-go/atlan/model/structs" + "time" +) + +import "encoding/json" + +// AbstractPackage represents a base package +type AbstractPackage struct { + Parameters []structs.NameValuePair + CredentialsBody map[string]interface{} + PackageName string + PackagePrefix string +} + +// NewAbstractPackage initializes an abstract package +func NewAbstractPackage(packageName, packagePrefix string) *AbstractPackage { + return &AbstractPackage{ + PackageName: packageName, + PackagePrefix: packagePrefix, + Parameters: []structs.NameValuePair{}, + CredentialsBody: map[string]interface{}{}, + } +} + +func (p *AbstractPackage) ToWorkflow() *structs.Workflow { + metadata := p.GetMetadata() + + spec := structs.WorkflowSpec{ + Entrypoint: structs.StringPtr("main"), + Templates: []structs.WorkflowTemplate{ + { + Name: "main", + DAG: structs.WorkflowDAG{ + Tasks: []structs.WorkflowTask{ + { + Name: "run", + Arguments: structs.WorkflowParameters{ + Parameters: p.Parameters, + }, + TemplateRef: structs.WorkflowTemplateRef{ + Name: p.PackagePrefix, + Template: "main", + ClusterScope: true, + }, + }, + }, + }, + }, + }, + WorkflowMetadata: metadata, + } + + var payload []structs.PackageParameter + if len(p.CredentialsBody) > 0 { + credJSON, _ := json.Marshal(p.CredentialsBody) + payload = append(payload, structs.PackageParameter{ + Parameter: "credentialGuid", + Type: "credential", + Body: credJSON, + }) + } + + return &structs.Workflow{ + Metadata: metadata, + Spec: &spec, + Payload: payload, + } +} + +// GetMetadata should be implemented by subclasses +func (p *AbstractPackage) GetMetadata() *structs.WorkflowMetadata { + // Default (empty) metadata implementation, to be overridden by child structs + return &structs.WorkflowMetadata{} +} + +// AbstractMiner represents a base miner package +type AbstractMiner struct { + *AbstractPackage + Epoch int64 +} + +// NewAbstractMiner initializes an abstract miner +func NewAbstractMiner(connectionQualifiedName, packageName, packagePrefix string) *AbstractMiner { + epoch := time.Now().Unix() + packageInstance := NewAbstractPackage(packageName, packagePrefix) + packageInstance.Parameters = append(packageInstance.Parameters, structs.NameValuePair{ + Name: "connection-qualified-name", + Value: connectionQualifiedName, + }) + + return &AbstractMiner{ + AbstractPackage: packageInstance, + Epoch: epoch, + } +} diff --git a/atlan/assets/constants.go b/atlan/assets/constants.go index 0eb034f..8d94f55 100644 --- a/atlan/assets/constants.go +++ b/atlan/assets/constants.go @@ -420,6 +420,13 @@ var ( Status: http.StatusOK, Endpoint: HeraclesEndpoint, } + + WORKFLOW_RUN = API{ + Path: WORKFLOW_RUN_API, + Method: http.MethodPost, + Status: http.StatusOK, + Endpoint: HeraclesEndpoint, + } ) // Constants for the Atlas search DSL From 56b8c5cfced00e76239fd785e8837e0992c0015a Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Mon, 17 Feb 2025 23:29:38 +0530 Subject: [PATCH 10/21] feat: Implement snowflake miner Signed-off-by: Karanjot Singh --- atlan/assets/snowflake_miner.go | 131 ++++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) create mode 100644 atlan/assets/snowflake_miner.go diff --git a/atlan/assets/snowflake_miner.go b/atlan/assets/snowflake_miner.go new file mode 100644 index 0000000..d1b7c8c --- /dev/null +++ b/atlan/assets/snowflake_miner.go @@ -0,0 +1,131 @@ +package assets + +import ( + "encoding/json" + "fmt" + "github.com/atlanhq/atlan-go/atlan" + "strconv" + + "github.com/atlanhq/atlan-go/atlan/model/structs" +) + +// SnowflakeMiner represents a Snowflake miner package +type SnowflakeMiner struct { + *AbstractMiner + AdvancedConfig bool + Name string + PackageName string + PackagePrefix string + ConnectorType string + PackageIcon string + PackageLogo string +} + +// NewSnowflakeMiner initializes a Snowflake miner +func NewSnowflakeMiner(connectionQualifiedName string) *SnowflakeMiner { + return &SnowflakeMiner{ + AbstractMiner: NewAbstractMiner( + connectionQualifiedName, + "@atlan/snowflake-miner", + "atlan-snowflake-miner", + ), + AdvancedConfig: false, + Name: "snowflake", + PackageName: "@atlan/snowflake-miner", + PackagePrefix: atlan.WorkflowPackageSnowflakeMiner.Name, + ConnectorType: "snowflake", + PackageIcon: "https://docs.snowflake.com/en/_images/logo-snowflake-sans-text.png", + PackageLogo: "https://1amiydhcmj36tz3733v94f15-wpengine.netdna-ssl.com/wp-content/themes/snowflake/assets/img/logo-blue.svg", + } +} + +// Direct sets up the miner to extract directly from Snowflake +func (s *SnowflakeMiner) Direct(startEpoch int64, database, schema string) *SnowflakeMiner { + if database == "" && schema == "" { + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "snowflake-database", Value: "default"}) + } else { + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "database-name", Value: database}) + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "schema-name", Value: schema}) + } + + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "extraction-method", Value: "query_history"}) + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "miner-start-time-epoch", Value: strconv.FormatInt(startEpoch, 10)}) + + return s +} + +// ExcludeUsers excludes certain users from usage metrics +func (s *SnowflakeMiner) ExcludeUsers(users []string) *SnowflakeMiner { + userJSON, _ := json.Marshal(users) + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "popularity-exclude-user-config", Value: string(userJSON)}) + return s +} + +// PopularityWindow sets the number of days for popularity metrics +func (s *SnowflakeMiner) PopularityWindow(days int) *SnowflakeMiner { + s.AdvancedConfig = true + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "popularity-window-days", Value: strconv.Itoa(days)}) + return s +} + +// NativeLineage enables or disables native lineage from Snowflake +func (s *SnowflakeMiner) NativeLineage(enabled bool) *SnowflakeMiner { + s.AdvancedConfig = true + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "native-lineage-active", Value: fmt.Sprintf("%t", enabled)}) + return s +} + +// CustomConfig sets a custom configuration for the miner +func (s *SnowflakeMiner) CustomConfig(config map[string]interface{}) *SnowflakeMiner { + if len(config) > 0 { + configJSON, _ := json.Marshal(config) + s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "control-config", Value: string(configJSON)}) + } + s.AdvancedConfig = true + return s +} + +// GetMetadata generates workflow metadata for Snowflake Miner +func (s *SnowflakeMiner) GetMetadata() *structs.WorkflowMetadata { + return &structs.WorkflowMetadata{ + Name: structs.StringPtr(fmt.Sprintf("%s-%d", s.PackagePrefix, s.Epoch)), + Namespace: structs.StringPtr("default"), + Labels: map[string]string{ + "orchestration.atlan.com/certified": "true", + "orchestration.atlan.com/source": s.Name, + "orchestration.atlan.com/sourceCategory": "warehouse", + "orchestration.atlan.com/type": "miner", + "orchestration.atlan.com/verified": "true", + "package.argoproj.io/installer": "argopm", + "package.argoproj.io/name": fmt.Sprintf("%s-miner", s.Name), + "package.argoproj.io/registry": "httpsc-o-l-o-ns-l-a-s-hs-l-a-s-hpackages.atlan.com", + "orchestration.atlan.com/atlan-ui": "true", + }, + Annotations: map[string]string{ + "orchestration.atlan.com/allowSchedule": "true", + "orchestration.atlan.com/categories": "warehouse,miner", + "orchestration.atlan.com/docsUrl": "https://ask.atlan.com/hc/en-us/articles/6482067592337", + "orchestration.atlan.com/emoji": "\\uD83D\\uDE80", + "orchestration.atlan.com/icon": s.PackageIcon, + "orchestration.atlan.com/logo": s.PackageLogo, + "orchestration.atlan.com/marketplaceLink": fmt.Sprintf("https://packages.atlan.com/-/web/detail/%s", s.PackageName), + "orchestration.atlan.com/name": "Snowflake Miner", + "package.argoproj.io/author": "Atlan", + "package.argoproj.io/description": "Package to mine query history data from Snowflake and store it for further processing. The data mined will be used for generating lineage and usage metrics.", + "package.argoproj.io/homepage": fmt.Sprintf("https://packages.atlan.com/-/web/detail/%s", s.PackageName), + "package.argoproj.io/keywords": `["snowflake","warehouse","connector","miner"]`, + "package.argoproj.io/name": s.PackageName, + "package.argoproj.io/registry": "https://packages.atlan.com", + "package.argoproj.io/repository": "https://github.com/atlanhq/marketplace-packages.git", + "package.argoproj.io/support": "support@atlan.com", + "orchestration.atlan.com/atlanName": fmt.Sprintf("%s-%d", s.PackagePrefix, s.Epoch), + }, + } +} + +// ToWorkflow generates a workflow from the miner configuration +func (s *SnowflakeMiner) ToWorkflow() *structs.Workflow { + workflow := s.AbstractPackage.ToWorkflow() + workflow.Metadata = s.GetMetadata() // Override metadata + return workflow +} From 14bd64fd59c756fc0f0a55def0340dc8d108dedf Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Mon, 17 Feb 2025 23:30:07 +0530 Subject: [PATCH 11/21] chore: Include main.go for example related to running a snowflake minor Signed-off-by: Karanjot Singh --- main.go | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/main.go b/main.go index eced7d2..bec3a3e 100644 --- a/main.go +++ b/main.go @@ -1,11 +1,35 @@ package main -import "github.com/atlanhq/atlan-go/atlan/assets" +import ( + "github.com/atlanhq/atlan-go/atlan/assets" +) func main() { ctx := assets.NewContext() ctx.EnableLogging("debug") + /* + // Running Snowflake Miner + miner := assets.NewSnowflakeMiner("default/snowflake/1739484068"). + Direct(1739491200, "snowflake-database", "ACCOUNT_USAGE"). + ExcludeUsers([]string{"karanjot.singh"}). + PopularityWindow(30). + NativeLineage(true). + CustomConfig(map[string]interface{}{ + "test": true, + "feature": 1234, + }). + ToWorkflow() + + // Run the workflow + response, err := ctx.WorkflowClient.Run(miner, nil) + if err != nil { + fmt.Println("Error running workflow:", err) + return + } + + fmt.Println("Workflow started successfully:", response) + */ /* // Update a workflow result, _ := ctx.WorkflowClient.FindByID("csa-admin-export-1739443119") From dda2b3e2e1e3e52ca42100b15007fdf7530a068e Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Mon, 17 Feb 2025 23:31:25 +0530 Subject: [PATCH 12/21] chore: lint Signed-off-by: Karanjot Singh --- atlan/assets/abstract_package.go | 6 +++--- atlan/assets/snowflake_miner.go | 5 +++-- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/atlan/assets/abstract_package.go b/atlan/assets/abstract_package.go index 83566ba..fa01c57 100644 --- a/atlan/assets/abstract_package.go +++ b/atlan/assets/abstract_package.go @@ -1,11 +1,11 @@ package assets import ( - "github.com/atlanhq/atlan-go/atlan/model/structs" + "encoding/json" "time" -) -import "encoding/json" + "github.com/atlanhq/atlan-go/atlan/model/structs" +) // AbstractPackage represents a base package type AbstractPackage struct { diff --git a/atlan/assets/snowflake_miner.go b/atlan/assets/snowflake_miner.go index d1b7c8c..2f5088a 100644 --- a/atlan/assets/snowflake_miner.go +++ b/atlan/assets/snowflake_miner.go @@ -3,9 +3,10 @@ package assets import ( "encoding/json" "fmt" - "github.com/atlanhq/atlan-go/atlan" "strconv" + "github.com/atlanhq/atlan-go/atlan" + "github.com/atlanhq/atlan-go/atlan/model/structs" ) @@ -111,7 +112,7 @@ func (s *SnowflakeMiner) GetMetadata() *structs.WorkflowMetadata { "orchestration.atlan.com/marketplaceLink": fmt.Sprintf("https://packages.atlan.com/-/web/detail/%s", s.PackageName), "orchestration.atlan.com/name": "Snowflake Miner", "package.argoproj.io/author": "Atlan", - "package.argoproj.io/description": "Package to mine query history data from Snowflake and store it for further processing. The data mined will be used for generating lineage and usage metrics.", + "package.argoproj.io/description": "Package to mine query history data from Snowflake and store it for further processing. The data mined will be used for generating lineage and usage metrics.", //nolint "package.argoproj.io/homepage": fmt.Sprintf("https://packages.atlan.com/-/web/detail/%s", s.PackageName), "package.argoproj.io/keywords": `["snowflake","warehouse","connector","miner"]`, "package.argoproj.io/name": s.PackageName, From b38ae6ce03a082d29d69675e8a58adab7eaf1ca1 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Mon, 17 Feb 2025 23:44:37 +0530 Subject: [PATCH 13/21] feat: Add model and fields for workflowRun as an Asset (used in Searching) Signed-off-by: Karanjot Singh --- atlan/assets/asset.go | 66 +++++++++++++++++++++++++++++++++ atlan/model/structs/workflow.go | 21 ++++++++++- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/atlan/assets/asset.go b/atlan/assets/asset.go index ddfc240..8e50a42 100644 --- a/atlan/assets/asset.go +++ b/atlan/assets/asset.go @@ -328,6 +328,21 @@ type WorkflowFields struct { WORKFLOW_DELETED_AT *NumericField } +// WorkflowRunFields represents the fields for a workflow run. +type WorkflowRunFields struct { + AssetFields + WORKFLOW_RUN_WORKFLOW_GUID *KeywordField + WORKFLOW_RUN_TYPE *KeywordField + WORKFLOW_RUN_ON_ASSET_GUID *KeywordField + WORKFLOW_RUN_COMMENT *TextField + WORKFLOW_RUN_CONFIG *TextField + WORKFLOW_RUN_STATUS *KeywordField + WORKFLOW_RUN_EXPIRES_AT *NumericField + WORKFLOW_RUN_CREATED_BY *KeywordField + WORKFLOW_RUN_UPDATED_BY *KeywordField + WORKFLOW_RUN_DELETED_AT *NumericField +} + // NewSearchTable returns a new AtlasTable object for Searching func NewSearchTable() *AtlasTableFields { return &AtlasTableFields{ @@ -1083,6 +1098,57 @@ func NewWorkflowFields() *WorkflowFields { } } +// NewWorkflowRunFields initializes and returns a WorkflowRunFields struct. +func NewWorkflowRunFields() *WorkflowRunFields { + return &WorkflowRunFields{ + AssetFields: AssetFields{ + AttributesFields: AttributesFields{ + TYPENAME: NewKeywordTextField("typeName", "__typeName.keyword", "__typeName"), + GUID: NewKeywordField("guid", "__guid"), + CREATED_BY: NewKeywordField("createdBy", "__createdBy"), + UPDATED_BY: NewKeywordField("updatedBy", "__modifiedBy"), + STATUS: NewKeywordField("status", "__state"), + ATLAN_TAGS: NewKeywordTextField("classificationNames", "__traitNames", "__classificationsText"), + PROPOGATED_ATLAN_TAGS: NewKeywordTextField("classificationNames", "__propagatedTraitNames", "__classificationsText"), + ASSIGNED_TERMS: NewKeywordTextField("meanings", "__meanings", "__meaningsText"), + SUPERTYPE_NAMES: NewKeywordTextField("typeName", "__superTypeNames.keyword", "__superTypeNames"), + CREATE_TIME: NewNumericField("createTime", "__timestamp"), + UPDATE_TIME: NewNumericField("updateTime", "__modificationTimestamp"), + QUALIFIED_NAME: NewKeywordTextField("qualifiedName", "qualifiedName", "qualifiedName.text"), + }, + NAME: NewKeywordTextStemmedField("name", "name.keyword", "name", "name"), + DISPLAY_NAME: NewKeywordTextField("displayName", "displayName.keyword", "displayName"), + DESCRIPTION: NewKeywordTextField("description", "description", "description.text"), + USER_DESCRIPTION: NewKeywordTextField("userDescription", "userDescription", "userDescription.text"), + TENET_ID: NewKeywordField("tenetId", "tenetId"), + CERTIFICATE_STATUS: NewKeywordTextField("certificateStatus", "certificateStatus", "certificateStatus.text"), + CERTIFICATE_STATUS_MESSAGE: NewKeywordField("certificateStatusMessage", "certificateStatusMessage"), + CERTIFICATE_UPDATED_BY: NewNumericField("certificateUpdatedBy", "certificateUpdatedBy"), + ANNOUNCEMENT_TITLE: NewKeywordField("announcementTitle", "announcementTitle"), + ANNOUNCEMENT_MESSAGE: NewKeywordTextField("announcementMessage", "announcementMessage", "announcementMessage.text"), + ANNOUNCEMENT_TYPE: NewKeywordField("announcementType", "announcementType"), + ANNOUNCEMENT_UPDATED_AT: NewNumericField("announcementUpdatedAt", "announcementUpdatedAt"), + ANNOUNCEMENT_UPDATED_BY: NewKeywordField("announcementUpdatedBy", "announcementUpdatedBy"), + OWNER_USERS: NewKeywordTextField("ownerUsers", "ownerUsers", "ownerUsers.text"), + ADMIN_USERS: NewKeywordField("adminUsers", "adminUsers"), + VIEWER_USERS: NewKeywordField("viewerUsers", "viewerUsers"), + VIEWER_GROUPS: NewKeywordField("viewerGroups", "viewerGroups"), + CONNECTOR_NAME: NewKeywordTextField("connectorName", "connectorName", "connectorName.text"), + CONNECTION_QUALIFIED_NAME: NewKeywordTextField("connectionQualifiedName", "connectionQualifiedName", "connectionQualifiedName.text"), + }, + WORKFLOW_RUN_WORKFLOW_GUID: NewKeywordField("workflowRunWorkflowGuid", "workflowRunWorkflowGuid"), + WORKFLOW_RUN_TYPE: NewKeywordField("workflowRunType", "workflowRunType"), + WORKFLOW_RUN_ON_ASSET_GUID: NewKeywordField("workflowRunOnAssetGuid", "workflowRunOnAssetGuid"), + WORKFLOW_RUN_COMMENT: NewTextField("workflowRunComment", "workflowRunComment"), + WORKFLOW_RUN_CONFIG: NewTextField("workflowRunConfig", "workflowRunConfig"), + WORKFLOW_RUN_STATUS: NewKeywordField("workflowRunStatus", "workflowRunStatus"), + WORKFLOW_RUN_EXPIRES_AT: NewNumericField("workflowRunExpiresAt", "workflowRunExpiresAt"), + WORKFLOW_RUN_CREATED_BY: NewKeywordField("workflowRunCreatedBy", "workflowRunCreatedBy"), + WORKFLOW_RUN_UPDATED_BY: NewKeywordField("workflowRunUpdatedBy", "workflowRunUpdatedBy"), + WORKFLOW_RUN_DELETED_AT: NewNumericField("workflowRunDeletedAt", "workflowRunDeletedAt"), + } +} + // Methods on assets // GetbyGuid retrieves an asset by guid diff --git a/atlan/model/structs/workflow.go b/atlan/model/structs/workflow.go index 3c662e6..5980c8d 100644 --- a/atlan/model/structs/workflow.go +++ b/atlan/model/structs/workflow.go @@ -86,7 +86,7 @@ type Workflow struct { // WorkflowAsset defines the Asset structure for a workflow. type WorkflowAsset struct { Asset - WorkflowAttributes *WorkflowAttributes `json:"workflowAttributes"` + WorkflowAttributes *WorkflowAttributes `json:"Attributes"` } // WorkflowAttributes captures attributes of a workflow. @@ -227,3 +227,22 @@ type WorkflowScheduleResponse struct { Status *WorkflowScheduleStatus `json:"status,omitempty"` WorkflowMetadata *WorkflowMetadata `json:"workflowMetadata,omitempty"` } + +// WorkflowRun defines a workflow run as an Asset. +type WorkflowRun struct { + Asset + WorkflowRunAttributes *WorkflowRunAttributes `json:"Attributes"` +} + +type WorkflowRunAttributes struct { + WorkflowRunGuid *string `json:"workflowRunGuid,omitempty"` + WorkflowRunType *string `json:"workflowRunType,omitempty"` + WorkflowRunOnAssetGuid *string `json:"workflowRunOnAssetGuid,omitempty"` + WorkflowRunComment *string `json:"workflowRunComment,omitempty"` + WorkflowRunConfig *string `json:"workflowRunConfig,omitempty"` + WorkflowRunStatus *string `json:"workflowRunStatus,omitempty"` + WorkflowRunExpiresAt *string `json:"workflowRunExpiresAt,omitempty"` + WorkflowRunCreatedBy *string `json:"workflowRunCreatedBy,omitempty"` + WorkflowRunUpdatedBy *string `json:"workflowRunUpdatedBy,omitempty"` + WorkflowRunDeletedAt *string `json:"workflowRunDeletedAt,omitempty"` +} From 11033b6010166426be7c8cc43c5fb9f950453d33 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 02:35:09 +0530 Subject: [PATCH 14/21] enhancement: Add case for handling workflow as a type Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 23 +++++++++++++++++++---- main.go | 26 ++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index c060967..50f6774 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -266,12 +266,29 @@ func (w *WorkflowClient) handleWorkflowTypes(workflow interface{}) (*structs.Wor case *structs.WorkflowSearchResultDetail: return wf, nil + case *structs.Workflow: + return convertWorkflowToSearchResult(wf) default: return nil, fmt.Errorf("invalid workflow type provided") } } +// convertWorkflowToSearchResult converts a Workflow into WorkflowSearchResultDetail. +func convertWorkflowToSearchResult(wf *structs.Workflow) (*structs.WorkflowSearchResultDetail, error) { + if wf == nil { + return nil, fmt.Errorf("workflow is nil") + } + if wf.Metadata == nil || wf.Spec == nil { + return nil, fmt.Errorf("workflow metadata or spec is nil") + } + + return &structs.WorkflowSearchResultDetail{ + Metadata: *wf.Metadata, + Spec: *wf.Spec, + }, nil +} + // Rerun executes the workflow immediately if it has been run before. // If idempotent is true, it only reruns if not already running. func (w *WorkflowClient) Rerun(workflow interface{}, idempotent bool) (*structs.WorkflowRunResponse, error) { @@ -560,10 +577,8 @@ func (w *WorkflowClient) Run(workflow interface{}, schedule *structs.WorkflowSch } if schedule != nil { - _, err := w.AddSchedule(workflow, schedule) - if err != nil { - return nil, err - } + workflowToUpdate, _ := w.handleWorkflowTypes(workflow) + w.addSchedule(workflowToUpdate, schedule) } responseData, err := DefaultAtlanClient.CallAPI(&WORKFLOW_RUN, nil, workflow) diff --git a/main.go b/main.go index bec3a3e..b211b20 100644 --- a/main.go +++ b/main.go @@ -1,13 +1,39 @@ package main import ( + "fmt" + "github.com/atlanhq/atlan-go/atlan/assets" + "github.com/atlanhq/atlan-go/atlan/model/structs" ) func main() { ctx := assets.NewContext() ctx.EnableLogging("debug") + // Add a schedule directly on run + + miner := assets.NewSnowflakeMiner("default/snowflake/1739484068"). + Direct(1739491200, "snowflake-database", "ACCOUNT_USAGE"). + ExcludeUsers([]string{"karanjot.singh"}). + PopularityWindow(30). + NativeLineage(true). + CustomConfig(map[string]interface{}{ + "test": true, + "feature": 1234, + }). + ToWorkflow() + + Schedule := structs.WorkflowSchedule{CronSchedule: "45 5 * * *", Timezone: "Europe/Paris"} + + // Run the workflow + response, err := ctx.WorkflowClient.Run(miner, &Schedule) + if err != nil { + fmt.Println("Error running workflow:", err) + return + } + fmt.Println(response.Spec) + /* // Running Snowflake Miner miner := assets.NewSnowflakeMiner("default/snowflake/1739484068"). From 19d6051daf7adc44d8d0c69f8812f52e7125056d Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 02:47:15 +0530 Subject: [PATCH 15/21] fix: Fix api path of GetScheduledRun() Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 2 +- main.go | 57 +++++++++++++++++++-------------- 2 files changed, 34 insertions(+), 25 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index 50f6774..09ce937 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -470,7 +470,7 @@ func (w *WorkflowClient) GetAllScheduledRuns() ([]structs.WorkflowScheduleRespon func (w *WorkflowClient) GetScheduledRun(workflowName string) (*structs.WorkflowScheduleResponse, error) { api := &GET_SCHEDULE_RUN - api.Path = fmt.Sprintf("schedules/%s/cron", workflowName) + api.Path = fmt.Sprintf("runs/cron/%s-cron", workflowName) rawJSON, err := DefaultAtlanClient.CallAPI(api, nil, nil) if err != nil { diff --git a/main.go b/main.go index b211b20..ed61ca6 100644 --- a/main.go +++ b/main.go @@ -2,38 +2,47 @@ package main import ( "fmt" - "github.com/atlanhq/atlan-go/atlan/assets" - "github.com/atlanhq/atlan-go/atlan/model/structs" ) func main() { ctx := assets.NewContext() ctx.EnableLogging("debug") - // Add a schedule directly on run - - miner := assets.NewSnowflakeMiner("default/snowflake/1739484068"). - Direct(1739491200, "snowflake-database", "ACCOUNT_USAGE"). - ExcludeUsers([]string{"karanjot.singh"}). - PopularityWindow(30). - NativeLineage(true). - CustomConfig(map[string]interface{}{ - "test": true, - "feature": 1234, - }). - ToWorkflow() - - Schedule := structs.WorkflowSchedule{CronSchedule: "45 5 * * *", Timezone: "Europe/Paris"} - - // Run the workflow - response, err := ctx.WorkflowClient.Run(miner, &Schedule) - if err != nil { - fmt.Println("Error running workflow:", err) - return - } - fmt.Println(response.Spec) + // To retrieve an existing scheduled workflow run by its name: + response, _ := ctx.WorkflowClient.GetScheduledRun("atlan-snowflake-miner-1739824311") + fmt.Println(response) + /* + // add a schedule to an existing workflow run + existingWorkflow, _ := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 1) + Schedule := structs.WorkflowSchedule{CronSchedule: "25 5 * * *", Timezone: "Europe/Paris"} + response, _ := ctx.WorkflowClient.AddSchedule(existingWorkflow[0], &Schedule) + fmt.Println(response) + */ + /* + // Add a schedule directly on run + miner := assets.NewSnowflakeMiner("default/snowflake/1739484068"). + Direct(1739491200, "snowflake-database", "ACCOUNT_USAGE"). + ExcludeUsers([]string{"karanjot.singh"}). + PopularityWindow(30). + NativeLineage(true). + CustomConfig(map[string]interface{}{ + "test": true, + "feature": 1234, + }). + ToWorkflow() + + Schedule := structs.WorkflowSchedule{CronSchedule: "45 5 * * *", Timezone: "Europe/Paris"} + + // Run the workflow + response, err := ctx.WorkflowClient.Run(miner, &Schedule) + if err != nil { + fmt.Println("Error running workflow:", err) + return + } + fmt.Println(response.Spec) + */ /* // Running Snowflake Miner miner := assets.NewSnowflakeMiner("default/snowflake/1739484068"). From c8cdc797a519c19966e21ffffcc4c91c940c912d Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 02:54:51 +0530 Subject: [PATCH 16/21] fix: GetAllScheduledRuns() returns a JSON object ({}) instead of an array ([]) Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index 09ce937..283f549 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -455,16 +455,18 @@ func (w *WorkflowClient) RemoveSchedule(workflow interface{}) (*structs.Workflow return &response, nil } -func (w *WorkflowClient) GetAllScheduledRuns() ([]structs.WorkflowScheduleResponse, error) { +func (w *WorkflowClient) GetAllScheduledRuns() (*structs.WorkflowScheduleResponse, error) { rawJSON, err := DefaultAtlanClient.CallAPI(&GET_ALL_SCHEDULE_RUNS, nil, nil) if err != nil { return nil, err } - var response []structs.WorkflowScheduleResponse + var response *structs.WorkflowScheduleResponse + if err := json.Unmarshal(rawJSON, &response); err != nil { return nil, err } + return response, nil } From c74e1b050b417883351415ede764945c75cf979e Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 02:58:02 +0530 Subject: [PATCH 17/21] chore: Include main.go for examples related to workflow schedules Signed-off-by: Karanjot Singh --- main.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/main.go b/main.go index ed61ca6..b92c50d 100644 --- a/main.go +++ b/main.go @@ -1,7 +1,6 @@ package main import ( - "fmt" "github.com/atlanhq/atlan-go/atlan/assets" ) @@ -9,9 +8,26 @@ func main() { ctx := assets.NewContext() ctx.EnableLogging("debug") - // To retrieve an existing scheduled workflow run by its name: - response, _ := ctx.WorkflowClient.GetScheduledRun("atlan-snowflake-miner-1739824311") - fmt.Println(response) + /* + // Remove a schedule + existingWorkflow, _ := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 1) + response, _ := ctx.WorkflowClient.RemoveSchedule(existingWorkflow[0]) + fmt.Println(response) + */ + /* + // Get all scheduled runs + response, err := ctx.WorkflowClient.GetAllScheduledRuns() + if err != nil { + fmt.Println(err) + } + fmt.Println(*response.Metadata.ResourceVersion) + */ + /* + // To retrieve an existing scheduled workflow run by its name: + response, _ := ctx.WorkflowClient.GetScheduledRun("atlan-snowflake-miner-1739824311") + fmt.Println(response) + + */ /* // add a schedule to an existing workflow run existingWorkflow, _ := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 1) From 1a429efc066f09c0706d01c53f6109611d03e8f8 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 03:26:47 +0530 Subject: [PATCH 18/21] fix: Add a check in findbyID() of workflows if the hit is 0 return an err with no workflows found Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index 283f549..3015fd0 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -96,6 +96,11 @@ func (w *WorkflowClient) FindByID(id string) (*structs.WorkflowSearchResult, err if len(response.Hits.Hits) > 0 { return &response.Hits.Hits[0], nil } + + if len(response.Hits.Hits) == 0 { + return nil, errors.New("no workflow found") + } + return nil, nil } From 48cc83696607778b1b796261bb8d41d32a09f871 Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 03:55:58 +0530 Subject: [PATCH 19/21] chore: Add better comments to workflow client methods Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 197 ++++++++++++++++++++++++++++++-- 1 file changed, 190 insertions(+), 7 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index 3015fd0..d0521eb 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -23,6 +23,13 @@ type WorkflowClient struct { } // FindByType searches for workflows by their type prefix. +// Params: +// - prefix: The workflow package type (atlan.WorkflowPackage) to search for (for example atlan.WorkflowPackageSnowflakeMiner). +// - maxResults: The maximum number of workflows to return. +// +// Returns: +// - A slice of structs.WorkflowSearchResult containing the workflows found. +// - An error if any occurs during the request. func (w *WorkflowClient) FindByType(prefix atlan.WorkflowPackage, maxResults int) ([]structs.WorkflowSearchResult, error) { var query model.Query = &model.BoolQuery{ Filter: []model.Query{ @@ -65,6 +72,13 @@ func (w *WorkflowClient) FindByType(prefix atlan.WorkflowPackage, maxResults int } // FindByID searches for a workflow by its ID. +// Params: +// - id: The unique ID of the workflow to search for. +// (e.g: `atlan-snowflake-miner-1714638976-mzdza`) +// +// Returns: +// - A pointer to a structs.WorkflowSearchResult containing the workflow found, or nil if no workflow is found. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) FindByID(id string) (*structs.WorkflowSearchResult, error) { var query model.Query = &model.BoolQuery{ Filter: []model.Query{ @@ -105,6 +119,13 @@ func (w *WorkflowClient) FindByID(id string) (*structs.WorkflowSearchResult, err } // FindRunByID searches for a workflow run by its ID. +// Params: +// - id: The unique ID of the workflow run to search for. +// (e.g: `atlan-snowflake-miner-1714638976-mzdza`) +// +// Returns: +// - A pointer to a structs.WorkflowSearchResult containing the workflow run found, or nil if no workflow run is found. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) FindRunByID(id string) (*structs.WorkflowSearchResult, error) { var query model.Query = &model.BoolQuery{ Filter: []model.Query{ @@ -126,7 +147,15 @@ func (w *WorkflowClient) FindRunByID(id string) (*structs.WorkflowSearchResult, return nil, nil } -// findRuns retrieves existing workflow runs. +// findRuns retrieves existing workflow runs based on a given query. +// Params: +// - query: The query to filter the workflow runs. +// - from: The starting index to retrieve the workflow runs (default: `0`). +// - size: The number of workflow runs to retrieve (default: `100`). +// +// Returns: +// - A pointer to structs.WorkflowSearchResponse containing the retrieved workflow runs. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) findRuns(query model.Query, from, size int) (*structs.WorkflowSearchResponse, error) { request := model.WorkflowSearchRequest{ Query: query, @@ -147,7 +176,13 @@ func (w *WorkflowClient) findRuns(query model.Query, from, size int) (*structs.W return &response, nil } -// FindLatestRun retrieves the latest run of a given workflow. +// FindLatestRun retrieves the latest run of a given workflow by its name. +// Params: +// - workflowName: The name of the workflow to search for. +// +// Returns: +// - A pointer to a structs.WorkflowSearchResult containing the latest workflow run found, or nil if no run is found. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) FindLatestRun(workflowName string) (*structs.WorkflowSearchResult, error) { var query model.Query = &model.BoolQuery{ Filter: []model.Query{ @@ -172,7 +207,13 @@ func (w *WorkflowClient) FindLatestRun(workflowName string) (*structs.WorkflowSe return nil, nil } -// FindCurrentRun retrieves the most current, still-running workflow. +// FindCurrentRun retrieves the most current, still-running workflow for a given workflow name. +// Params: +// - workflowName: The name of the workflow to search for. +// +// Returns: +// - A pointer to a structs.WorkflowSearchResult containing the currently running workflow, or nil if no run is found or it's not running. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) FindCurrentRun(workflowName string) (*structs.WorkflowSearchResult, error) { var query model.Query = &model.BoolQuery{ Filter: []model.Query{ @@ -200,6 +241,15 @@ func (w *WorkflowClient) FindCurrentRun(workflowName string) (*structs.WorkflowS } // GetRuns retrieves all workflow runs filtered by workflow name and phase. +// Params: +// - workflowName: The name of the workflow to filter by. +// - workflowPhase: The phase of the workflow to filter by. +// - from: The starting index to retrieve the workflow runs. +// - size: The number of workflow runs to retrieve. +// +// Returns: +// - A slice of structs.WorkflowSearchResult containing the workflow runs found. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) GetRuns(workflowName string, workflowPhase atlan.AtlanWorkflowPhase, from, size int) ([]structs.WorkflowSearchResult, error) { var query model.Query = &model.BoolQuery{ Must: []model.Query{ @@ -227,7 +277,13 @@ func (w *WorkflowClient) GetRuns(workflowName string, workflowPhase atlan.AtlanW return response.Hits.Hits, nil } -// Stop stops a running workflow. +// Stop stops a running workflow by its run ID. +// Params: +// - workflowRunID: The unique ID of the workflow run to stop. +// +// Returns: +// - A pointer to structs.WorkflowRunResponse containing the result of the stop action. +// - An error if any occurs during the request or unmarshalling. func (w *WorkflowClient) Stop(workflowRunID string) (*structs.WorkflowRunResponse, error) { api := &STOP_WORKFLOW_RUN api.Path = fmt.Sprintf("runs/%s/stop", workflowRunID) @@ -245,7 +301,12 @@ func (w *WorkflowClient) Stop(workflowRunID string) (*structs.WorkflowRunRespons return &response, nil } -// Delete archives (deletes) the provided workflow. +// Delete archives (deletes) the provided workflow by its name. +// Params: +// - workflowName: The name of the workflow to archive. +// +// Returns: +// - An error if any occurs during the request or API call. func (w *WorkflowClient) Delete(workflowName string) error { api := &WORKFLOW_ARCHIVE api.Path = fmt.Sprintf("workflows/%s/archive", workflowName) @@ -254,6 +315,12 @@ func (w *WorkflowClient) Delete(workflowName string) error { } // handleWorkflowTypes determines the workflow details based on its type. +// Params: +// - workflow: The workflow to handle, which can be a WorkflowPackage, WorkflowSearchResult, WorkflowSearchResultDetail, or Workflow. +// +// Returns: +// - A pointer to structs.WorkflowSearchResultDetail containing the details of the workflow. +// - An error if the workflow type is invalid or any issue occurs. func (w *WorkflowClient) handleWorkflowTypes(workflow interface{}) (*structs.WorkflowSearchResultDetail, error) { switch wf := workflow.(type) { case atlan.WorkflowPackage: @@ -279,7 +346,13 @@ func (w *WorkflowClient) handleWorkflowTypes(workflow interface{}) (*structs.Wor } } -// convertWorkflowToSearchResult converts a Workflow into WorkflowSearchResultDetail. +// convertWorkflowToSearchResult converts a Workflow into a WorkflowSearchResultDetail. +// Params: +// - wf: The Workflow to convert. +// +// Returns: +// - A pointer to structs.WorkflowSearchResultDetail containing the converted details of the workflow. +// - An error if any issues occur while converting. func convertWorkflowToSearchResult(wf *structs.Workflow) (*structs.WorkflowSearchResultDetail, error) { if wf == nil { return nil, fmt.Errorf("workflow is nil") @@ -295,7 +368,13 @@ func convertWorkflowToSearchResult(wf *structs.Workflow) (*structs.WorkflowSearc } // Rerun executes the workflow immediately if it has been run before. -// If idempotent is true, it only reruns if not already running. +// Params: +// - workflow: The workflow to rerun, which can be a WorkflowPackage, WorkflowSearchResult, WorkflowSearchResultDetail, or Workflow. +// - idempotent: A boolean indicating whether the workflow should only be rerun if not already running. +// +// Returns: +// - A pointer to structs.WorkflowRunResponse containing the result of the rerun. +// - An error if any occurs during the rerun process. func (w *WorkflowClient) Rerun(workflow interface{}, idempotent bool) (*structs.WorkflowRunResponse, error) { detail, err := w.handleWorkflowTypes(workflow) if err != nil { @@ -337,6 +416,12 @@ func (w *WorkflowClient) Rerun(workflow interface{}, idempotent bool) (*structs. } // Update modifies the configuration of an existing workflow. +// Params: +// - workflow: The workflow to update, which is a pointer to structs.Workflow. +// +// Returns: +// - A pointer to structs.WorkflowResponse containing the updated workflow result. +// - An error if any occurs during the update process. func (w *WorkflowClient) Update(workflow *structs.Workflow) (*structs.WorkflowResponse, error) { api := &WORKFLOW_UPDATE api.Path = fmt.Sprintf("workflows/%s", *workflow.Metadata.Name) @@ -355,6 +440,13 @@ func (w *WorkflowClient) Update(workflow *structs.Workflow) (*structs.WorkflowRe } // UpdateOwner assigns a new owner to the workflow. +// Params: +// - workflowName: The name of the workflow to update. +// - username: The username of the new owner to assign. +// +// Returns: +// - A pointer to structs.WorkflowResponse containing the updated workflow result. +// - An error if any occurs during the ownership change. func (w *WorkflowClient) UpdateOwner(workflowName, username string) (*structs.WorkflowResponse, error) { api := &WORKFLOW_CHANGE_OWNER api.Path = fmt.Sprintf("workflows/%s/changeownership", workflowName) @@ -377,6 +469,13 @@ func (w *WorkflowClient) UpdateOwner(workflowName, username string) (*structs.Wo // Methods related to workflow schedules // Monitor the status of the workflow's run. +// Params: +// - workflowResponse: The response containing the workflow details to monitor. +// - logger: An optional logger for printing the workflow status during monitoring. +// +// Returns: +// - The current workflow phase (atlan.AtlanWorkflowPhase) indicating the status of the workflow run. +// - An error if any occurs during the monitoring process. func (w *WorkflowClient) Monitor(workflowResponse *structs.WorkflowResponse, logger *log.Logger) (*atlan.AtlanWorkflowPhase, error) { if workflowResponse.Metadata == nil || *workflowResponse.Metadata.Name == "" { if logger != nil { @@ -412,6 +511,19 @@ func (w *WorkflowClient) addSchedule(workflow *structs.WorkflowSearchResultDetai workflow.Metadata.Annotations[workflowRunTimezone] = schedule.Timezone } +// AddSchedule adds a schedule for an existing workflow run. +// +// This method attaches a cron schedule and timezone to the given workflow. +// +// Param: +// - workflow: The workflow object to schedule, can be of type WorkflowSearchResultDetail or similar. +// - schedule: A WorkflowSchedule object containing: +// - Cron schedule expression (e.g., `5 4 * * *`). +// - Timezone for the cron schedule (e.g., `Europe/Paris`). +// +// Returns: +// - A WorkflowResponse object containing the details of the scheduled workflow. +// - Error if any occurred during the process. func (w *WorkflowClient) AddSchedule(workflow interface{}, schedule *structs.WorkflowSchedule) (*structs.WorkflowResponse, error) { workflowToUpdate, err := w.handleWorkflowTypes(workflow) if err != nil { @@ -435,6 +547,16 @@ func (w *WorkflowClient) AddSchedule(workflow interface{}, schedule *structs.Wor return &response, nil } +// RemoveSchedule removes the schedule from an existing workflow run. +// +// This method removes the cron schedule and timezone annotation from the given workflow. +// +// Param: +// - workflow: The workflow object to remove the schedule from, can be of type WorkflowSearchResultDetail or similar. +// +// Returns: +// - A WorkflowResponse object with the updated workflow details. +// - Error if any occurred during the process. func (w *WorkflowClient) RemoveSchedule(workflow interface{}) (*structs.WorkflowResponse, error) { workflowToUpdate, err := w.handleWorkflowTypes(workflow) if err != nil { @@ -460,6 +582,13 @@ func (w *WorkflowClient) RemoveSchedule(workflow interface{}) (*structs.Workflow return &response, nil } +// GetAllScheduledRuns retrieves all scheduled runs for workflows. +// +// This method fetches the list of all scheduled workflow runs. +// +// Returns: +// - A WorkflowScheduleResponse containing the list of scheduled workflows. +// - Error if any occurred during the API call. func (w *WorkflowClient) GetAllScheduledRuns() (*structs.WorkflowScheduleResponse, error) { rawJSON, err := DefaultAtlanClient.CallAPI(&GET_ALL_SCHEDULE_RUNS, nil, nil) if err != nil { @@ -475,6 +604,16 @@ func (w *WorkflowClient) GetAllScheduledRuns() (*structs.WorkflowScheduleRespons return response, nil } +// GetScheduledRun retrieves an existing scheduled run for a workflow. +// +// This method fetches the scheduled workflow run for the given workflow name. +// +// Param: +// - workflowName: The name of the workflow (e.g., `atlan-snowflake-miner-1714638976`). +// +// Returns: +// - A WorkflowScheduleResponse containing the scheduled run details. +// - Error if any occurred during the API call. func (w *WorkflowClient) GetScheduledRun(workflowName string) (*structs.WorkflowScheduleResponse, error) { api := &GET_SCHEDULE_RUN api.Path = fmt.Sprintf("runs/cron/%s-cron", workflowName) @@ -491,6 +630,17 @@ func (w *WorkflowClient) GetScheduledRun(workflowName string) (*structs.Workflow return &response, nil } +// FindScheduleQuery searches for scheduled query workflows by their saved query identifier. +// +// This method retrieves scheduled workflows related to a specific saved query ID. +// +// Param: +// - savedQueryID: The identifier of the saved query. +// - maxResults: The maximum number of results to retrieve. Defaults to `10`. +// +// Returns: +// - A slice of WorkflowSearchResult containing the matching scheduled workflows. +// - Error if any occurred during the search process. func (w *WorkflowClient) FindScheduleQuery(savedQueryID string, maxResults int) ([]structs.WorkflowSearchResult, error) { if maxResults <= 0 { maxResults = 10 @@ -536,6 +686,16 @@ func (w *WorkflowClient) FindScheduleQuery(savedQueryID string, maxResults int) return nil, nil } +// ReRunScheduleQuery re-triggers a scheduled query workflow by its schedule query ID. +// +// This method re-runs a scheduled workflow using the given schedule query identifier. +// +// Param: +// - scheduleQueryID: The identifier of the schedule query workflow. +// +// Returns: +// - A WorkflowRunResponse containing details of the re-triggered workflow. +// - Error if any occurred during the re-run process. func (w *WorkflowClient) ReRunScheduleQuery(scheduleQueryID string) (*structs.WorkflowRunResponse, error) { request := structs.ReRunRequest{ Namespace: "default", @@ -554,6 +714,17 @@ func (w *WorkflowClient) ReRunScheduleQuery(scheduleQueryID string) (*structs.Wo return &response, nil } +// FindScheduleQueryBetween searches for scheduled query workflows within a specific date range. +// +// This method retrieves scheduled workflows that fall between the specified start and end dates. +// +// Param: +// - request: A ScheduleQueriesSearchRequest object containing the start and end dates in ISO 8601 format. +// - missed: If true, searches for missed scheduled workflows. +// +// Returns: +// - A slice of WorkflowRunResponse containing the matching scheduled workflows within the date range. +// - Error if any occurred during the search process. func (w *WorkflowClient) FindScheduleQueryBetween(request structs.ScheduleQueriesSearchRequest, missed bool) ([]structs.WorkflowRunResponse, error) { queryParams := map[string]string{ "startDate": request.StartDate, @@ -578,6 +749,18 @@ func (w *WorkflowClient) FindScheduleQueryBetween(request structs.ScheduleQuerie } // Run executes an Atlan workflow with an optional schedule. +// +// This method triggers the workflow and attaches a schedule if provided. +// +// Param: +// - workflow: The workflow object to execute, can be of any type (WorkflowResponse, WorkflowSearchResult, etc.). +// - schedule: A WorkflowSchedule object containing: +// - Cron schedule expression (e.g., `5 4 * * *`). +// - Timezone for the cron schedule (e.g., `Europe/Paris`). +// +// Returns: +// - A WorkflowResponse object containing the details of the executed workflow. +// - Error if any occurred during the execution process. func (w *WorkflowClient) Run(workflow interface{}, schedule *structs.WorkflowSchedule) (*structs.WorkflowResponse, error) { if workflow == nil { return nil, errors.New("workflow cannot be nil") From 17915bb5d2bdf232416ac1c100dd24a0d92467ed Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 03:59:06 +0530 Subject: [PATCH 20/21] chore: Add better comments to snowflake miner methods Signed-off-by: Karanjot Singh --- atlan/assets/snowflake_miner.go | 50 +++++++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/atlan/assets/snowflake_miner.go b/atlan/assets/snowflake_miner.go index 2f5088a..6236266 100644 --- a/atlan/assets/snowflake_miner.go +++ b/atlan/assets/snowflake_miner.go @@ -22,7 +22,13 @@ type SnowflakeMiner struct { PackageLogo string } -// NewSnowflakeMiner initializes a Snowflake miner +// NewSnowflakeMiner initializes a new Snowflake miner. +// +// Param: +// - connectionQualifiedName: the qualified name of the connection to use for the miner +// +// Returns: +// - SnowflakeMiner instance, initialized with the provided connection qualified name and default values func NewSnowflakeMiner(connectionQualifiedName string) *SnowflakeMiner { return &SnowflakeMiner{ AbstractMiner: NewAbstractMiner( @@ -40,7 +46,15 @@ func NewSnowflakeMiner(connectionQualifiedName string) *SnowflakeMiner { } } -// Direct sets up the miner to extract directly from Snowflake +// Direct sets up the miner to extract directly from Snowflake using the specified start epoch and database/schema. +// +// Param: +// - startEpoch: the epoch time from which to start mining +// - database: the database name to extract from (can be empty for the default database) +// - schema: the schema name to extract from (can be empty for the default schema) +// +// Returns: +// - SnowflakeMiner instance, set up for direct extraction from Snowflake func (s *SnowflakeMiner) Direct(startEpoch int64, database, schema string) *SnowflakeMiner { if database == "" && schema == "" { s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "snowflake-database", Value: "default"}) @@ -55,28 +69,52 @@ func (s *SnowflakeMiner) Direct(startEpoch int64, database, schema string) *Snow return s } -// ExcludeUsers excludes certain users from usage metrics +// ExcludeUsers excludes certain users from being considered in the usage metrics calculation for assets (e.g., system users). +// +// Param: +// - users: a list of user names to exclude from the usage metrics +// +// Returns: +// - SnowflakeMiner instance, updated with the specified users to exclude func (s *SnowflakeMiner) ExcludeUsers(users []string) *SnowflakeMiner { userJSON, _ := json.Marshal(users) s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "popularity-exclude-user-config", Value: string(userJSON)}) return s } -// PopularityWindow sets the number of days for popularity metrics +// PopularityWindow sets the number of days to consider for calculating popularity metrics for assets. +// +// Param: +// - days: number of days to use for the popularity window (default is 30) +// +// Returns: +// - SnowflakeMiner instance, updated with the popularity window configuration func (s *SnowflakeMiner) PopularityWindow(days int) *SnowflakeMiner { s.AdvancedConfig = true s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "popularity-window-days", Value: strconv.Itoa(days)}) return s } -// NativeLineage enables or disables native lineage from Snowflake +// NativeLineage enables or disables the use of Snowflake's native lineage feature for tracking lineage information. +// +// Param: +// - enabled: if true, native lineage from Snowflake will be enabled +// +// Returns: +// - SnowflakeMiner instance, updated with the native lineage setting func (s *SnowflakeMiner) NativeLineage(enabled bool) *SnowflakeMiner { s.AdvancedConfig = true s.Parameters = append(s.Parameters, structs.NameValuePair{Name: "native-lineage-active", Value: fmt.Sprintf("%t", enabled)}) return s } -// CustomConfig sets a custom configuration for the miner +// CustomConfig sets a custom configuration JSON for the Snowflake miner, allowing experimental feature flags or custom settings. +// +// Param: +// - config: a map of custom configurations to be applied to the miner +// +// Returns: +// - SnowflakeMiner instance, updated with the custom configuration func (s *SnowflakeMiner) CustomConfig(config map[string]interface{}) *SnowflakeMiner { if len(config) > 0 { configJSON, _ := json.Marshal(config) From b3eebb918d9e348e664b06a347c25641bbe8dc0d Mon Sep 17 00:00:00 2001 From: Karanjot Singh Date: Tue, 18 Feb 2025 23:48:25 +0530 Subject: [PATCH 21/21] feat: Add support for running workflow through JSON Signed-off-by: Karanjot Singh --- atlan/assets/workflow_client.go | 23 ++++-- main.go | 136 ++++++++++++++++++++++++++++++++ 2 files changed, 152 insertions(+), 7 deletions(-) diff --git a/atlan/assets/workflow_client.go b/atlan/assets/workflow_client.go index d0521eb..92ef90b 100644 --- a/atlan/assets/workflow_client.go +++ b/atlan/assets/workflow_client.go @@ -752,11 +752,9 @@ func (w *WorkflowClient) FindScheduleQueryBetween(request structs.ScheduleQuerie // // This method triggers the workflow and attaches a schedule if provided. // -// Param: -// - workflow: The workflow object to execute, can be of any type (WorkflowResponse, WorkflowSearchResult, etc.). -// - schedule: A WorkflowSchedule object containing: -// - Cron schedule expression (e.g., `5 4 * * *`). -// - Timezone for the cron schedule (e.g., `Europe/Paris`). +// Params: +// - workflow: The workflow object to execute, can be a JSON string or an object (WorkflowResponse, WorkflowSearchResult, etc.). +// - schedule: A WorkflowSchedule object containing cron expression and timezone. // // Returns: // - A WorkflowResponse object containing the details of the executed workflow. @@ -766,12 +764,23 @@ func (w *WorkflowClient) Run(workflow interface{}, schedule *structs.WorkflowSch return nil, errors.New("workflow cannot be nil") } + var workflowPayload interface{} + + switch v := workflow.(type) { + case string: // If workflow is a JSON string, unmarshal it + if err := json.Unmarshal([]byte(v), &workflowPayload); err != nil { + return nil, fmt.Errorf("invalid JSON workflow: %w", err) + } + default: + workflowPayload = workflow + } + if schedule != nil { - workflowToUpdate, _ := w.handleWorkflowTypes(workflow) + workflowToUpdate, _ := w.handleWorkflowTypes(workflowPayload) w.addSchedule(workflowToUpdate, schedule) } - responseData, err := DefaultAtlanClient.CallAPI(&WORKFLOW_RUN, nil, workflow) + responseData, err := DefaultAtlanClient.CallAPI(&WORKFLOW_RUN, nil, workflowPayload) if err != nil { return nil, fmt.Errorf("error executing workflow: %w", err) } diff --git a/main.go b/main.go index b92c50d..74b329d 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main import ( + "fmt" + "github.com/atlanhq/atlan-go/atlan/assets" ) @@ -8,6 +10,140 @@ func main() { ctx := assets.NewContext() ctx.EnableLogging("debug") + workflowJSON := `{ + "metadata": { + "labels": { + "orchestration.atlan.com/certified": "true", + "orchestration.atlan.com/source": "snowflake", + "orchestration.atlan.com/sourceCategory": "warehouse", + "orchestration.atlan.com/type": "miner", + "orchestration.atlan.com/verified": "true", + "package.argoproj.io/installer": "argopm", + "package.argoproj.io/name": "a-t-ratlans-l-a-s-hsnowflake-miner", + "package.argoproj.io/parent": "", + "package.argoproj.io/registry": "local", + "package.argoproj.io/version": "1.1.70", + "orchestration.atlan.com/atlan-ui": "true" + }, + "annotations": { + "orchestration.atlan.com/allowSchedule": "true", + "orchestration.atlan.com/categories": "warehouse,miner", + "orchestration.atlan.com/dependentPackage": "", + "orchestration.atlan.com/docsUrl": "https://ask.atlan.com/hc/en-us/articles/6482067592337", + "orchestration.atlan.com/emoji": "🚀", + "orchestration.atlan.com/icon": "https://docs.snowflake.com/en/_images/logo-snowflake-sans-text.png", + "orchestration.atlan.com/logo": "https://1amiydhcmj36tz3733v94f15-wpengine.netdna-ssl.com/wp-content/themes/snowflake/assets/img/logo-blue.svg", + "orchestration.atlan.com/marketplaceLink": "https://packages.atlan.com/-/web/detail/@atlan/snowflake-miner", + "orchestration.atlan.com/name": "Snowflake Miner", + "package.argoproj.io/author": "Atlan", + "package.argoproj.io/description": "Package to mine query history data from Snowflake and store it for further processing. The data mined will be used for generating lineage and usage metrics.", + "package.argoproj.io/homepage": "https://packages.atlan.com/-/web/detail/@atlan/snowflake-miner", + "package.argoproj.io/keywords": "[\"snowflake\",\"warehouse\",\"connector\",\"miner\"]", + "package.argoproj.io/name": "@atlan/snowflake-miner", + "package.argoproj.io/parent": ".", + "package.argoproj.io/registry": "local", + "package.argoproj.io/repository": "https://github.com/atlanhq/marketplace-packages.git", + "package.argoproj.io/support": "support@atlan.com", + "orchestration.atlan.com/atlanName": "atlan-snowflake-miner-1739813867" + }, + "name": "atlan-snowflake-miner-1739813867", + "namespace": "default" + }, + "spec": { + "templates": [ + { + "name": "main", + "dag": { + "tasks": [ + { + "name": "run", + "arguments": { + "parameters": [ + { + "name": "connection-qualified-name", + "value": "default/snowflake/1739484068" + }, + { + "name": "extraction-method", + "value": "query_history" + }, + { + "name": "miner-start-time-epoch", + "value": "1739491200" + }, + { + "name": "snowflake-database", + "value": "default" + }, + { + "name": "database-name", + "value": "SNOWFLAKE" + }, + { + "name": "schema-name", + "value": "ACCOUNT_USAGE" + }, + { + "name": "sql-json-key", + "value": "QUERY_TEXT" + }, + { + "name": "catalog-json-key", + "value": "DATABASE_NAME" + }, + { + "name": "schema-json-key", + "value": "SCHEMA_NAME" + }, + { + "name": "session-json-key", + "value": "SESSION_ID" + }, + { + "name": "popularity-window-days", + "value": 30 + }, + { + "name": "calculate-popularity", + "value": "true" + }, + { + "name": "control-config-strategy", + "value": "default" + }, + { + "name": "native-lineage-active", + "value": false + } + ] + }, + "templateRef": { + "name": "atlan-snowflake-miner", + "template": "main", + "clusterScope": true + } + } + ] + } + } + ], + "entrypoint": "main", + "workflowMetadata": { + "annotations": { + "package.argoproj.io/name": "@atlan/snowflake-miner" + } + } + }, + "payload": [] + }` // Run the workflow + response, err := ctx.WorkflowClient.Run(workflowJSON, nil) + if err != nil { + fmt.Println("Error running workflow:", err) + return + } + + fmt.Println("Workflow started successfully:", response) + /* // Remove a schedule existingWorkflow, _ := ctx.WorkflowClient.FindByType(atlan.WorkflowPackageSnowflakeMiner, 1)