Skip to content

Commit

Permalink
Merge pull request #3 from conductor-sdk/factory-methods-and-sync
Browse files Browse the repository at this point in the history
  • Loading branch information
gardusig authored May 3, 2023
2 parents 210d586 + 37756b4 commit 6c87356
Show file tree
Hide file tree
Showing 14 changed files with 1,089 additions and 714 deletions.
225 changes: 27 additions & 198 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,216 +1,45 @@
# Netflix Conductor SDK - Clojure
# Netflix Conductor Clojure SDK

Software Development Kit for Netflix Conductor, written on and providing support for Clojure.
The `conductor-clojure` repository provides the client SDKs to build task workers in clojure

## [Get SDK](https://clojars.org/io.orkes/conductor-clojure)
Building the task workers in clojure mainly consists of the following steps:

## Quick Guide
1. Setup conductor-clojure package
2. [Create and run task workers](workers_sdk.md)
3. [Create workflows using code](workflow_sdk.md)
4. [Api Docs](docs/api/README.md)

### Setup Conductor Clojure Package

### Create connection options
* Get the package from clojars

```clojure
(def options {
:url "http://localhost:8080/api/" ;; Conductor Server Path
:app-key "THIS-IS-SOME-APP-KEY" ;; Optional if using Orkes Conductor
:app-secret "THIS-IS-SOME-APP-SECRET" ;; Optional if using Orkes Conductor
} )
```
### Creating a task using the above options

``` clojure
(ns some.namespace
(:require [io.orkes.metadata :as metadata])

;; Will Create a task. returns nil
(metadata/register-tasks options [{
:name "cool_clj_task"
:description "some description"
:ownerEmail "somemail@mail.com"
:retryCount 3
:timeoutSeconds 300
:responseTimeoutSeconds 180 }])
)
```

### Creating a Workflow that uses the above task

``` clojure
(ns some.namespace
(:require [io.orkes.metadata :as metadata])

;; Will Register a workflow that uses the above task returns nil
(metadata/register-workflow-def options {
:name "cool_clj_workflow"
:description "created programatically from clj"
:version 1
:tasks [ {
:name "cool_clj_task"
:taskReferenceName "cool_clj_task_ref"
:inputParameters {}
:type "SIMPLE"
} ]
:inputParameters []
:outputParameters {:message "${clj_prog_task_ref.output.:message}"}
:schemaVersion 2
:restartable true
:ownerEmail "owner@yahoo.com"
:timeoutSeconds 0
}))

:deps {org.clojure/clojure {:mvn/version "1.11.0"}
io.orkes/conductor-clojure {:mvn/version "0.3.0"}}
```
### Create and run a list of workers

``` clojure
;; Creates a worker and starts polling for work. will return an instance of Runner which can then be used to shutdown
(def shutdown-fn (runner-executor-for-workers
(list {
:name "cool_clj_task"
:execute (fn [someData]
[:completed {:message "Hi From Clj i was created programatically"}])
})
options ))

;; Shutdown the polling for the workers defined above
(shutdown-fn)

```
## Options
Options are a map with optional parameters.
```
(def options {
:url "http://localhost:8080/api/" ;; Api url (Optional will default to "http://localhost:8080")
:app-key "THIS-IS-SOME-APP-KEY" ;; Application Key (This is only relevant if you are using Orkes Conductor)
:app-secret "THIS-IS-SOME-APP-SECRET" ;; Application Secret (This is only relevant if you are using Orkes Conductor)
} )
```
## Configurations

### Authentication Settings (Optional)
Configure the authentication settings if your Conductor server requires authentication.
* keyId: Key for authentication.
* keySecret: Secret for the key.

