Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APP-5188 : Manage Workflows and Workflows Schedules | Support for Snowflake Miner with Abstract Package #88

Merged
merged 21 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
748a016
feat: Add models for workflow
0xquark Feb 13, 2025
8d5571b
feat: Add custom marshalling for WorkflowSearchRequest
0xquark Feb 13, 2025
cf84932
feat: Add WorkflowClient and Methods related to running, retrieving a…
0xquark Feb 13, 2025
214afd7
chore: Include main.go for examples related to using workflows
0xquark Feb 13, 2025
dadb266
chore: fix linting errors
0xquark Feb 13, 2025
70035ac
feat: Add workflow as an Asset(used in searching)
0xquark Feb 13, 2025
007d3e5
feat: Add methods related to workflow schedules such as Retrieving, A…
0xquark Feb 13, 2025
fec6dee
feat: add method for running a workflow with optional schedule
0xquark Feb 17, 2025
6fd67f9
feat: Implement abstract methods and structures for overriding by cus…
0xquark Feb 17, 2025
56b8c5c
feat: Implement snowflake miner
0xquark Feb 17, 2025
14bd64f
chore: Include main.go for example related to running a snowflake minor
0xquark Feb 17, 2025
dda2b3e
chore: lint
0xquark Feb 17, 2025
b38ae6c
feat: Add model and fields for workflowRun as an Asset (used in Searc…
0xquark Feb 17, 2025
11033b6
enhancement: Add case for handling workflow as a type
0xquark Feb 17, 2025
19d6051
fix: Fix api path of GetScheduledRun()
0xquark Feb 17, 2025
c8cdc79
fix: GetAllScheduledRuns() returns a JSON object ({}) instead of an a…
0xquark Feb 17, 2025
c74e1b0
chore: Include main.go for examples related to workflow schedules
0xquark Feb 17, 2025
1a429ef
fix: Add a check in findbyID() of workflows if the hit is 0 return an…
0xquark Feb 17, 2025
48cc836
chore: Add better comments to workflow client methods
0xquark Feb 17, 2025
17915bb
chore: Add better comments to snowflake miner methods
0xquark Feb 17, 2025
b3eebb9
feat: Add support for running workflow through JSON
0xquark Feb 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
98 changes: 98 additions & 0 deletions atlan/assets/abstract_package.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
package assets

import (
"encoding/json"
"time"

"github.com/atlanhq/atlan-go/atlan/model/structs"
)

// AbstractPackage represents a base package
type AbstractPackage struct {
Parameters []structs.NameValuePair
CredentialsBody map[string]interface{}
PackageName string
PackagePrefix string
}

// NewAbstractPackage initializes an abstract package
func NewAbstractPackage(packageName, packagePrefix string) *AbstractPackage {
return &AbstractPackage{
PackageName: packageName,
PackagePrefix: packagePrefix,
Parameters: []structs.NameValuePair{},
CredentialsBody: map[string]interface{}{},
}
}

func (p *AbstractPackage) ToWorkflow() *structs.Workflow {
metadata := p.GetMetadata()

spec := structs.WorkflowSpec{
Entrypoint: structs.StringPtr("main"),
Templates: []structs.WorkflowTemplate{
{
Name: "main",
DAG: structs.WorkflowDAG{
Tasks: []structs.WorkflowTask{
{
Name: "run",
Arguments: structs.WorkflowParameters{
Parameters: p.Parameters,
},
TemplateRef: structs.WorkflowTemplateRef{
Name: p.PackagePrefix,
Template: "main",
ClusterScope: true,
},
},
},
},
},
},
WorkflowMetadata: metadata,
}

var payload []structs.PackageParameter
if len(p.CredentialsBody) > 0 {
credJSON, _ := json.Marshal(p.CredentialsBody)
payload = append(payload, structs.PackageParameter{
Parameter: "credentialGuid",
Type: "credential",
Body: credJSON,
})
}

return &structs.Workflow{
Metadata: metadata,
Spec: &spec,
Payload: payload,
}
}

// GetMetadata should be implemented by subclasses
func (p *AbstractPackage) GetMetadata() *structs.WorkflowMetadata {
// Default (empty) metadata implementation, to be overridden by child structs
return &structs.WorkflowMetadata{}
}

// AbstractMiner represents a base miner package
type AbstractMiner struct {
*AbstractPackage
Epoch int64
}

// NewAbstractMiner initializes an abstract miner
func NewAbstractMiner(connectionQualifiedName, packageName, packagePrefix string) *AbstractMiner {
epoch := time.Now().Unix()
packageInstance := NewAbstractPackage(packageName, packagePrefix)
packageInstance.Parameters = append(packageInstance.Parameters, structs.NameValuePair{
Name: "connection-qualified-name",
Value: connectionQualifiedName,
})

return &AbstractMiner{
AbstractPackage: packageInstance,
Epoch: epoch,
}
}
127 changes: 127 additions & 0 deletions atlan/assets/asset.go
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,33 @@ type PurposeFields struct {
PURPOSE_CLASSIFICATIONS *KeywordField
}

type WorkflowFields struct {
AssetFields
WORKFLOW_TEMPLATE_GUID *KeywordField
WORKFLOW_TYPE *KeywordField
WORKFLOW_CONFIG *TextField
WORKFLOW_STATUS *KeywordField
WORKFLOW_RUN_EXPIRES_IN *TextField
WORKFLOW_CREATED_BY *KeywordField
WORKFLOW_UPDATED_BY *KeywordField
WORKFLOW_DELETED_AT *NumericField
}

// WorkflowRunFields represents the fields for a workflow run.
type WorkflowRunFields struct {
AssetFields
WORKFLOW_RUN_WORKFLOW_GUID *KeywordField
WORKFLOW_RUN_TYPE *KeywordField
WORKFLOW_RUN_ON_ASSET_GUID *KeywordField
WORKFLOW_RUN_COMMENT *TextField
WORKFLOW_RUN_CONFIG *TextField
WORKFLOW_RUN_STATUS *KeywordField
WORKFLOW_RUN_EXPIRES_AT *NumericField
WORKFLOW_RUN_CREATED_BY *KeywordField
WORKFLOW_RUN_UPDATED_BY *KeywordField
WORKFLOW_RUN_DELETED_AT *NumericField
}

// NewSearchTable returns a new AtlasTable object for Searching
func NewSearchTable() *AtlasTableFields {
return &AtlasTableFields{
Expand Down Expand Up @@ -1022,6 +1049,106 @@ func NewPurposeFields() *PurposeFields {
}
}

// NewWorkflowFields initializes and returns a WorkflowFields struct.
func NewWorkflowFields() *WorkflowFields {
return &WorkflowFields{
AssetFields: AssetFields{
AttributesFields: AttributesFields{
TYPENAME: NewKeywordTextField("typeName", "__typeName.keyword", "__typeName"),
GUID: NewKeywordField("guid", "__guid"),
CREATED_BY: NewKeywordField("createdBy", "__createdBy"),
UPDATED_BY: NewKeywordField("updatedBy", "__modifiedBy"),
STATUS: NewKeywordField("status", "__state"),
ATLAN_TAGS: NewKeywordTextField("classificationNames", "__traitNames", "__classificationsText"),
PROPOGATED_ATLAN_TAGS: NewKeywordTextField("classificationNames", "__propagatedTraitNames", "__classificationsText"),
ASSIGNED_TERMS: NewKeywordTextField("meanings", "__meanings", "__meaningsText"),
SUPERTYPE_NAMES: NewKeywordTextField("typeName", "__superTypeNames.keyword", "__superTypeNames"),
CREATE_TIME: NewNumericField("createTime", "__timestamp"),
UPDATE_TIME: NewNumericField("updateTime", "__modificationTimestamp"),
QUALIFIED_NAME: NewKeywordTextField("qualifiedName", "qualifiedName", "qualifiedName.text"),
},
NAME: NewKeywordTextStemmedField("name", "name.keyword", "name", "name"),
DISPLAY_NAME: NewKeywordTextField("displayName", "displayName.keyword", "displayName"),
DESCRIPTION: NewKeywordTextField("description", "description", "description.text"),
USER_DESCRIPTION: NewKeywordTextField("userDescription", "userDescription", "userDescription.text"),
TENET_ID: NewKeywordField("tenetId", "tenetId"),
CERTIFICATE_STATUS: NewKeywordTextField("certificateStatus", "certificateStatus", "certificateStatus.text"),
CERTIFICATE_STATUS_MESSAGE: NewKeywordField("certificateStatusMessage", "certificateStatusMessage"),
CERTIFICATE_UPDATED_BY: NewNumericField("certificateUpdatedBy", "certificateUpdatedBy"),
ANNOUNCEMENT_TITLE: NewKeywordField("announcementTitle", "announcementTitle"),
ANNOUNCEMENT_MESSAGE: NewKeywordTextField("announcementMessage", "announcementMessage", "announcementMessage.text"),
ANNOUNCEMENT_TYPE: NewKeywordField("announcementType", "announcementType"),
ANNOUNCEMENT_UPDATED_AT: NewNumericField("announcementUpdatedAt", "announcementUpdatedAt"),
ANNOUNCEMENT_UPDATED_BY: NewKeywordField("announcementUpdatedBy", "announcementUpdatedBy"),
OWNER_USERS: NewKeywordTextField("ownerUsers", "ownerUsers", "ownerUsers.text"),
ADMIN_USERS: NewKeywordField("adminUsers", "adminUsers"),
VIEWER_USERS: NewKeywordField("viewerUsers", "viewerUsers"),
VIEWER_GROUPS: NewKeywordField("viewerGroups", "viewerGroups"),
CONNECTOR_NAME: NewKeywordTextField("connectorName", "connectorName", "connectorName.text"),
CONNECTION_QUALIFIED_NAME: NewKeywordTextField("connectionQualifiedName", "connectionQualifiedName", "connectionQualifiedName.text"),
},
WORKFLOW_TEMPLATE_GUID: NewKeywordField("workflowTemplateGuid", "workflowTemplateGuid"),
WORKFLOW_TYPE: NewKeywordField("workflowType", "workflowType"),
WORKFLOW_CONFIG: NewTextField("workflowConfig", "workflowConfig"),
WORKFLOW_STATUS: NewKeywordField("workflowStatus", "workflowStatus"),
WORKFLOW_RUN_EXPIRES_IN: NewTextField("workflowRunExpiresIn", "workflowRunExpiresIn"),
WORKFLOW_CREATED_BY: NewKeywordField("workflowCreatedBy", "workflowCreatedBy"),
WORKFLOW_UPDATED_BY: NewKeywordField("workflowUpdatedBy", "workflowUpdatedBy"),
WORKFLOW_DELETED_AT: NewNumericField("workflowDeletedAt", "workflowDeletedAt"),
}
}

// NewWorkflowRunFields initializes and returns a WorkflowRunFields struct.
func NewWorkflowRunFields() *WorkflowRunFields {
return &WorkflowRunFields{
AssetFields: AssetFields{
AttributesFields: AttributesFields{
TYPENAME: NewKeywordTextField("typeName", "__typeName.keyword", "__typeName"),
GUID: NewKeywordField("guid", "__guid"),
CREATED_BY: NewKeywordField("createdBy", "__createdBy"),
UPDATED_BY: NewKeywordField("updatedBy", "__modifiedBy"),
STATUS: NewKeywordField("status", "__state"),
ATLAN_TAGS: NewKeywordTextField("classificationNames", "__traitNames", "__classificationsText"),
PROPOGATED_ATLAN_TAGS: NewKeywordTextField("classificationNames", "__propagatedTraitNames", "__classificationsText"),
ASSIGNED_TERMS: NewKeywordTextField("meanings", "__meanings", "__meaningsText"),
SUPERTYPE_NAMES: NewKeywordTextField("typeName", "__superTypeNames.keyword", "__superTypeNames"),
CREATE_TIME: NewNumericField("createTime", "__timestamp"),
UPDATE_TIME: NewNumericField("updateTime", "__modificationTimestamp"),
QUALIFIED_NAME: NewKeywordTextField("qualifiedName", "qualifiedName", "qualifiedName.text"),
},
NAME: NewKeywordTextStemmedField("name", "name.keyword", "name", "name"),
DISPLAY_NAME: NewKeywordTextField("displayName", "displayName.keyword", "displayName"),
DESCRIPTION: NewKeywordTextField("description", "description", "description.text"),
USER_DESCRIPTION: NewKeywordTextField("userDescription", "userDescription", "userDescription.text"),
TENET_ID: NewKeywordField("tenetId", "tenetId"),
CERTIFICATE_STATUS: NewKeywordTextField("certificateStatus", "certificateStatus", "certificateStatus.text"),
CERTIFICATE_STATUS_MESSAGE: NewKeywordField("certificateStatusMessage", "certificateStatusMessage"),
CERTIFICATE_UPDATED_BY: NewNumericField("certificateUpdatedBy", "certificateUpdatedBy"),
ANNOUNCEMENT_TITLE: NewKeywordField("announcementTitle", "announcementTitle"),
ANNOUNCEMENT_MESSAGE: NewKeywordTextField("announcementMessage", "announcementMessage", "announcementMessage.text"),
ANNOUNCEMENT_TYPE: NewKeywordField("announcementType", "announcementType"),
ANNOUNCEMENT_UPDATED_AT: NewNumericField("announcementUpdatedAt", "announcementUpdatedAt"),
ANNOUNCEMENT_UPDATED_BY: NewKeywordField("announcementUpdatedBy", "announcementUpdatedBy"),
OWNER_USERS: NewKeywordTextField("ownerUsers", "ownerUsers", "ownerUsers.text"),
ADMIN_USERS: NewKeywordField("adminUsers", "adminUsers"),
VIEWER_USERS: NewKeywordField("viewerUsers", "viewerUsers"),
VIEWER_GROUPS: NewKeywordField("viewerGroups", "viewerGroups"),
CONNECTOR_NAME: NewKeywordTextField("connectorName", "connectorName", "connectorName.text"),
CONNECTION_QUALIFIED_NAME: NewKeywordTextField("connectionQualifiedName", "connectionQualifiedName", "connectionQualifiedName.text"),
},
WORKFLOW_RUN_WORKFLOW_GUID: NewKeywordField("workflowRunWorkflowGuid", "workflowRunWorkflowGuid"),
WORKFLOW_RUN_TYPE: NewKeywordField("workflowRunType", "workflowRunType"),
WORKFLOW_RUN_ON_ASSET_GUID: NewKeywordField("workflowRunOnAssetGuid", "workflowRunOnAssetGuid"),
WORKFLOW_RUN_COMMENT: NewTextField("workflowRunComment", "workflowRunComment"),
WORKFLOW_RUN_CONFIG: NewTextField("workflowRunConfig", "workflowRunConfig"),
WORKFLOW_RUN_STATUS: NewKeywordField("workflowRunStatus", "workflowRunStatus"),
WORKFLOW_RUN_EXPIRES_AT: NewNumericField("workflowRunExpiresAt", "workflowRunExpiresAt"),
WORKFLOW_RUN_CREATED_BY: NewKeywordField("workflowRunCreatedBy", "workflowRunCreatedBy"),
WORKFLOW_RUN_UPDATED_BY: NewKeywordField("workflowRunUpdatedBy", "workflowRunUpdatedBy"),
WORKFLOW_RUN_DELETED_AT: NewNumericField("workflowRunDeletedAt", "workflowRunDeletedAt"),
}
}

// Methods on assets

// GetbyGuid retrieves an asset by guid
Expand Down
19 changes: 10 additions & 9 deletions atlan/assets/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import (

// AtlanClient defines the Atlan API client structure.
type AtlanClient struct {
Session *http.Client
host string
ApiKey string
requestParams map[string]interface{}
logger logger.Logger
RoleClient *RoleClient
GroupClient *GroupClient
UserClient *UserClient
TokenClient *TokenClient
Session *http.Client
host string
ApiKey string
requestParams map[string]interface{}
logger logger.Logger
RoleClient *RoleClient
GroupClient *GroupClient
UserClient *UserClient
TokenClient *TokenClient
WorkflowClient *WorkflowClient
SearchAssets
}

Expand Down
108 changes: 108 additions & 0 deletions atlan/assets/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,17 @@ const (

// Tokens API
TOKENS_API = "apikeys"

// Workflows API
WORKFLOW_API = "workflows"
WORKFLOW_INDEX_API = "workflows/indexsearch"
WORKFLOW_INDEX_RUN_API = "runs/indexsearch"
SCHEDULE_QUERY_WORKFLOWS_SEARCH_API = "runs/cron/scheduleQueriesBetweenDuration"
SCHEDULE_QUERY_WORKFLOWS_MISSED_API = "runs/cron/missedScheduleQueriesBetweenDuration"
WORKFLOW_OWNER_RERUN_API = "workflows/triggerAsOwner"
WORKFLOW_RERUN_API = "workflows/submit"
WORKFLOW_RUN_API = "workflows?submit=true"
WORKFLOW_SCHEDULE_RUN = "runs"
)

// API defines the structure of an API call.
Expand Down Expand Up @@ -319,6 +330,103 @@ var (
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

// Workflows

SCHEDULE_QUERY_WORKFLOWS_SEARCH = API{
Path: SCHEDULE_QUERY_WORKFLOWS_SEARCH_API,
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

SCHEDULE_QUERY_WORKFLOWS_MISSED = API{
Path: SCHEDULE_QUERY_WORKFLOWS_MISSED_API,
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_INDEX_SEARCH = API{
Path: WORKFLOW_INDEX_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_INDEX_RUN_SEARCH = API{
Path: WORKFLOW_INDEX_RUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

// triggers a workflow using the current user's credentials

WORKFLOW_RERUN = API{
Path: WORKFLOW_RUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

// triggers a workflow using the workflow owner's credentials

WORKFLOW_OWNER_RERUN = API{
Path: WORKFLOW_OWNER_RERUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_UPDATE = API{
Path: WORKFLOW_API + "/%s",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_ARCHIVE = API{
Path: WORKFLOW_API + "/%s/archive",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

GET_ALL_SCHEDULE_RUNS = API{
Path: WORKFLOW_SCHEDULE_RUN + "/cron",
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

GET_SCHEDULE_RUN = API{
Path: WORKFLOW_SCHEDULE_RUN + "/cron/%s",
Method: http.MethodGet,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

STOP_WORKFLOW_RUN = API{
Path: WORKFLOW_SCHEDULE_RUN + "/%s/stop",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_CHANGE_OWNER = API{
Path: WORKFLOW_API + "/%s/changeownership",
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}

WORKFLOW_RUN = API{
Path: WORKFLOW_RUN_API,
Method: http.MethodPost,
Status: http.StatusOK,
Endpoint: HeraclesEndpoint,
}
)

// Constants for the Atlas search DSL
Expand Down
Loading
Loading