## Metadata Namespace
Holds the functions to register workflows and tasks.
### Access Control Setup
See [Access Control](https://orkes.io/content/docs/getting-started/concepts/access-control) for more details on role-based access control with Conductor and generating API keys for your environment.

`(:require [conductor.metadata :as metadata])`

## Registering Tasks

Takes the options map and a list/vector of tasks to register. On success, it will return nil.
### Configure API Client options

```clojure
(metadata/register-tasks options [{
:name "cool_clj_task_b"
:description "some description"
:ownerEmail "mail@gmail.com"
:retryCount 3
:timeoutSeconds 300
:responseTimeoutSeconds 180 },
{
:name "cool_clj_task_z"
:description "some description"
:ownerEmail "mail@gmail.com"
:retryCount 3
:timeoutSeconds 300
:responseTimeoutSeconds 180 }
{
:name "cool_clj_task_x"
:description "some description"
:ownerEmail "mail@gmail.com"
:retryCount 3
:timeoutSeconds 300
:responseTimeoutSeconds 180 }
])
```

## Registering a Workspace​
```clojure
(metadata/register-workflow-def options {
:name "cool_clj_workflow_2"
:description "created programatically from clj"
:version 1
:tasks [ {
:name "cool_clj_task_b"
:taskReferenceName "cool_clj_task_ref"
:inputParameters {}
:type "SIMPLE"
},
{
:name "something",
:taskReferenceName "other"
:inputParameters {}
:type "FORK_JOIN"
:forkTasks [[
{
:name "cool_clj_task_z"
:taskReferenceName "cool_clj_task_z_ref"
:inputParameters {}
:type "SIMPLE"
}
]
[
{
:name "cool_clj_task_x"
:taskReferenceName "cool_clj_task_x_ref"
:inputParameters {}
:type "SIMPLE"
}
]
]
}
{
:name "join"
:type "JOIN"
:taskReferenceName "join_ref"
:joinOn [ "cool_clj_task_z", "cool_clj_task_x"]
}
]
:inputParameters []
:outputParameters {"message" "${clj_prog_task_ref.output.:message}"}
:schemaVersion 2
:restartable true
:ownerEmail "mail@yahoo.com"
:timeoutSeconds 0
:timeoutPolicy "ALERT_ONLY"
})
```


## TaskRunner Namespace​
The taskrunner namespace holds the function to start a workflow and run a worker.

`[io.orkes.taskrunner :as conductor]`

``` clojure
;; Creates a worker and starts polling for work. will return an instance of Runner which can then be used to shutdown
(def shutdown-fn (conductor/runner-executor-for-workers
options
(list {
:name "cool_clj_task"
:execute (fn [someData]
[:completed {:message "Hi From Clj i was created programatically"}])
})
))
;;; define options
{:app-key "some-key",
:app-secret "some-secret",
:url "http://localhost:8080/api/"}

;; Shutdown the polling for the workers defined above
(shutdown-fn)

```
The (runner-executor-for-workers) function will take a list of worker implementations, maps, and options and start polling for work. It will return a TaskRunnerConfigurer instance, which you can shut down by calling the .shutdown() java method.

## Utils
Treat conductor workflows as simple tree data structures.


`[io.orkes.utils :as ut]`

``` clojure
;; Rename every single task to fakeName. Wil transverse the whole tree and applies the transformation function.

(ut/map-wf-tasks #(assoc % :name "fakeName")
wf-fork-example)

;; Given a workflow wf-fork-example in this case will return a new workflow without the task with the taskReferenceName "cool_clj_task_ref"
(ut/filter-wf-tasks
#(not= (:taskReferenceName %) "cool_clj_task_ref")
wf-fork-example)

```
### Next: [Create and run task workers](workers_sdk.md)

# Netflix Conductor SDK - Clojure

Software Development Kit for Netflix Conductor, written on and providing support for Clojure.
6 changes: 3 additions & 3 deletions build.clj
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
[org.corfield.build :as bb]))

(def lib 'io.orkes/conductor-clojure)
(def version "0.2.0")
#_ ; alternatively, use MAJOR.MINOR.COMMITS:
(def version (format "1.0.%s" (b/git-count-revs nil)))
(def version "0.3.0")
#_; alternatively, use MAJOR.MINOR.COMMITS:
(def version (format "1.0.%s" (b/git-count-revs nil)))

(defn test "Run the tests." [opts]
(bb/run-tests opts))
Expand Down
8 changes: 5 additions & 3 deletions deps.edn
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{:paths ["src" "resources"]
:deps {org.clojure/clojure {:mvn/version "1.11.0"}
http-kit/http-kit {:mvn/version "2.6.0-alpha1"}
cheshire/cheshire {:mvn/version "5.10.2"}
http-kit/http-kit {:mvn/version "2.6.0"}
cheshire/cheshire {:mvn/version "5.11.0"}
org.clojure/tools.logging {:mvn/version "1.1.0"}
org.clojure/core.async {:mvn/version "1.5.648"}
;; ch.qos.logback/logback-classic {:mvn/version "1.2.5"}
Expand All @@ -14,4 +14,6 @@
:extra-deps {org.clojure/test.check {:mvn/version "1.1.1"}
com.netflix.conductor/conductor-java-sdk {:mvn/version "3.8.0"}
io.github.cognitect-labs/test-runner
{:git/tag "v0.5.0" :git/sha "48c3c67"}}}}}
{:git/tag "v0.5.0" :git/sha "48c3c67"}}
:main-opts ["-m" "cognitect.test-runner"]
:exec-fn cognitect.test-runner.api/test}}}
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xs
<modelVersion>4.0.0</modelVersion>
<groupId>io.orkes</groupId>
<artifactId>conductor-clojure</artifactId>
<version>0.2.0</version>
<version>0.3.0</version>
<name>io.orkes.conductor/conductor-clojure</name>
<description>A conductor SDK for clojure</description>
<url>https://github.com/conductor-sdk/conductor-clojure</url>
Expand Down
33 changes: 16 additions & 17 deletions src/io/orkes/metadata.clj
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
(def json-headers
{"Content-Type" "application/json", "Accept" "application/json"})


(defn meta-client [options] (generic-client options "metadata"))

(defn get-workflow-def-using-client
Expand All @@ -41,17 +40,20 @@
(meta-client)
(get-workflow-def-using-client))))

(defn register-workflow-def-using-client
(defn
register-workflow-def-using-client
"Takes a client and a workflow definition in edn, will register a worflow in conductor"
[client workflow]
(client "workflow" :method :post :body workflow))
([client workflow overwrite]
(client "workflow" :method :post :body workflow :query-params {"overwrite" overwrite}))
([client workflow] (register-workflow-def-using-client client workflow false)))

(defn register-workflow-def
"Takes a map of options, and an EDN defined workflow. Will register a workflow"
[options workflow]
(-> options
(meta-client)
(register-workflow-def-using-client workflow)))
([options workflow overwrite]
(-> options
(meta-client)
(register-workflow-def-using-client workflow overwrite)))
([options workflow] (register-workflow-def options workflow false)))

(defn update-workflows-def-using-client
"takes a client and a list of workflows definition in edn, will update all workflows in list"
Expand All @@ -65,7 +67,6 @@
(meta-client)
(update-workflows-def-using-client workflows)))


(defn unregister-workflow-def-using-client
"Takes a client a name and a version. will unregister workflow. returns nil on success"
[client name version]
Expand Down Expand Up @@ -93,7 +94,7 @@

(defn update-task-definition-with-client
[client task-definition]
(client "taskdefs" :method :put :body task-definition ) )
(client "taskdefs" :method :put :body task-definition))

(defn update-task-definition
"Takes a map of options, and a list of workflow definitions. will update every workflow on the list"
Expand All @@ -113,7 +114,6 @@
(-> (meta-client options)
(get-task-def-with-client task-def)))


(defn unregister-task-with-client
"Takes a client and a task-name. Unregisters the task. Returns nil"
[client task-ref]
Expand All @@ -131,7 +131,7 @@
{:app-key "1f8f740c-9117-4016-9cb8-c1d43ed75bb4",
:app-secret "zR0kkWGx17HDNhH2zlfu2IrGtATlmnyQS6FrHlDZXriSsW7M",
:url "http://localhost:8080/api/"})
(count (get-workflow-def options) )
(count (get-workflow-def options))
(def cool-b-task
{:name "cool_clj_task_n",
:description "some description",
Expand All @@ -147,7 +147,7 @@
:taskReferenceName "cool_clj_task_ref",
:inputParameters {},
:type "SIMPLE"}
{:name "something",
{:name "something_else",
:taskReferenceName "other",
:inputParameters {},
:type "FORK_JOIN",
Expand All @@ -168,16 +168,15 @@
:timeoutPolicy "ALERT_ONLY"})

(register-tasks options [cool-b-task])
(register-workflow-def options wf-sample)
(register-workflow-def options wf-sample true)
(update-workflows-def options [wf-sample])
(json/generate-string cool-b-task)
(spit "/tmp/testw.edn" (with-out-str (pr (get-workflow-def options "testing_loop_iterations" 1) ) ) )
(spit "/tmp/testw.edn" (with-out-str (pr (get-workflow-def options "testing_loop_iterations" 1))))
(def wf (get-workflow-def options "si" 1))
(:tasks wf)
(register-workflow-def options (assoc wf :version 29))
(unregister-workflow-def options "cool_clj_workflow_2" 1)
(def some-task (get-task-def options "cool_clj_task_b"))
(update-task-definition options
(assoc cool-b-task :ownerEmail "othermaila@mail.com"))
(unregister-task options "cool_clj_task_b")
)
(unregister-task options "cool_clj_task_b"))
Loading

0 comments on commit 6c87356

Please sign in to comment.