diff --git a/.circleci/config.yml b/.circleci/config.yml index a7ec2eb..c201289 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -7,14 +7,17 @@ version: 2.1 jobs: build: docker: - - image: cimg/clojure:1.11.1 + - image: cimg/openjdk:23.0.1 steps: - checkout - run: | - lein version - lein cljfmt check - lein cloverage - lein jar + curl -sL https://raw.githubusercontent.com/technomancy/leiningen/stable/bin/lein > ./lein + chmod +x ./lein + + ./lein version + ./lein cljfmt check + ./lein cloverage + ./lein jar workflows: build-workflow: diff --git a/.gitignore b/.gitignore index 1a4c6d1..7928361 100644 --- a/.gitignore +++ b/.gitignore @@ -12,4 +12,7 @@ pom.xml.asc .hgignore .hg/ .idea* -*.iml \ No newline at end of file +*.iml +.calva/ +.lsp/ +.clj-kondo/ diff --git a/README.md b/README.md index ced1bd9..e0c3b24 100644 --- a/README.md +++ b/README.md @@ -9,11 +9,9 @@ This Clojure SDK is a framework for authoring Workflows and Activities in Clojur ### Status -**Alpha** +**Stable** -This SDK is battle-tested and used in production but is undergoing active development and is subject to breaking changes (*). Some significant features (Versioning, Queries, and Child-Workflows, etc) are missing/incomplete. - -> (*) We will always bump at least the minor version when breaking changes are introduced and include a release note. +This SDK is feature complete with a stable API and used in production. Any future breaking changes will be managed by bumping at least the minor version and including a release note. ### Clojure SDK @@ -41,7 +39,7 @@ Pull requests are welcome. Please include a [DCO](https://en.wikipedia.org/wiki ## License -Copyright (C) 2022 Manetu, Inc. All Rights Reserved. +Copyright (C) Manetu, Inc. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this material except in compliance with the License. diff --git a/dev-resources/user.clj b/dev-resources/user.clj index bbb524b..07daa60 100644 --- a/dev-resources/user.clj +++ b/dev-resources/user.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns user (:require [clojure.tools.namespace.repl :refer [refresh]] diff --git a/dev-resources/utils.clj b/dev-resources/utils.clj new file mode 100644 index 0000000..fed4261 --- /dev/null +++ b/dev-resources/utils.clj @@ -0,0 +1,81 @@ +(ns utils + (:require [taoensso.timbre :as log] + [temporal.activity :as a :refer [defactivity]] + [temporal.client.core :as c] + [temporal.client.worker :as worker] + [temporal.workflow :as w :refer [defworkflow]]) + (:import [java.time Duration])) + +(def client (atom nil)) +(def current-worker-thread (atom nil)) + +(def default-client-options {:target "localhost:7233" + :namespace "default" + :enable-https false}) + +(def default-worker-options {:task-queue "default"}) + +(def default-workflow-options {:task-queue "default" + :workflow-execution-timeout (Duration/ofSeconds 30) + :retry-options {:maximum-attempts 1} + :workflow-id "test-workflow"}) + +(defactivity greet-activity + [ctx {:keys [name] :as args}] + (log/info "greet-activity:" args) + (str "Hi, " name)) + +(defworkflow user-child-workflow + [{names :names :as args}] + (log/info "child-workflow:" names) + (for [name names] + @(a/invoke greet-activity {:name name}))) + +(defworkflow user-parent-workflow + [args] + (log/info "parent-workflow:" args) + @(w/invoke user-child-workflow args (merge default-workflow-options {:workflow-id "child-workflow"}))) + +(defn create-temporal-client + "Creates a new temporal client if the old one does not exist" + ([] (create-temporal-client nil)) + ([options] + (when-not @client + (let [options (merge default-client-options options)] + (log/info "creating temporal client" options) + (reset! client (c/create-client options)))))) + +(defn worker-loop + ([client] (worker-loop client nil)) + ([client options] + (let [options (merge default-worker-options options)] + (log/info "starting temporal worker" options) + (worker/start client options)))) + +(defn create-temporal-worker + "Starts a new instance running on another daemon thread, + stops the current temporal worker and thread if they exist" + ([client] (create-temporal-worker client nil)) + ([client options] + (when (and @current-worker-thread (.isAlive @current-worker-thread)) + (.interrupt @current-worker-thread) + (reset! current-worker-thread nil)) + (let [thread (Thread. (partial worker-loop client options))] + (doto thread + (.setDaemon true) + (.start)) + (reset! current-worker-thread thread)))) + +(defn execute-workflow + ([client workflow arguments] (execute-workflow client workflow arguments nil)) + ([client workflow arguments options] + (let [options (merge default-workflow-options options) + workflow (c/create-workflow client workflow options)] + (log/info "executing workflow" arguments) + (c/start workflow arguments) + @(c/get-result workflow)))) + +(comment + (do (create-temporal-client) + (create-temporal-worker @client) + (execute-workflow @client user-parent-workflow {:names ["Hanna" "Bob" "Tracy" "Felix"]}))) diff --git a/doc/child_workflows.md b/doc/child_workflows.md new file mode 100644 index 0000000..11228dc --- /dev/null +++ b/doc/child_workflows.md @@ -0,0 +1,63 @@ +# Child Workflows + +## What is a Child Workflow? + +A Child Workflow is a Workflow execution spawned from within a Workflow. + +Child Workflows orchestrate invocations of Activities just like Workflows do. + +Child Workflows should not be used for code organization, however they can be used to partition a Workflow execution's event history into smaller chunks which helps avoid the roughly *~50MB* Workflow event history limit, amongst other use cases. + +You should visit the [workflows](./workflows.md) page to learn more about Workflows, their constraints, and their executions in general. + +For more information about Child Workflows in general visit [Temporal Child Workflows](https://docs.temporal.io/encyclopedia/child-workflows) + +## Implementing Child Workflows + +In this Clojure SDK programming model, a Temporal Workflow is a function declared with [defworkflow](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#defworkflow) + +And a Child workflow is declared in the exact same way + +### Example + +```clojure +(require '[temporal.workflow :refer [defworkflow]]) + +(defworkflow my-workflow + [{:keys [foo]}] + ...) +``` +## Starting Child Workflow Executions + +In this Clojure SDK, Workflows start Child Workflows with [temporal.workflow/invoke](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#invoke) + +The options (`ChildWorkflowOptions`) provided to [temporal.workflow/invoke](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#invoke) are similar to the ones required to create a regular workflow with [temporal.client.core/create-workflow](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#create-workflow) + +One big difference however is Child Workflows can provide options to control what happens to themselves with their parents close/fail/complete. + +When a Parent Workflow Execution stops, the Temporal Cluster determines what will happen to any running child workflow executions based on the `:parent-close-policy` option. + +See [Temporal Parent Close Policy](https://docs.temporal.io/encyclopedia/child-workflows#parent-close-policy) for more information + +### Example + +```clojure +(require '[temporal.workflow :refer [defworkflow]]) +(require '[temporal.activity :refer [defactivity] :as a]) + +(defactivity child-greeter-activity + [ctx {:keys [name] :as args}] + (str "Hi, " name)) + +(defworkflow child-workflow + [{:keys [names] :as args}] + (for [name names] + @(a/invoke child-greeter-activity {:name name}))) + +(defworkflow parent-workflow + [args] + @(w/invoke child-workflow args {:retry-options {:maximum-attempts 1} + :workflow-task-timeout 10 + :workflow-execution-timeout 3600 + :workflow-run-timeout 3600})) +``` diff --git a/doc/clients.md b/doc/clients.md index 3cf0411..daed0c0 100644 --- a/doc/clients.md +++ b/doc/clients.md @@ -4,9 +4,13 @@ To initialize a Workflow Client, create an instance of a Workflow client with [temporal.client.core/create-client](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#create-client), create a Workflow stub with [temporal.client.core/create-workflow](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#create-workflow), and invoke the Workflow with [temporal.client.core/start](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#start)). Finally, gather the results of the Workflow with [temporal.client.core/get-result](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#get-result). +To initialize a Schedule Client, create an instance of a Schedule client with [temporal.client.schedule/create-client](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule#create-client), create a Schedule with [temporal.client.schedule/schedule](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule#schedule), and run a scheduled workflow execution immediately with [temporal.client.schedule/execute](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule#execute)). + ## Details -To start a Workflow Execution, your Temporal Server must be running, and your front-end service must be accepting gRPC calls. +Using the namespace `temporal.client.core` or `temporal.client.schedule` + +To start a Workflow Execution or manage a Scheduled Workflow Execution, your Temporal Server must be running, and your front-end service must be accepting gRPC calls. You can provide options to (create-client) to establish a connection to the specifics of the Temporal front-end service in your environment. @@ -14,7 +18,9 @@ You can provide options to (create-client) to establish a connection to the spec (create-client {:target "temporal-frontend:7233" :namespace "my-namespace"}) ``` -After establishing a successful connection to the Temporal Frontend Service, you may perform operations such as: +### Workflow Executions + +After establishing a successful connection to the Temporal Frontend Service, you may perform Workflow Execution specific operations such as: - **Starting Workflows**: See [temporal.client.core/start](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#start) and [temporal.client.core/signal-with-start](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#signal-with-start) - **Signaling Workflows**: See [temporal.client.core/>!](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#%3E!) and [temporal.client.core/signal-with-start](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#signal-with-start) @@ -33,7 +39,7 @@ As a simple example, for the following Workflow implementation: (str "Hi, " name)) (defworkflow greeter-workflow - [ctx {:keys [args]}] + [args] (log/info "greeter-workflow:" args) @(a/invoke greet-activity args)) ``` @@ -52,3 +58,71 @@ We can create a client to invoke our Workflow as follows: ``` Evaluating this code should result in `Hi, Bob` appearing on the console. Note that (get-result) returns a promise, thus requiring a dereference. + +### Scheduled Workflow Executions + +After establishing a successful connection to the Temporal Frontend Service, you may perform Schedule specific operations such as: + +- **Creating Schedules**: See [temporal.client.schedule/schedule](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/schedule) +- **Describing Schedules**: See [temporal.client.schedule/describe](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/describe) +- **Pausing Schedules**: See [temporal.client.schedule/pause](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/pause) +- **Unpausing Schedules**: See [temporal.client.schedule/unpause](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/unpause) +- **Removing Schedules**: See [temporal.client.schedule/unschedule](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/unschedule) +- **Triggering Schedules**: See [temporal.client.schedule/execute](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/execute) +- **Updating Schedules**: See [temporal.client.schedule/reschedule](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.schedule/reschedule) + +Schedules can be built with cron expressions or built in Temporal time types. + +Schedules can executed immediately via "triggering" them. + +Schedules will handle overlapping runs determined by the `SchedulePolicy` you give it when creating it. + +Example Usage + +As a simple example, for the following Workflow implementation: + +```clojure +(require '[temporal.workflow :refer [defworkflow]]) +(require '[temporal.activity :refer [defactivity] :as a]) + +(defactivity greet-activity + [ctx {:keys [name] :as args}] + (log/info "greet-activity:" args) + (str "Hi, " name)) + +(defworkflow greeter-workflow + [args] + (log/info "greeter-workflow:" args) + @(a/invoke greet-activity args)) +``` + +Create and manage a schedule as follows + + +```clojure +(require '[temporal.client.schedule :as s]) +(let [client-options {:target "localhost:7233" + :namespace "default" + :enable-https false} + task-queue "MyTaskQueue" + workflow-id "my-workflow" + schedule-id "my-schedule" + client (s/create-client client-options)] + (s/schedule client schedule-id {:schedule {:trigger-immediately? true + :memo {"Created by" "John Doe"}} + :spec {:cron-expressions ["0 0 * * *"] + :timezone "US/Central"} + :policy {:pause-on-failure? false + :catchup-window (Duration/ofSeconds 10) + :overlap :skip} + :action {:arguments {:name "John"} + :options {:workflow-id workflow-id + :task-queue task-queue} + :workflow-type greeter-workflow}}) + (s/describe client schedule-id) + (s/pause client schedule-id) + (s/unpause client schedule-id) + (s/execute client schedule-id :skip) + (s/reschedule client schedule-id {:spec {:cron-expressions ["0 1 * * *"]}}) + (s/unschedule client schedule-id)) +``` diff --git a/doc/cljdoc.edn b/doc/cljdoc.edn index dfa13b0..957616f 100644 --- a/doc/cljdoc.edn +++ b/doc/cljdoc.edn @@ -1,6 +1,7 @@ {:cljdoc.doc/tree [["Readme" {:file "README.md"}] ["Workflows" {:file "doc/workflows.md"}] + ["Child Workflows" {:file "doc/child_workflows.md"}] ["Activities" {:file "doc/activities.md"}] ["Workers" {:file "doc/workers.md"}] ["Clients" {:file "doc/clients.md"}] diff --git a/doc/testing.md b/doc/testing.md index bedc94d..78b27d8 100644 --- a/doc/testing.md +++ b/doc/testing.md @@ -25,7 +25,7 @@ You can use the provided environment with a Clojure unit testing framework of yo (str "Hi, " name)) (defworkflow greeter-workflow - [ctx {:keys [args]}] + [args] (log/info "greeter-workflow:" args) @(a/invoke greet-activity args)) diff --git a/doc/workers.md b/doc/workers.md index 1de10b5..27ae142 100644 --- a/doc/workers.md +++ b/doc/workers.md @@ -25,7 +25,7 @@ As a simple example, let's say we want our Worker to be able to execute the foll (str "Hi, " name)) (defworkflow greeter-workflow - [ctx {:keys [args]}] + [args] (log/info "greeter-workflow:" args) @(a/invoke greet-activity args)) ``` diff --git a/doc/workflows.md b/doc/workflows.md index 424e00e..f009f62 100644 --- a/doc/workflows.md +++ b/doc/workflows.md @@ -10,7 +10,7 @@ In this Clojure SDK programming model, a Temporal Workflow is a function declare ```clojure (defworkflow my-workflow - [ctx params] + [params] ...) ``` @@ -24,48 +24,48 @@ A Workflow implementation consists of defining a (defworkflow) function. The pl (require '[temporal.workflow :refer [defworkflow]]) (defworkflow my-workflow - [ctx {{:keys [foo]} :args}] + [{:keys [foo]}] ...) ``` ### Workflow Implementation Constraints -Temporal uses the [Event Sourcing pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing) to recover the state of a Workflow object, including its threads and local variable values. In essence, the Workflow code is re-executed from the beginning whenever a Workflow state requires restoration. During replay, successfully executed Activities are not re-executed but return the result previously recorded in the Workflow event history. +Temporal uses the [Event Sourcing pattern](https://docs.microsoft.com/en-us/azure/architecture/patterns/event-sourcing) to recover the state of a Workflow object, including its threads and local variable values. The Workflow code is re-executed from the beginning whenever a Workflow state requires restoration. During replay, successfully executed Activities are not re-executed but return the result previously recorded in the Workflow event history. Even though Temporal has the replay capability, which brings resilience to your Workflows, you should never think about this capability when writing your Workflows. Instead, you should focus on implementing your business logic/requirements and write your Workflows as they would execute only once. There are some things, however, to think about when writing your Workflows, namely determinism and isolation. We summarize these constraints here: -- Do not use any mutable global variables such as atoms in your Workflow implementations. This will ensure that multiple Workflow instances are fully isolated. -- Do not call any non-deterministic functions like non-seeded random or uuid-generators directly from the Workflow code. (Coming soon: SideEffect API) +- Do not use mutable global variables such as atoms in your Workflow implementations. Avoiding globals will ensure that multiple Workflow instances are fully isolated. +- Do not call non-deterministic functions like non-seeded random or uuid generators directly from the Workflow code. Instead, use Side Effects. - Perform all IO operations and calls to third-party services on Activities and not Workflows, as they are usually non-deterministic. -- Do not use any programming language constructs that rely on system time. (Coming soon: API methods for time) -- Do not use threading primitives such as clojure.core.async/go or clojure.core.async/thread. (Coming soon: API methods for async function execution) +- Do not use any programming language constructs that rely on system time. All notions of time must come from Side Effects or Activities so that the results become part of the Event History. +- Do not use threading primitives such as clojure.core.async/go or clojure.core.async/thread. - Do not perform any operations that may block the underlying thread, such as clojure.core.async/ @state 1))) + @state)) +``` + +### Temporal Queries + +Your Workflow may respond to [queries](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#query). + +A temporal query is similar to a temporal signal; both are messages sent to a running Workflow. +The difference is that a signal intends to change the behaviour of the Workflow, whereas a query intends to inspect its current state. +Querying the state of a Workflow implies that the Workflow must maintain its state while running, typically in a Clojure [atom](https://clojuredocs.org/clojure.core/atom) or [ref](https://clojure.org/reference/refs). + +#### Registering a Query handler + +To enable querying a Workflow, you may use [temporal.workflow/register-query-handler!](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#register-query-handler!). +The query handler is a function that has a reference to the Workflow state, usually by closing over it. It interprets the query and returns a response. + +```clojure +(defworkflow stateful-workflow + [{:keys [init] :as args}] + (let [state (atom init)] + (register-query-handler! (fn [query-type args] + (when (= query-type :my-query) + (get-in @state [:path :to :answer])))) + ;; e.g. react to signals (perhaps in a loop), updating the state atom + )) +``` + +#### Querying a Workflow + +You may query a Workflow with [temporal.client.core/query](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.client.core#query). +A query consists of a `query-type` (keyword) and possibly some `args` (any serializable data structure). + +```clojure +(query workflow :my-query {:foo "bar"}) +``` + +## Exceptions + +This SDK integrates with the [slingshot](https://github.com/scgilardi/slingshot) library. Stones cast with Slingshot's throw+ are serialized and re-thrown across activity and workflow boundaries in a transparent manner that is compatible with Slingshot's `try+` based catch blocks. + +### Managing Retries +By default, stones cast that are not caught locally by an Activity or Workflow trigger ApplicationFailure semantics and are thus subject to the overall Retry Policies in place. However, the developer may force a given stone to be non-retriable by setting the flag '::non-retriable?' within the object. + +Example: + +```clojure +(require `[temporal.exceptions :as e]) +(require `[slingshot.slingshot :refer [throw+]]) + +(defactivity my-activity + [ctx args] + (throw+ {:type ::my-fatal-error :msg "this error is non-retriable" ::e/non-retriable? true})) +``` + +## Versioning + +The Temporal Platform requires that Workflow code be deterministic. Because of that requirement, this Clojure SDK exposes a workflow patching API [temporal.workflow/get-version](https://cljdoc.org/d/io.github.manetu/temporal-sdk/CURRENT/api/temporal.workflow#get-version). Workflow developers use the `get-version` function to determine the correct branch of logic to follow to maintain determinism. + +Example: + +Assume we have a workflow that invokes an activity using temporal.activity/invoke that we wish to convert to temporal.activity/local-invoke. Such a change is acceptable for any future instances of your Workflow. However, any existing instances must be careful as this logic change could introduce non-determinism during replay. + +We can safely handle both the original and the new desired scenario by branching based on the results from calling temporal.workflow/get-version: + +```clojure +(require `[temporal.workflow :as w]) +(require `[temporal.activity :as a]) + +(let [version (w/get-version ::local-activity w/default-version 1)] + (cond + (= version w/default-version) @(a/invoke versioned-activity :v1) + (= version 1) @(a/local-invoke versioned-activity :v2))) +``` diff --git a/project.clj b/project.clj index 1393ce4..5b08be1 100644 --- a/project.clj +++ b/project.clj @@ -1,9 +1,9 @@ -(defproject io.github.manetu/temporal-sdk "0.12.4-SNAPSHOT" +(defproject io.github.manetu/temporal-sdk "1.4.1-SNAPSHOT" :description "A Temporal SDK for Clojure" :url "https://github.com/manetu/temporal-clojure-sdk" :license {:name "Apache License 2.0" :url "https://www.apache.org/licenses/LICENSE-2.0" - :year 2022 + :year 2023 :key "apache-2.0"} :plugins [[lein-cljfmt "0.9.0"] [lein-kibit "0.1.8"] @@ -11,26 +11,30 @@ [lein-cloverage "1.2.4"] [jonase/eastwood "1.3.0"] [lein-codox "0.10.8"]] - :dependencies [[org.clojure/clojure "1.11.1"] - [org.clojure/core.async "1.6.673"] - [io.temporal/temporal-sdk "1.19.1"] - [io.temporal/temporal-testing "1.19.1"] - [com.taoensso/encore "3.59.0"] - [com.taoensso/timbre "6.1.0"] - [com.taoensso/nippy "3.2.0"] + :dependencies [[org.clojure/clojure "1.12.0"] + [org.clojure/core.async "1.7.701"] + [io.temporal/temporal-sdk "1.28.3"] + [io.temporal/temporal-testing "1.28.3"] + [com.taoensso/encore "3.139.0"] + [com.taoensso/timbre "6.6.1"] + [com.taoensso/nippy "3.4.2"] [funcool/promesa "9.2.542"] - [medley "1.4.0"]] + [medley "1.4.0"] + [slingshot "0.12.2"]] :repl-options {:init-ns user} - :java-source-paths ["src"] + :java-source-paths ["src" "resources"] :javac-options ["-target" "11" "-source" "11"] :eastwood {:add-linters [:unused-namespaces]} :codox {:metadata {:doc/format :markdown}} - :profiles {:dev {:dependencies [[org.clojure/tools.namespace "1.4.4"] - [eftest "0.6.0"]]}} + :profiles {:dev {:dependencies [[org.clojure/tools.namespace "1.5.0"] + [eftest "0.6.0"] + [mockery "0.1.4"] + [io.temporal/temporal-opentracing "1.28.3"]] + :resource-paths ["test/temporal/test/resources"]}} :cloverage {:runner :eftest :runner-opts {:multithread? false :fail-fast? true} - :fail-threshold 87 + :fail-threshold 91 :ns-exclude-regex [#"temporal.client.worker"]}) diff --git a/resources/clj-kondo.exports/io.github.manetu/temporal-sdk/config.edn b/resources/clj-kondo.exports/io.github.manetu/temporal-sdk/config.edn new file mode 100644 index 0000000..27f04bf --- /dev/null +++ b/resources/clj-kondo.exports/io.github.manetu/temporal-sdk/config.edn @@ -0,0 +1,2 @@ +{:lint-as {temporal.workflow/defworkflow clojure.core/defn + temporal.activity/defactivity clojure.core/defn}} diff --git a/src/temporal/activity.clj b/src/temporal/activity.clj index 1785cd1..0f4a164 100644 --- a/src/temporal/activity.clj +++ b/src/temporal/activity.clj @@ -5,6 +5,7 @@ (:require [taoensso.timbre :as log] [taoensso.nippy :as nippy] [promesa.core :as p] + [temporal.internal.exceptions :as e] [temporal.internal.activity :as a] [temporal.internal.utils :as u] [temporal.internal.promise]) ;; needed for IPromise protocol extention @@ -41,12 +42,10 @@ along with the Activity Task for the next retry attempt and can be extracted by (log/trace "get-heartbeat-details:" v) v))) -(defn- complete-invoke - [activity result] - (log/trace activity "completed with" (count result) "bytes") - (let [r (nippy/thaw result)] - (log/trace activity "results:" r) - r)) +(defn get-info + "Returns information about the Activity execution" + [] + (a/get-info)) (defn invoke " @@ -92,7 +91,8 @@ Arguments: stub (Workflow/newUntypedActivityStub (a/invoke-options-> options))] (log/trace "invoke:" activity "with" params options) (-> (.executeAsync stub act-name u/bytes-type (u/->objarray params)) - (p/then (partial complete-invoke activity)) + (p/then (partial u/complete-invoke activity)) + (p/catch e/slingshot? e/recast-stone) (p/catch (fn [e] (log/error e) (throw e))))))) @@ -126,13 +126,14 @@ Arguments: (local-invoke my-activity {:foo \"bar\"} {:start-to-close-timeout (Duration/ofSeconds 3)) ``` " - ([activity params] (invoke activity params {})) + ([activity params] (local-invoke activity params {})) ([activity params options] (let [act-name (a/get-annotation activity) stub (Workflow/newUntypedLocalActivityStub (a/local-invoke-options-> options))] (log/trace "local-invoke:" activity "with" params options) (-> (.executeAsync stub act-name u/bytes-type (u/->objarray params)) - (p/then (partial complete-invoke activity)) + (p/then (partial u/complete-invoke activity)) + (p/catch e/slingshot? e/recast-stone) (p/catch (fn [e] (log/error e) (throw e))))))) diff --git a/src/temporal/client/core.clj b/src/temporal/client/core.clj index c231183..8e16584 100644 --- a/src/temporal/client/core.clj +++ b/src/temporal/client/core.clj @@ -1,44 +1,16 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.client.core "Methods for client interaction with Temporal" (:require [taoensso.timbre :as log] [taoensso.nippy :as nippy] [promesa.core :as p] + [temporal.client.options :as copts] [temporal.internal.workflow :as w] - [temporal.internal.utils :as u]) + [temporal.internal.utils :as u] + [temporal.internal.exceptions :as e]) (:import [java.time Duration] - [io.temporal.client WorkflowClient WorkflowClientOptions WorkflowClientOptions$Builder WorkflowStub] - [io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder])) - -(def ^:no-doc stub-options - {:channel #(.setChannel ^WorkflowServiceStubsOptions$Builder %1 %2) - :ssl-context #(.setSslContext ^WorkflowServiceStubsOptions$Builder %1 %2) - :enable-https #(.setEnableHttps ^WorkflowServiceStubsOptions$Builder %1 %2) - :target #(.setTarget ^WorkflowServiceStubsOptions$Builder %1 %2) - :rpc-timeout #(.setRpcTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) - :rpc-long-poll-timeout #(.setRpcLongPollTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) - :rpc-query-timeout #(.setRpcQueryTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) - :backoff-reset-freq #(.setConnectionBackoffResetFrequency ^WorkflowServiceStubsOptions$Builder %1 %2) - :grpc-reconnect-freq #(.setGrpcReconnectFrequency ^WorkflowServiceStubsOptions$Builder %1 %2) - :headers #(.setHeaders ^WorkflowServiceStubsOptions$Builder %1 %2) - :enable-keepalive #(.setEnableKeepAlive ^WorkflowServiceStubsOptions$Builder %1 %2) - :keepalive-time #(.setKeepAliveTime ^WorkflowServiceStubsOptions$Builder %1 %2) - :keepalive-timeout #(.setKeepAliveTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) - :keepalive-without-stream #(.setKeepAlivePermitWithoutStream ^WorkflowServiceStubsOptions$Builder %1 %2)}) - -(defn ^:no-doc stub-options-> - ^WorkflowServiceStubsOptions [params] - (u/build (WorkflowServiceStubsOptions/newBuilder) stub-options params)) - -(def ^:no-doc client-options - {:identity #(.setIdentity ^WorkflowClientOptions$Builder %1 %2) - :namespace #(.setNamespace ^WorkflowClientOptions$Builder %1 %2) - :data-converter #(.setDataConverter ^WorkflowClientOptions$Builder %1 %2)}) - -(defn ^:no-doc client-options-> - ^WorkflowClientOptions [params] - (u/build (WorkflowClientOptions/newBuilder (WorkflowClientOptions/getDefaultInstance)) client-options params)) + [io.temporal.client WorkflowClient WorkflowStub])) (defn create-client " @@ -47,39 +19,16 @@ workflow clients (See [[create-workflow]]). Arguments: -- `options`: Client configuration option map (See below) +- `options`: Options for configuring the `WorkflowClient` (See [[temporal.client.options/workflow-client-options]] and [[temporal.client.options/stub-options]]) - `timeout`: Connection timeout as a [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) (default: 5s) -#### options map - - -| Value | Description | Type | Default | -| ------------------------- | --------------------------------------------------------------------------- | ------------ | ------- | -| :target | Sets the connection host:port | String | \"127.0.0.1:7233\" | -| :identity | Overrides the worker node identity (workers only) | String | | -| :namespace | Sets the Temporal namespace context for this client | String | | -| :data-converter | Overrides the data converter used to serialize arguments and results. | [DataConverter](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/converter/DataConverter.html) | | -| :channel | Sets gRPC channel to use. Exclusive with target and sslContext | [ManagedChannel](https://grpc.github.io/grpc-java/javadoc/io/grpc/ManagedChannel.html) | | -| :ssl-context | Sets gRPC SSL Context to use | [SslContext](https://netty.io/4.0/api/io/netty/handler/ssl/SslContext.html) | | -| :enable-https | Sets option to enable SSL/TLS/HTTPS for gRPC | boolean | false | -| :rpc-timeout | Sets the rpc timeout value for non query and non long poll calls | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10s | -| :rpc-long-poll-timeout | Sets the rpc timeout value | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s | -| :rpc-query-timeout | Sets the rpc timeout for queries | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10s | -| :backoff-reset-freq | Sets frequency at which gRPC connection backoff should be reset practically | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10s | -| :grpc-reconnect-freq | Sets frequency at which gRPC channel will be moved into an idle state | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s | -| :headers | Set the headers | [Metadata](https://grpc.github.io/grpc-java/javadoc/io/grpc/Metadata.html) | | -| :enable-keepalive | Set keep alive ping from client to the server | boolean | false | -| :keepalive-time | Set the keep alive time | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | | -| :keepalive-timeout | Set the keep alive timeout | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | | -| :keepalive-without-stream | Set if client sends keepalive pings even with no active RPCs | boolean | false | - " ([] (create-client {})) ([options] (create-client options (Duration/ofSeconds 5))) ([options timeout] - (let [service (WorkflowServiceStubs/newConnectedServiceStubs (stub-options-> options) timeout)] - (WorkflowClient/newInstance service (client-options-> options))))) + (let [service (copts/service-stub-> options timeout)] + (WorkflowClient/newInstance service (copts/workflow-client-options-> options))))) (defn create-workflow " @@ -98,12 +47,13 @@ Create a new workflow-stub instance, suitable for managing and interacting with ``` " ([^WorkflowClient client workflow-id] - (let [stub (.newUntypedWorkflowStub client workflow-id)] + (let [stub (.newUntypedWorkflowStub client (u/namify workflow-id))] (log/trace "create-workflow id:" workflow-id) {:client client :stub stub})) ([^WorkflowClient client workflow options] (let [wf-name (w/get-annotated-name workflow) - stub (.newUntypedWorkflowStub client wf-name (w/wf-options-> options))] + options (w/wf-options-> options) + stub (.newUntypedWorkflowStub client wf-name options)] (log/trace "create-workflow:" wf-name options) {:client client :stub stub}))) @@ -151,7 +101,11 @@ defworkflow once the workflow concludes. " [{:keys [^WorkflowStub stub] :as workflow}] (-> (.getResultAsync stub u/bytes-type) - (p/then nippy/thaw))) + (p/then nippy/thaw) + (p/catch e/slingshot? e/recast-stone) + (p/catch (fn [e] + (log/error e) + (throw e))))) (defn query " @@ -191,4 +145,4 @@ Forcefully terminates 'workflow' ``` " [{:keys [^WorkflowStub stub] :as workflow} reason params] - (.terminate stub reason (u/->objarray params))) \ No newline at end of file + (.terminate stub reason (u/->objarray params))) diff --git a/src/temporal/client/options.clj b/src/temporal/client/options.clj new file mode 100644 index 0000000..391245a --- /dev/null +++ b/src/temporal/client/options.clj @@ -0,0 +1,93 @@ +(ns temporal.client.options + (:require [temporal.internal.utils :as u]) + (:import [io.temporal.client WorkflowClientOptions WorkflowClientOptions$Builder] + [io.temporal.common.interceptors WorkflowClientInterceptorBase] + [io.temporal.client.schedules ScheduleClientOptions ScheduleClientOptions$Builder] + [io.temporal.serviceclient WorkflowServiceStubs WorkflowServiceStubsOptions WorkflowServiceStubsOptions$Builder])) + +(def workflow-client-options + " +`WorkflowClientOptions` configuration map (See [[temporal.client.core/create-client]]) + +| Value | Description | Type | Default | +| ------------------------- | --------------------------------------------------------------------------- | ------------ | ------- | +| :target | Sets the connection host:port | String | \"127.0.0.1:7233\" | +| :identity | Overrides the worker node identity (workers only) | String | | +| :namespace | Sets the Temporal namespace context for this client | String | | +| :data-converter | Overrides the data converter used to serialize arguments and results. | [DataConverter](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/converter/DataConverter.html) | | +| :interceptors | Collection of interceptors used to intercept workflow client calls. | [WorkflowClientInterceptor](https://javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/interceptors/WorkflowClientInterceptor.html) | | +" + {:identity #(.setIdentity ^WorkflowClientOptions$Builder %1 %2) + :namespace #(.setNamespace ^WorkflowClientOptions$Builder %1 %2) + :data-converter #(.setDataConverter ^WorkflowClientOptions$Builder %1 %2) + :interceptors #(.setInterceptors ^WorkflowClientOptions$Builder %1 (into-array WorkflowClientInterceptorBase %2))}) + +(defn ^:no-doc workflow-client-options-> + ^WorkflowClientOptions [params] + (u/build (WorkflowClientOptions/newBuilder (WorkflowClientOptions/getDefaultInstance)) workflow-client-options params)) + +(def schedule-client-options + " +`ScheduleClientOptions` configuration map (See [[temporal.client.schedule/create-client]]) + +| Value | Description | Type | Default | +| ------------------------- | --------------------------------------------------------------------------- | ------------ | ------- | +| :target | Sets the connection host:port | String | \"127.0.0.1:7233\" | +| :identity | Overrides the worker node identity (workers only) | String | | +| :namespace | Sets the Temporal namespace context for this client | String | | +| :data-converter | Overrides the data converter used to serialize arguments and results. | [DataConverter](https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/converter/DataConverter.html) | | + +" + {:identity #(.setIdentity ^ScheduleClientOptions$Builder %1 %2) + :namespace #(.setNamespace ^ScheduleClientOptions$Builder %1 %2) + :data-converter #(.setDataConverter ^ScheduleClientOptions$Builder %1 %2)}) + +(defn ^:no-doc schedule-client-options-> + ^ScheduleClientOptions [params] + (u/build (ScheduleClientOptions/newBuilder (ScheduleClientOptions/getDefaultInstance)) schedule-client-options params)) + +(def stub-options + " +`WorkflowServiceStubsOptions` configuration map (See [[temporal.client.core/create-client]] or [[temporal.client.schedule/create-client]]) + +| Value | Description | Type | Default | +| ------------------------- | --------------------------------------------------------------------------- | ------------ | ------- | +| :channel | Sets gRPC channel to use. Exclusive with target and sslContext | [ManagedChannel](https://grpc.github.io/grpc-java/javadoc/io/grpc/ManagedChannel.html) | | +| :ssl-context | Sets gRPC SSL Context to use (See [[temporal.tls/new-ssl-context]]) | [SslContext](https://netty.io/4.0/api/io/netty/handler/ssl/SslContext.html) | | +| :enable-https | Sets option to enable SSL/TLS/HTTPS for gRPC | boolean | false | +| :rpc-timeout | Sets the rpc timeout value for non query and non long poll calls | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10s | +| :rpc-long-poll-timeout | Sets the rpc timeout value | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s | +| :rpc-query-timeout | Sets the rpc timeout for queries | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10s | +| :backoff-reset-freq | Sets frequency at which gRPC connection backoff should be reset practically | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 10s | +| :grpc-reconnect-freq | Sets frequency at which gRPC channel will be moved into an idle state | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s | +| :headers | Set the headers | [Metadata](https://grpc.github.io/grpc-java/javadoc/io/grpc/Metadata.html) | | +| :enable-keepalive | Set keep alive ping from client to the server | boolean | false | +| :keepalive-time | Set the keep alive time | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | | +| :keepalive-timeout | Set the keep alive timeout | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | | +| :keepalive-without-stream | Set if client sends keepalive pings even with no active RPCs | boolean | false | +| :metrics-scope | The scope to be used for metrics reporting | [Scope](https://github.com/uber-java/tally/blob/master/core/src/main/java/com/uber/m3/tally/Scope.java) | | + +" + {:channel #(.setChannel ^WorkflowServiceStubsOptions$Builder %1 %2) + :ssl-context #(.setSslContext ^WorkflowServiceStubsOptions$Builder %1 %2) + :enable-https #(.setEnableHttps ^WorkflowServiceStubsOptions$Builder %1 %2) + :target #(.setTarget ^WorkflowServiceStubsOptions$Builder %1 %2) + :rpc-timeout #(.setRpcTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) + :rpc-long-poll-timeout #(.setRpcLongPollTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) + :rpc-query-timeout #(.setRpcQueryTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) + :backoff-reset-freq #(.setConnectionBackoffResetFrequency ^WorkflowServiceStubsOptions$Builder %1 %2) + :grpc-reconnect-freq #(.setGrpcReconnectFrequency ^WorkflowServiceStubsOptions$Builder %1 %2) + :headers #(.setHeaders ^WorkflowServiceStubsOptions$Builder %1 %2) + :enable-keepalive #(.setEnableKeepAlive ^WorkflowServiceStubsOptions$Builder %1 %2) + :keepalive-time #(.setKeepAliveTime ^WorkflowServiceStubsOptions$Builder %1 %2) + :keepalive-timeout #(.setKeepAliveTimeout ^WorkflowServiceStubsOptions$Builder %1 %2) + :keepalive-without-stream #(.setKeepAlivePermitWithoutStream ^WorkflowServiceStubsOptions$Builder %1 %2) + :metrics-scope #(.setMetricsScope ^WorkflowServiceStubsOptions$Builder %1 %2)}) + +(defn stub-options-> + ^WorkflowServiceStubsOptions [params] + (u/build (WorkflowServiceStubsOptions/newBuilder) stub-options params)) + +(defn service-stub-> + [options timeout] + (WorkflowServiceStubs/newConnectedServiceStubs (stub-options-> options) timeout)) diff --git a/src/temporal/client/schedule.clj b/src/temporal/client/schedule.clj new file mode 100644 index 0000000..476285a --- /dev/null +++ b/src/temporal/client/schedule.clj @@ -0,0 +1,152 @@ +(ns temporal.client.schedule + (:require [taoensso.timbre :as log] + [temporal.client.options :as copts] + [temporal.internal.utils :as u] + [temporal.internal.schedule :as s]) + (:import [java.time Duration] + [io.temporal.client.schedules ScheduleClient ScheduleUpdate ScheduleUpdateInput])) + +(set! *warn-on-reflection* true) + +(defn create-client + "Creates a `ScheduleClient` instance suitable for interacting with Temporal's Schedules. + + Arguments: + + - `options`: Options for configuring the `ScheduleClient` (See [[temporal.client.options/schedule-client-options]] and [[temporal.client.options/stub-options]]) + - `timeout`: Connection timeout as a [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) (default: 5s) + +" + ([options] (create-client options (Duration/ofSeconds 5))) + ([options timeout] + (let [service (copts/service-stub-> options timeout)] + (ScheduleClient/newInstance service (copts/schedule-client-options-> options))))) + +(defn schedule + "Creates a `Schedule` with Temporal + + Arguments: + + - `client`: [ScheduleClient]https://www.javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/client/schedules/ScheduleClient.html + - `schedule-id`: The string name of the schedule in Temporal, keeping it consistent with workflow id is a good idea + - `options`: A map containing the `:schedule`, `:state`, `:policy`, `:spec`, and `:action` option maps for the `Schedule` + + ```clojure + (defworkflow my-workflow + [ctx args] + ...) + + (let [client (create-client {:target \"localhost:8080\"})] + (create + client + \"my-workflow\" + {:schedule {:trigger-immediately? false} + :state {:paused? false} + :policy {:pause-on-failure? true} + :spec {:crons [\"0 * * * *\"]} + :action {:workflow-type my-workflow + :arguments {:value 1} + :options {:workflow-id \"my-workflow\"}}})) + ```" + [^ScheduleClient client schedule-id options] + (let [schedule (s/schedule-> options) + schedule-options (s/schedule-options-> (:schedule options))] + (log/tracef "create schedule:" schedule-id) + (.createSchedule client schedule-id schedule schedule-options))) + +(defn unschedule + "Deletes a Temporal `Schedule` via a schedule-id + + ```clojure + + (let [client (create-client {:target \"localhost:8080\"})] + (unschedule client \"my-schedule\") + ```" + [^ScheduleClient client schedule-id] + (log/tracef "remove schedule:" schedule-id) + (-> client + (.getHandle schedule-id) + (.delete))) + +(defn describe + "Describes an existing Temporal `Schedule` via a schedule-id + + ```clojure + + (let [client (create-client {:target \"localhost:8080\"})] + (describe client \"my-schedule\") + ```" + [^ScheduleClient client schedule-id] + (-> client + (.getHandle schedule-id) + (.describe))) + +(defn pause + "Pauses an existing Temporal `Schedule` via a schedule-id + + ```clojure + + (let [client (create-client {:target \"localhost:8080\"})] + (pause client \"my-schedule\") + ```" + [^ScheduleClient client schedule-id] + (log/tracef "pausing schedule:" schedule-id) + (-> client + (.getHandle schedule-id) + (.pause))) + +(defn unpause + "Unpauses an existing Temporal `Schedule` via a schedule-id + + ```clojure + + (let [client (create-client {:target \"localhost:8080\"})] + (unpause client \"my-schedule\") + ```" + [^ScheduleClient client schedule-id] + (log/tracef "unpausing schedule:" schedule-id) + (-> client + (.getHandle schedule-id) + (.unpause))) + +(defn execute + "Runs a Temporal `Schedule's` workflow execution immediately via a schedule-id + + ```clojure + + (let [client (create-client {:target \"localhost:8080\"})] + (execute client \"my-schedule\" :skip) + ```" + [^ScheduleClient client schedule-id overlap-policy] + (log/tracef "execute schedule:" schedule-id) + (-> client + (.getHandle schedule-id) + (.trigger (s/overlap-policy-> overlap-policy)))) + +(defn reschedule + "Updates the current Temporal `Schedule` via a schedule-id. + Uses the same options as create asside from `:schedule` + + The `ScheduleHandle` takes a unary function object + of the signature: + + (ScheduleUpdateInput) -> ScheduleUpdate + + ```clojure + + (let [client (create-client {:target \"localhost:8080\"})] + (reschedule client \"my-schedule\" {:spec {:crons [\"1 * * * *\"]}}) + ```" + [^ScheduleClient client schedule-id options] + (log/tracef "update schedule:" schedule-id) + (letfn [(update-fn + [opts ^ScheduleUpdateInput input] + (let [schedule (-> input + (.getDescription) + (.getSchedule))] + (-> schedule + (s/schedule-> opts) + (ScheduleUpdate.))))] + (-> client + (.getHandle schedule-id) + (.update (u/->Func (partial update-fn options)))))) diff --git a/src/temporal/client/worker.clj b/src/temporal/client/worker.clj index 8949a4f..997dbea 100644 --- a/src/temporal/client/worker.clj +++ b/src/temporal/client/worker.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.client.worker "Methods for managing a Temporal worker instance" @@ -6,9 +6,10 @@ [temporal.internal.activity :as a] [temporal.internal.workflow :as w] [temporal.internal.utils :as u]) - (:import [io.temporal.worker Worker WorkerFactory WorkerOptions WorkerOptions$Builder] + (:import [io.temporal.worker Worker WorkerFactory WorkerFactoryOptions WorkerFactoryOptions$Builder WorkerOptions WorkerOptions$Builder] [temporal.internal.dispatcher DynamicWorkflowProxy] - [io.temporal.workflow DynamicWorkflow])) + [io.temporal.workflow DynamicWorkflow] + [io.temporal.common.interceptors WorkerInterceptor])) (defn ^:no-doc init " @@ -20,33 +21,64 @@ Initializes a worker instance, suitable for real connections or unit-testing wit {:activities (a/import-dispatch activities) :workflows (w/import-dispatch workflows)})] (log/trace "init:" dispatch) (.registerActivitiesImplementations worker (to-array [(a/dispatcher ctx (:activities dispatch))])) - (.addWorkflowImplementationFactory worker DynamicWorkflowProxy - (u/->Func - (fn [] - (new DynamicWorkflowProxy - (reify DynamicWorkflow - (execute [_ args] - (w/execute ctx (:workflows dispatch) args))))))))) + (.registerWorkflowImplementationFactory worker DynamicWorkflowProxy + (u/->Func + (fn [] + (new DynamicWorkflowProxy + (reify DynamicWorkflow + (execute [_ args] + (w/execute ctx (:workflows dispatch) args))))))))) +(def worker-factory-options + " +Options for configuring the worker-factory (See [[start]]) + +| Value | Description | Type | Default | +| ------------ | ----------------------------------------------------------------- | ---------------- | ------- | +| :enable-logging-in-replay | | boolean | false | +| :max-workflow-thread-count | Maximum number of threads available for workflow execution across all workers created by the Factory. | int | 600 | +| :worker-interceptors | Collection of WorkerInterceptors | [WorkerInterceptor](https://javadoc.io/doc/io.temporal/temporal-sdk/latest/io/temporal/common/interceptors/WorkerInterceptor.html) | | +| :workflow-cache-size | To avoid constant replay of code the workflow objects are cached on a worker. This cache is shared by all workers created by the Factory. | int | 600 | +| :using-virtual-workflow-threads | Use Virtual Threads for all workflow threads across all workers created by this factory. This option is only supported for JDK >= 21. If set then :max-workflow-thread-count is ignored. | boolean | false | +" + + {:enable-logging-in-replay #(.setEnableLoggingInReplay ^WorkerFactoryOptions$Builder %1 %2) + :max-workflow-thread-count #(.setMaxWorkflowThreadCount ^WorkerFactoryOptions$Builder %1 %2) + :worker-interceptors #(.setWorkerInterceptors ^WorkerFactoryOptions$Builder %1 (into-array WorkerInterceptor %2)) + :workflow-cache-size #(.setWorkflowCacheSize ^WorkerFactoryOptions$Builder %1 %2) + :using-virtual-workflow-threads #(.setUsingVirtualWorkflowThreads ^WorkerFactoryOptions$Builder %1 %2)}) + +(defn ^:no-doc worker-factory-options-> + ^WorkerFactoryOptions [params] + (u/build (WorkerFactoryOptions/newBuilder (WorkerFactoryOptions/getDefaultInstance)) worker-factory-options params)) + (def worker-options " Options for configuring workers (See [[start]]) -| Value | Mandatory | Description | Type | Default | -| ------------ | ----------- | ----------------------------------------------------------------- | ---------------- | ------- | -| :task-queue | y | The name of the task-queue for this worker instance to listen on. | String / keyword | | -| :ctx | | An opaque handle that is passed back as the first argument of [[temporal.workflow/defworkflow]] and [[temporal.activity/defactivity]], useful for passing state such as database or network connections. | | nil | -| :dispatch | | An optional map explicitly setting the dispatch table | See below | All visible activities/workers are automatically registered | -| :max-concurrent-activity-task-pollers | | Number of simultaneous poll requests on activity task queue. Consider incrementing if the worker is not throttled due to `MaxActivitiesPerSecond` or `MaxConcurrentActivityExecutionSize` options and still cannot keep up with the request rate. | int | 5 | -| :max-concurrent-activity-execution-size | | Maximum number of activities executed in parallel. | int | 200 | -| :max-concurrent-local-activity-execution-size | | Maximum number of local activities executed in parallel. | int | 200 | -| :max-concurrent-workflow-task-pollers | | Number of simultaneous poll requests on workflow task queue. | int | 2 | -| :max-concurrent-workflow-task-execution-size | | Maximum number of simultaneously executed workflow tasks. | int | 200 | -| :default-deadlock-detection-timeout | | Time period in ms that will be used to detect workflow deadlock. | long | 1000 | -| :default-heartbeat-throttle-interval | | Default amount of time between sending each pending heartbeat. | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 30s | -| :max-heartbeat-throttle-interval | | Maximum amount of time between sending each pending heartbeat. | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s | -| :local-activity-worker-only | | Worker should only handle workflow tasks and local activities. | boolean | false | -| :max-taskqueue-activities-per-second | | Sets the rate limiting on number of activities per second. | double | 0.0 (unlimited) | -| :max-workers-activities-per-second | | Maximum number of activities started per second. | double | 0.0 (unlimited) | +| Value | Mandatory | Description | Type | Default | +|-------------------------------------------------|-----------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------------------------------------------------------------------------|-------------------------------------------------------------| +| :task-queue | y | The name of the task-queue for this worker instance to listen on. | String / keyword | | +| :ctx | | An opaque handle that is passed back as the first argument of [[temporal.workflow/defworkflow]] and [[temporal.activity/defactivity]], useful for passing state such as database or network connections. | | nil | +| :dispatch | | An optional map explicitly setting the dispatch table | See below | All visible activities/workers are automatically registered | +| :default-deadlock-detection-timeout | | Time period in ms that will be used to detect workflow deadlock. | long | 1000 | +| :default-heartbeat-throttle-interval | | Default amount of time between sending each pending heartbeat. | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 30s | +| :local-activity-worker-only | | Worker should only handle workflow tasks and local activities. | boolean | false | +| :max-concurrent-activity-execution-size | | Maximum number of activities executed in parallel. | int | 200 | +| :max-concurrent-activity-execution-size | | Maximum number of activities executed in parallel. | int | 200 | +| :max-concurrent-activity-task-pollers | | Number of simultaneous poll requests on activity task queue. Consider incrementing if the worker is not throttled due to `MaxActivitiesPerSecond` or `MaxConcurrentActivityExecutionSize` options and still cannot keep up with the request rate. | int | 5 | +| :max-concurrent-local-activity-execution-size | | Maximum number of local activities executed in parallel. | int | 200 | +| :max-concurrent-nexus-execution-size | | Maximum number of simultaneously executed nexus tasks. | int | 200 | +| :max-concurrent-nexus-task-pollers | | Number of simultaneous poll requests on nexus tasks. | int | 5 | +| :max-concurrent-workflow-task-execution-size | | Maximum number of simultaneously executed workflow tasks. | int | 200 | +| :max-concurrent-workflow-task-pollers | | Number of simultaneous poll requests on workflow task queue. | int | 5 | +| :max-heartbeat-throttle-interval | | Maximum amount of time between sending each pending heartbeat. | [Duration](https://docs.oracle.com/javase/8/docs/api//java/time/Duration.html) | 60s | +| :max-taskqueue-activities-per-second | | Sets the rate limiting on number of activities per second. | double | 0.0 (unlimited) | +| :max-workers-activities-per-second | | Maximum number of activities started per second. | double | 0.0 (unlimited) | +| :using-virtual-threads | | Use virtual threads for all the task executors created by this worker. | boolean | false | +| :using-virtual-threads-on-activity-worker | | Use virtual threads for the activity task executors created by this worker. | boolean | false | +| :using-virtual-threads-on-local-activity-worker | | Use virtual threads for the local activity task executors created by this worker. | boolean | false | +| :using-virtual-threads-on-nexus-worker | | Use Virtual Threads for the Nexus task executors created by this worker. | boolean | false | +| :using-virtual-threads-on-workflow-worker | | Use Virtual Threads for the Workflow task executors created by this worker. | boolean | false | #### dispatch-table @@ -64,17 +96,24 @@ Options for configuring workers (See [[start]]) ``` " - {:max-concurrent-activity-task-pollers #(.setMaxConcurrentActivityTaskPollers ^WorkerOptions$Builder %1 %2) + {:default-deadlock-detection-timeout #(.setDefaultDeadlockDetectionTimeout ^WorkerOptions$Builder %1 %2) + :default-heartbeat-throttle-interval #(.setDefaultHeartbeatThrottleInterval ^WorkerOptions$Builder %1 %2) + :local-activity-worker-only #(.setLocalActivityWorkerOnly ^WorkerOptions$Builder %1 %2) :max-concurrent-activity-execution-size #(.setMaxConcurrentActivityExecutionSize ^WorkerOptions$Builder %1 %2) + :max-concurrent-activity-task-pollers #(.setMaxConcurrentActivityTaskPollers ^WorkerOptions$Builder %1 %2) :max-concurrent-local-activity-execution-size #(.setMaxConcurrentLocalActivityExecutionSize ^WorkerOptions$Builder %1 %2) - :max-concurrent-workflow-task-pollers #(.setMaxConcurrentWorkflowTaskPollers ^WorkerOptions$Builder %1 %2) + :max-concurrent-nexus-execution-size #(.setMaxConcurrentNexusExecutionSize ^WorkerOptions$Builder %1 %2) + :max-concurrent-nexus-task-pollers #(.setMaxConcurrentNexusTaskPollers ^WorkerOptions$Builder %1 %2) :max-concurrent-workflow-task-execution-size #(.setMaxConcurrentWorkflowTaskExecutionSize ^WorkerOptions$Builder %1 %2) - :default-deadlock-detection-timeout #(.setDefaultDeadlockDetectionTimeout ^WorkerOptions$Builder %1 %2) - :default-heartbeat-throttle-interval #(.setDefaultHeartbeatThrottleInterval ^WorkerOptions$Builder %1 %2) + :max-concurrent-workflow-task-pollers #(.setMaxConcurrentWorkflowTaskPollers ^WorkerOptions$Builder %1 %2) :max-heartbeat-throttle-interval #(.setMaxHeartbeatThrottleInterval ^WorkerOptions$Builder %1 %2) - :local-activity-worker-only #(.setLocalActivityWorkerOnly ^WorkerOptions$Builder %1 %2) :max-taskqueue-activities-per-second #(.setMaxTaskQueueActivitiesPerSecond ^WorkerOptions$Builder %1 %2) - :max-workers-activities-per-second #(.setMaxWorkerActivitiesPerSecond ^WorkerOptions$Builder %1 %2)}) + :max-workers-activities-per-second #(.setMaxWorkerActivitiesPerSecond ^WorkerOptions$Builder %1 %2) + :using-virtual-threads #(.setUsingVirtualThreads ^WorkerOptions$Builder %1 %2) + :using-virtual-threads-on-activity-worker #(.setUsingVirtualThreadsOnActivityWorker ^WorkerOptions$Builder %1 %2) + :using-virtual-threads-on-local-activity-worker #(.setUsingVirtualThreadsOnLocalActivityWorker ^WorkerOptions$Builder %1 %2) + :using-virtual-threads-on-nexus-worker #(.setUsingVirtualThreadsOnNexusWorker ^WorkerOptions$Builder %1 %2) + :using-virtual-threads-on-workflow-worker #(.setUsingVirtualThreadsOnWorkflowWorker ^WorkerOptions$Builder %1 %2)}) (defn ^:no-doc worker-options-> ^WorkerOptions [params] @@ -86,19 +125,21 @@ Starts a worker processing loop. Arguments: -- `client`: WorkflowClient instance returned from [[temporal.client.core/create-client]] -- `options`: Worker start options (See [[worker-options]]) +- `client`: WorkflowClient instance returned from [[temporal.client.core/create-client]] +- `options`: Worker start options (See [[worker-options]]) +- `factory-options`: WorkerFactory options (See [[worker-factory-options]]) ```clojure (start {:task-queue ::my-queue :ctx {:some \"context\"}}) ``` " - [client {:keys [task-queue] :as options}] - (let [factory (WorkerFactory/newInstance client) - worker (.newWorker factory (u/namify task-queue) (worker-options-> options))] - (init worker options) - (.start factory) - {:factory factory :worker worker})) + ([client options] (start client options nil)) + ([client {:keys [task-queue] :as options} factory-options] + (let [factory (WorkerFactory/newInstance client (worker-factory-options-> factory-options)) + worker (.newWorker factory (u/namify task-queue) (worker-options-> options))] + (init worker options) + (.start factory) + {:factory factory :worker worker}))) (defn stop " diff --git a/src/temporal/codec.clj b/src/temporal/codec.clj index b8da810..fdc6188 100644 --- a/src/temporal/codec.clj +++ b/src/temporal/codec.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.codec "Methods for managing codecs between a client and the Temporal backend" diff --git a/src/temporal/common.clj b/src/temporal/common.clj index 1b990c7..5981c5d 100644 --- a/src/temporal/common.clj +++ b/src/temporal/common.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.common (:require [temporal.internal.utils :as u]) diff --git a/src/temporal/exceptions.clj b/src/temporal/exceptions.clj new file mode 100644 index 0000000..1062ede --- /dev/null +++ b/src/temporal/exceptions.clj @@ -0,0 +1,5 @@ +;; Copyright © 2024 Manetu, Inc. All rights reserved + +(ns temporal.exceptions) + +(def flags {::non-retriable? "'true' indicates this exception is not going to be retried even if it is not included into retry policy doNotRetry list."}) diff --git a/src/temporal/internal/activity.clj b/src/temporal/internal/activity.clj index ed2ef93..9e9bb4f 100644 --- a/src/temporal/internal/activity.clj +++ b/src/temporal/internal/activity.clj @@ -1,14 +1,17 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns ^:no-doc temporal.internal.activity (:require [clojure.core.protocols :as p] [clojure.datafy :as d] [clojure.core.async :refer [go [{:keys [maximum-attempts] :or {maximum-attempts 0} :as options}] + (-> options + (cond-> (zero? maximum-attempts) (assoc :maximum-attempts Integer/MAX_VALUE)) ;; workaround for https://github.com/temporalio/sdk-java/issues/1727 + (common/retry-options->))) + (def local-invoke-option-spec {:start-to-close-timeout #(.setStartToCloseTimeout ^LocalActivityOptions$Builder %1 %2) :schedule-to-close-timeout #(.setScheduleToCloseTimeout ^LocalActivityOptions$Builder %1 %2) - :retry-options #(.setRetryOptions ^LocalActivityOptions$Builder %1 (common/retry-options-> %2)) + :retry-options #(.setRetryOptions ^LocalActivityOptions$Builder %1 (local-retry-options-> %2)) :do-not-include-args #(.setDoNotIncludeArgumentsIntoMarker ^LocalActivityOptions$Builder %1 %2) :local-retry-threshold #(.setLocalRetryThreshold ^LocalActivityOptions$Builder %1 %2)}) (defn local-invoke-options-> - ^LocalActivityOptions [params] - (u/build (LocalActivityOptions/newBuilder (LocalActivityOptions/getDefaultInstance)) local-invoke-option-spec (import-invoke-options params))) + ^LocalActivityOptions [{:keys [retry-options] :or {retry-options {}} :as params}] + (u/build (LocalActivityOptions/newBuilder (LocalActivityOptions/getDefaultInstance)) local-invoke-option-spec (import-invoke-options (assoc params :retry-options retry-options)))) (extend-protocol p/Datafiable ActivityInfo @@ -58,9 +66,10 @@ :activity-type (.getActivityType d)})) (defn get-info [] - (-> (Activity/getExecutionContext) - (.getInfo) - (d/datafy))) + (->> (Activity/getExecutionContext) + (.getInfo) + (d/datafy) + (into {}))) (defn get-annotation ^String [x] @@ -102,11 +111,14 @@ f (u/find-dispatch-fn dispatch activity-type) a (u/->args args)] (log/trace activity-id "calling" f "with args:" a) - (try - (result-> activity-id (f ctx a)) - (catch Exception e - (log/error e) - (throw e))))) + (try+ + (result-> activity-id (f ctx a)) + (catch DestroyWorkflowThreadError ex + (log/debug activity-id "thread evicted") + (throw ex)) + (catch Object o + (log/error &throw-context) + (e/forward &throw-context))))) (defn dispatcher [ctx dispatch] (reify DynamicActivity diff --git a/src/temporal/internal/child_workflow.clj b/src/temporal/internal/child_workflow.clj new file mode 100644 index 0000000..3335dae --- /dev/null +++ b/src/temporal/internal/child_workflow.clj @@ -0,0 +1,35 @@ +(ns ^:no-doc temporal.internal.child-workflow + (:require [temporal.common :as common] + [temporal.internal.utils :as u] + [temporal.internal.workflow :as w]) + (:import [java.time Duration] + [io.temporal.api.enums.v1 ParentClosePolicy] + [io.temporal.workflow ChildWorkflowOptions ChildWorkflowOptions$Builder ChildWorkflowCancellationType])) + +(def cancellation-type-> + {:abandon ChildWorkflowCancellationType/ABANDON + :try-cancel ChildWorkflowCancellationType/TRY_CANCEL + :wait-cancellation-completed ChildWorkflowCancellationType/WAIT_CANCELLATION_COMPLETED + :wait-cancellation-requested ChildWorkflowCancellationType/WAIT_CANCELLATION_REQUESTED}) + +(def parent-close-policy-> + {:abandon ParentClosePolicy/PARENT_CLOSE_POLICY_ABANDON + :request-cancel ParentClosePolicy/PARENT_CLOSE_POLICY_REQUEST_CANCEL + :terminate ParentClosePolicy/PARENT_CLOSE_POLICY_TERMINATE}) + +(def ^:no-doc child-workflow-option-spec + {:task-queue #(.setTaskQueue ^ChildWorkflowOptions$Builder %1 (u/namify %2)) + :workflow-id #(.setWorkflowId ^ChildWorkflowOptions$Builder %1 (u/namify %2)) + :workflow-id-reuse-policy #(.setWorkflowIdReusePolicy ^ChildWorkflowOptions$Builder %1 (w/workflow-id-reuse-policy-> %2)) + :parent-close-policy #(.setParentClosePolicy ^ChildWorkflowOptions$Builder %1 (parent-close-policy-> %2)) + :workflow-execution-timeout #(.setWorkflowExecutionTimeout ^ChildWorkflowOptions$Builder %1 %2) + :workflow-run-timeout #(.setWorkflowRunTimeout ^ChildWorkflowOptions$Builder %1 %2) + :workflow-task-timeout #(.setWorkflowTaskTimeout ^ChildWorkflowOptions$Builder %1 %2) + :retry-options #(.setRetryOptions %1 (common/retry-options-> %2)) + :cron-schedule #(.setCronSchedule ^ChildWorkflowOptions$Builder %1 %2) + :cancellation-type #(.setCancellationType ^ChildWorkflowOptions$Builder %1 (cancellation-type-> %2)) + :memo #(.setMemo ^ChildWorkflowOptions$Builder %1 %2)}) + +(defn child-workflow-options-> + ^ChildWorkflowOptions [options] + (u/build (ChildWorkflowOptions/newBuilder) child-workflow-option-spec options)) diff --git a/src/temporal/internal/exceptions.clj b/src/temporal/internal/exceptions.clj new file mode 100644 index 0000000..0f07cbd --- /dev/null +++ b/src/temporal/internal/exceptions.clj @@ -0,0 +1,41 @@ +;; Copyright © 2024 Manetu, Inc. All rights reserved + +(ns ^:no-doc temporal.internal.exceptions + (:require [slingshot.slingshot :refer [throw+]] + [taoensso.timbre :as log] + [temporal.exceptions :as e] + [temporal.internal.utils :as u]) + (:import [io.temporal.failure ApplicationFailure])) + +(def exception-type (name ::slingshot)) + +(defn slingshot? [ex] + (and (instance? ApplicationFailure (ex-cause ex)) + (= exception-type (.getType (cast ApplicationFailure (ex-cause ex)))))) + +(defn forward + [{:keys [message wrapper] {:keys [::e/non-retriable?] :or {non-retriable? false} :as object} :object :as context}] + (log/trace "forward:" context) + (cond + (and (map? object) (empty? object) (true? (some->> wrapper (instance? Throwable)))) + (do + (log/trace "recasting wrapped exception") + (throw wrapper)) + + (instance? Throwable object) + (do + (log/trace "recasting throwable") + (throw object)) + + :else + (do + (log/trace "recasting stone within ApplicationFailure") + (let [o (u/->objarray context) + t (if non-retriable? + (ApplicationFailure/newNonRetryableFailure message exception-type o) + (ApplicationFailure/newFailure message exception-type o))] + (throw t))))) + +(defn recast-stone [ex] + (let [stone (->> ex ex-cause (cast ApplicationFailure) (.getDetails) u/->args :object)] + (throw+ stone))) ;; FIXME: Does not preserve the original stack-trace diff --git a/src/temporal/internal/promise.clj b/src/temporal/internal/promise.clj index a6ce9fd..c042a50 100644 --- a/src/temporal/internal/promise.clj +++ b/src/temporal/internal/promise.clj @@ -1,8 +1,7 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns ^:no-doc temporal.internal.promise - (:require [taoensso.timbre :as log] - [promesa.protocols :as pt] + (:require [promesa.protocols :as pt] [temporal.internal.utils :refer [->Func] :as u]) (:import [clojure.lang IDeref IBlockingDeref] [io.temporal.workflow Promise] diff --git a/src/temporal/internal/schedule.clj b/src/temporal/internal/schedule.clj new file mode 100644 index 0000000..dd20138 --- /dev/null +++ b/src/temporal/internal/schedule.clj @@ -0,0 +1,98 @@ +(ns ^:no-doc temporal.internal.schedule + (:require [temporal.internal.utils :as u] + [temporal.internal.workflow :as w]) + (:import [io.temporal.api.enums.v1 ScheduleOverlapPolicy] + [io.temporal.client.schedules + Schedule + Schedule$Builder + ScheduleActionStartWorkflow + ScheduleActionStartWorkflow$Builder + ScheduleOptions + ScheduleOptions$Builder + SchedulePolicy + SchedulePolicy$Builder + ScheduleSpec + ScheduleSpec$Builder + ScheduleState + ScheduleState$Builder])) + +(set! *warn-on-reflection* true) + +(defn overlap-policy-> + [overlap-policy] + (case overlap-policy + :allow ScheduleOverlapPolicy/SCHEDULE_OVERLAP_POLICY_ALLOW_ALL + :buffer ScheduleOverlapPolicy/SCHEDULE_OVERLAP_POLICY_BUFFER_ALL + :buffer-one ScheduleOverlapPolicy/SCHEDULE_OVERLAP_POLICY_BUFFER_ONE + :cancel ScheduleOverlapPolicy/SCHEDULE_OVERLAP_POLICY_CANCEL_OTHER + :skip ScheduleOverlapPolicy/SCHEDULE_OVERLAP_POLICY_SKIP + :terminate ScheduleOverlapPolicy/SCHEDULE_OVERLAP_POLICY_TERMINATE_OTHER)) + +(def schedule-policy-spec + {:catchup-window #(.setCatchupWindow ^SchedulePolicy$Builder %1 %2) + :overlap (fn [^SchedulePolicy$Builder builder value] + (.setOverlap builder (overlap-policy-> value))) + :pause-on-failure? #(.setPauseOnFailure ^SchedulePolicy$Builder %1 %2)}) + +(defn schedule-policy-> + ^SchedulePolicy [params] + (u/build (SchedulePolicy/newBuilder) schedule-policy-spec params)) + +(def schedule-spec-spec + {:calendars #(.setCalendars ^ScheduleSpec$Builder %1 %2) + :cron-expressions #(.setCronExpressions ^ScheduleSpec$Builder %1 %2) + :intervals #(.setIntervals ^ScheduleSpec$Builder %1 %2) + :end-at #(.setEndAt ^ScheduleSpec$Builder %1 %2) + :jitter #(.setJitter ^ScheduleSpec$Builder %1 %2) + :skip-at #(.setSkip ^ScheduleSpec$Builder %1 %2) + :start-at #(.setStartAt ^ScheduleSpec$Builder %1 %2) + :timezone #(.setTimeZoneName ^ScheduleSpec$Builder %1 %2)}) + +(defn schedule-spec-> + ^ScheduleSpec [params] + (u/build (ScheduleSpec/newBuilder) schedule-spec-spec params)) + +(def schedule-action-start-workflow-spec-> + {:options #(.setOptions ^ScheduleActionStartWorkflow$Builder %1 (w/wf-options-> %2)) + :arguments (fn [^ScheduleActionStartWorkflow$Builder builder value] + (.setArguments builder (u/->objarray value))) + :workflow-type (fn [^ScheduleActionStartWorkflow$Builder builder value] + (if (string? value) + (.setWorkflowType builder ^String value) + (.setWorkflowType builder (w/get-annotated-name value))))}) + +(defn schedule-action-start-workflow-> + ^ScheduleActionStartWorkflow [params] + (u/build + (ScheduleActionStartWorkflow/newBuilder) + schedule-action-start-workflow-spec-> params)) + +(def schedule-state-spec + {:limited-action? #(.setLimitedAction ^ScheduleState$Builder %1 %2) + :note #(.setNote ^ScheduleState$Builder %1 %2) + :paused? #(.setPaused ^ScheduleState$Builder %1 %2) + :remaining-actions #(.setRemainingActions ^ScheduleState$Builder %1 %2)}) + +(defn schedule-state-> + ^ScheduleState [params] + (u/build (ScheduleState/newBuilder) schedule-state-spec params)) + +(def schedule-options-spec + {:memo #(.setMemo ^ScheduleOptions$Builder %1 %2) + :trigger-immediately? #(.setTriggerImmediately ^ScheduleOptions$Builder %1 %2)}) + +(defn schedule-options-> + ^ScheduleOptions [params] + (u/build (ScheduleOptions/newBuilder) schedule-options-spec params)) + +(def schedule-spec + {:action #(.setAction ^Schedule$Builder %1 (schedule-action-start-workflow-> %2)) + :policy #(.setPolicy ^Schedule$Builder %1 (schedule-policy-> %2)) + :spec #(.setSpec ^Schedule$Builder %1 (schedule-spec-> %2)) + :state #(.setState ^Schedule$Builder %1 (schedule-state-> %2))}) + +(defn schedule-> + (^Schedule [params] + (u/build (Schedule/newBuilder) schedule-spec params)) + (^Schedule [schedule params] + (u/build (Schedule/newBuilder schedule) schedule-spec params))) diff --git a/src/temporal/internal/search_attributes.clj b/src/temporal/internal/search_attributes.clj new file mode 100644 index 0000000..a73a102 --- /dev/null +++ b/src/temporal/internal/search_attributes.clj @@ -0,0 +1,12 @@ +(ns temporal.internal.search-attributes + (:import [io.temporal.api.enums.v1 IndexedValueType])) + +(def indexvalue-type-> + {:unspecified IndexedValueType/INDEXED_VALUE_TYPE_UNSPECIFIED + :text IndexedValueType/INDEXED_VALUE_TYPE_TEXT + :keyword IndexedValueType/INDEXED_VALUE_TYPE_KEYWORD + :int IndexedValueType/INDEXED_VALUE_TYPE_INT + :double IndexedValueType/INDEXED_VALUE_TYPE_DOUBLE + :bool IndexedValueType/INDEXED_VALUE_TYPE_BOOL + :datetime IndexedValueType/INDEXED_VALUE_TYPE_DATETIME + :keyword-list IndexedValueType/INDEXED_VALUE_TYPE_KEYWORD_LIST}) diff --git a/src/temporal/internal/signals.clj b/src/temporal/internal/signals.clj index 198831e..5f6744e 100644 --- a/src/temporal/internal/signals.clj +++ b/src/temporal/internal/signals.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns ^:no-doc temporal.internal.signals (:require [taoensso.timbre :as log] @@ -21,13 +21,16 @@ (.add ch payload) (assoc s signal-name ch))))) -(defn create - "Registers the calling workflow to receive signals and returns a context variable to be passed back later" +(defn register-signal-handler! + [f] + (Workflow/registerListener + (reify + DynamicSignalHandler + (handle [_ signal-name args] + (f signal-name (u/->args args)))))) + +(defn create-signal-chan [] (let [state (atom {})] - (Workflow/registerListener - (reify - DynamicSignalHandler - (handle [_ signal-name args] - (-handle state signal-name (u/->args args))))) + (register-signal-handler! (partial -handle state)) state)) diff --git a/src/temporal/internal/utils.clj b/src/temporal/internal/utils.clj index 6ad132b..002f4e6 100644 --- a/src/temporal/internal/utils.clj +++ b/src/temporal/internal/utils.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns ^:no-doc temporal.internal.utils (:require [clojure.string :as string] @@ -75,11 +75,16 @@ (verify-registered-fns data) (m/index-by :name data))) +(defn find-dispatch + "Finds any dispatch descriptor named 't' that carry metadata 'marker'" + [dispatch-table t] + (or (get dispatch-table t) + (throw (ex-info "workflow/activity not found" {:function t})))) + (defn find-dispatch-fn "Finds any functions named 't' that carry metadata 'marker'" [dispatch-table t] - (:fn (or (get dispatch-table t) - (throw (ex-info "workflow/activity not found" {:function t}))))) + (:fn (find-dispatch dispatch-table t))) (defn import-dispatch [marker coll] @@ -113,6 +118,13 @@ (fn [x] (str (symbol x))))) +(defn complete-invoke + [stub result] + (log/trace stub "completed with" (count result) "bytes") + (let [r (nippy/thaw result)] + (log/trace stub "results:" r) + r)) + (defn ->Func [f] (reify @@ -136,4 +148,4 @@ (f x1 x2 x3 x4 x5)) Functions$Func6 (apply [_ x1 x2 x3 x4 x5 x6] - (f x1 x2 x3 x4 x5 x6)))) \ No newline at end of file + (f x1 x2 x3 x4 x5 x6)))) diff --git a/src/temporal/internal/workflow.clj b/src/temporal/internal/workflow.clj index ef5222f..d1a1d5c 100644 --- a/src/temporal/internal/workflow.clj +++ b/src/temporal/internal/workflow.clj @@ -1,39 +1,65 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns ^:no-doc temporal.internal.workflow (:require [clojure.core.protocols :as p] [clojure.datafy :as d] - [taoensso.timbre :as log] + [slingshot.slingshot :refer [try+]] [taoensso.nippy :as nippy] + [taoensso.timbre :as log] [temporal.common :as common] - [temporal.internal.utils :as u] - [temporal.internal.signals :as s]) - (:import [io.temporal.workflow Workflow WorkflowInfo] - [io.temporal.client WorkflowOptions WorkflowOptions$Builder])) + [temporal.internal.exceptions :as e] + [temporal.internal.signals :as s] + [temporal.internal.utils :as u]) + (:import [io.temporal.api.enums.v1 WorkflowIdReusePolicy] + [io.temporal.client WorkflowOptions WorkflowOptions$Builder] + [io.temporal.internal.sync DestroyWorkflowThreadError] + [io.temporal.workflow Workflow WorkflowInfo])) + +(extend-protocol p/Datafiable + WorkflowInfo + (datafy [d] + {:namespace (.getNamespace d) + :workflow-id (.getWorkflowId d) + :run-id (.getRunId d) + :workflow-type (.getWorkflowType d) + :attempt (.getAttempt d)})) -(def wf-option-spec +(def workflow-id-reuse-options + " +| Value | Description | +| ---------------------------- | --------------------------------------------------------------------------- | +| :allow-duplicate | Allow starting a workflow execution using the same workflow id. | +| :allow-duplicate-failed-only | Allow starting a workflow execution using the same workflow id, only when the last execution's final state is one of [terminated, cancelled, timed out, failed] | +| :reject-duplicate | Do not permit re-use of the workflow id for this workflow. | +| :terminate-if-running | If a workflow is running using the same workflow ID, terminate it and start a new one. If no running workflow, then the behavior is the same as ALLOW_DUPLICATE| +" + {:allow-duplicate WorkflowIdReusePolicy/WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE + :allow-duplicate-failed-only WorkflowIdReusePolicy/WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY + :reject-duplicate WorkflowIdReusePolicy/WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE + :terminate-if-running WorkflowIdReusePolicy/WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING}) + +(defn ^:no-doc workflow-id-reuse-policy-> + ^WorkflowIdReusePolicy [policy] + (or (get workflow-id-reuse-options policy) + (throw (IllegalArgumentException. (str "Unknown workflow-id-reuse-policy: " policy " Must be one of " (keys workflow-id-reuse-options)))))) + +(def ^:no-doc wf-option-spec {:task-queue #(.setTaskQueue ^WorkflowOptions$Builder %1 (u/namify %2)) - :workflow-id #(.setWorkflowId ^WorkflowOptions$Builder %1 %2) + :workflow-id #(.setWorkflowId ^WorkflowOptions$Builder %1 (u/namify %2)) + :workflow-id-reuse-policy #(.setWorkflowIdReusePolicy ^WorkflowOptions$Builder %1 (workflow-id-reuse-policy-> %2)) :workflow-execution-timeout #(.setWorkflowExecutionTimeout ^WorkflowOptions$Builder %1 %2) :workflow-run-timeout #(.setWorkflowRunTimeout ^WorkflowOptions$Builder %1 %2) :workflow-task-timeout #(.setWorkflowTaskTimeout ^WorkflowOptions$Builder %1 %2) :retry-options #(.setRetryOptions %1 (common/retry-options-> %2)) :cron-schedule #(.setCronSchedule ^WorkflowOptions$Builder %1 %2) :memo #(.setMemo ^WorkflowOptions$Builder %1 %2) - :search-attributes #(.setSearchAttributes ^WorkflowOptions$Builder %1 %2)}) + :search-attributes #(.setSearchAttributes ^WorkflowOptions$Builder %1 %2) + :start-delay #(.setStartDelay ^WorkflowOptions$Builder %1 %2)}) -(defn wf-options-> +(defn ^:no-doc wf-options-> ^WorkflowOptions [params] (u/build (WorkflowOptions/newBuilder (WorkflowOptions/getDefaultInstance)) wf-option-spec params)) -(extend-protocol p/Datafiable - WorkflowInfo - (datafy [d] - {:namespace (.getNamespace d) - :workflow-id (.getWorkflowId d) - :run-id (.getRunId d) - :workflow-type (.getWorkflowType d)})) - (defn get-info [] (d/datafy (Workflow/getInfo))) @@ -51,14 +77,20 @@ (defn execute [ctx dispatch args] - (try - (let [{:keys [workflow-type workflow-id]} (get-info) - f (u/find-dispatch-fn dispatch workflow-type) - a (u/->args args) - _ (log/trace workflow-id "calling" f "with args:" a) - r (f ctx {:args a :signals (s/create)})] - (log/trace workflow-id "result:" r) - (nippy/freeze r)) - (catch Exception e - (log/error e) - (throw e)))) + (let [{:keys [workflow-type workflow-id]} (get-info)] + (try+ + (let [d (u/find-dispatch dispatch workflow-type) + f (:fn d) + a (u/->args args) + _ (log/trace workflow-id "calling" f "with args:" a) + r (if (-> d :type (= :legacy)) + (f ctx {:args a :signals (s/create-signal-chan)}) + (f a))] + (log/trace workflow-id "result:" r) + (nippy/freeze r)) + (catch DestroyWorkflowThreadError ex + (log/debug workflow-id "thread evicted") + (throw ex)) + (catch Object o + (log/error &throw-context) + (e/forward &throw-context))))) diff --git a/src/temporal/promise.clj b/src/temporal/promise.clj index 2249707..42d6a60 100644 --- a/src/temporal/promise.clj +++ b/src/temporal/promise.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.promise "Methods for managing promises from pending activities from within workflows" @@ -30,6 +30,34 @@ promises returned from [[temporal.activity/invoke]] from within workflow context (p/then (fn [_] (mapv deref coll))))) +(defn all-settled + "Returns a Promise that becomes completed/failed when all the arguments are done/settled, even in the face of errors. + +*N.B. You must handle the exceptions in the returned promise with promesa* + +Similar to [promesa/all](https://funcool.github.io/promesa/latest/promesa.core.html#var-all) but designed to work with +promises returned from [[temporal.activity/invoke]] from within workflow context. + +For more Java SDK samples example look here: + https://github.com/temporalio/samples-java/tree/main/core/src/main/java/io/temporal/samples/batch + +```clojure +(-> (all-settled [(a/invoke activity-a ..) (a/invoke activity-b ..)]) + (promesa.core/then (fn [[a-result b-result]] ...))) +``` +" + [coll] + (letfn [(wait! [^Promise p] (try (.get p) (catch Exception _)))] + ;; So we do not have to duplicate this, make the only copy here + (let [promises (->array coll)] + (run! wait! promises) + (-> + (Promise/allOf promises) + (pt/->PromiseAdapter) + ;; The promises are all completed at this point, + ;; this is just to use the promesa library + (p/then (fn [_] (mapv deref coll))))))) + (defn race "Returns Promise that becomes completed when any of the arguments are completed. @@ -49,9 +77,9 @@ promises returned from [[temporal.activity/invoke]] from within workflow context (defn resolved "Returns a new, fully resolved promise" [value] - (Workflow/newPromise value)) + (pt/->PromiseAdapter (Workflow/newPromise value))) (defn rejected "Returns a new, rejected promise" [^Exception e] - (Workflow/newFailedPromise e)) \ No newline at end of file + (pt/->PromiseAdapter (Workflow/newFailedPromise e))) diff --git a/src/temporal/side_effect.clj b/src/temporal/side_effect.clj index 94f592d..a4c984f 100644 --- a/src/temporal/side_effect.clj +++ b/src/temporal/side_effect.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.side-effect "Methods for managing side-effects from within workflows" diff --git a/src/temporal/signals.clj b/src/temporal/signals.clj index a444d56..ee7cab1 100644 --- a/src/temporal/signals.clj +++ b/src/temporal/signals.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.signals "Methods for managing signals from within workflows" @@ -9,46 +9,77 @@ (:import [io.temporal.workflow Workflow])) (defn is-empty? - "Returns 'true' if 'signal-name' either doesn't exist or exists but has no pending messages" - [state signal-name] + "Returns 'true' if 'signal-name' either doesn't exist within signal-chan or exists but has no pending messages" + [signal-chan signal-name] (let [signal-name (u/namify signal-name) - ch (s/get-ch @state signal-name) + ch (s/get-ch @signal-chan signal-name) r (or (nil? ch) (.isEmpty ch))] - (log/trace "is-empty?:" @state signal-name r) + (log/trace "is-empty?:" @signal-chan signal-name r) r)) (defn- rx - [state signal-name] + [signal-chan signal-name] (let [signal-name (u/namify signal-name) - ch (s/get-ch @state signal-name) + ch (s/get-ch @signal-chan signal-name) m (.poll ch)] (log/trace "rx:" signal-name m) m)) (defn poll - "Non-blocking check of the signal. Consumes and returns a message if found, otherwise returns 'nil'" - [state signal-name] - (when-not (is-empty? state signal-name) - (rx state signal-name))) + "Non-blocking check of the signal via signal-chan. Consumes and returns a message if found, otherwise returns 'nil'" + [signal-chan signal-name] + (when-not (is-empty? signal-chan signal-name) + (rx signal-chan signal-name))) (defn ! "Sends `payload` to `workflow-id` via signal `signal-name`." [^String workflow-id signal-name payload] (let [signal-name (u/namify signal-name) stub (Workflow/newUntypedExternalWorkflowStub workflow-id)] - (.signal stub signal-name (u/->objarray payload)))) \ No newline at end of file + (.signal stub signal-name (u/->objarray payload)))) + +(def register-signal-handler! + " +Registers a DynamicSignalHandler listener that handles signals sent to the workflow such as with [[>!]]. + +Use inside a workflow definition with 'f' closing over any desired workflow state (e.g. atom) to mutate +the workflow state. + +Arguments: +- `f`: a 2-arity function, expecting 2 arguments. + +`f` arguments: +- `signal-name`: string +- `args`: params value or data structure + +```clojure +(defworkflow signalled-workflow + [{:keys [init] :as args}] + (let [state (atom init)] + (register-signal-handler! (fn [signal-name args] + (when (= signal-name \"mysignal\") + (update state #(conj % args))))) + ;; workflow implementation + )) +```" + s/register-signal-handler!) + +(def create-signal-chan + "Registers the calling workflow to receive signals and returns a 'signal-channel' context for use with functions such as [[ value))) + attributes)) + +(def ^:no-doc test-env-options + {:worker-factory-options #(.setWorkerFactoryOptions ^TestEnvironmentOptions$Builder %1 (worker/worker-factory-options-> %2)) + :workflow-client-options #(.setWorkflowClientOptions ^TestEnvironmentOptions$Builder %1 (copts/workflow-client-options-> %2)) + :workflow-service-stub-options #(.setWorkflowServiceStubsOptions ^TestEnvironmentOptions$Builder %1 (copts/stub-options-> %2)) + :metrics-scope #(.setMetricsScope ^TestEnvironmentOptions$Builder %1 %2) + :search-attributes set-search-attributes}) + +(defn ^:no-doc test-env-options-> + ^TestEnvironmentOptions [params] + (u/build (TestEnvironmentOptions/newBuilder) test-env-options params)) (defn create " Creates a mock Temporal backend, suitable for unit testing. A worker may be created with [[start]] and a client may be connected with [[get-client]] + +Arguments: + +- `options`: Client configuration option map (See below) + +#### options map + +| Value | Description | Type | Default | +| ------------------------- | --------------------------------------------- | ------------ | ------- | +| :worker-factory-options | | [[worker/worker-factory-options]] | | +| :workflow-client-options | | [[copts/client-options]] | | +| :workflow-service-stub-options | | [[copts/stub-options]] | | +| :metrics-scope | The scope to be used for metrics reporting | [Scope](https://github.com/uber-java/tally/blob/master/core/src/main/java/com/uber/m3/tally/Scope.java) | | +| :search-attributes | Add a map of search attributes to be registered on the Temporal Server | map | | + " - [] - (TestWorkflowEnvironment/newInstance)) + ([] + (TestWorkflowEnvironment/newInstance)) + ([options] + (TestWorkflowEnvironment/newInstance (test-env-options-> options)))) (defn start " @@ -31,10 +67,10 @@ Arguments: ``` " [env {:keys [task-queue] :as options}] - (let [worker (.newWorker env (u/namify task-queue))] + (let [worker (.newWorker env (u/namify task-queue) (worker/worker-options-> options))] (worker/init worker options) (.start env) - :ok)) + worker)) (defn stop " diff --git a/src/temporal/tls.clj b/src/temporal/tls.clj new file mode 100644 index 0000000..4176a35 --- /dev/null +++ b/src/temporal/tls.clj @@ -0,0 +1,49 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.tls + "Utilities for connecting to TLS enabled Temporal clusters" + (:require [clojure.java.io :as io]) + (:import [java.security KeyStore] + [java.security.cert CertificateFactory X509Certificate] + [javax.net.ssl TrustManagerFactory] + [io.grpc.netty.shaded.io.grpc.netty GrpcSslContexts] + [io.grpc.netty.shaded.io.netty.handler.ssl SslContext])) + +(defn- new-ca + ^X509Certificate [certpath] + (let [cf (CertificateFactory/getInstance "X.509")] + (with-open [is (io/input-stream certpath)] + (.generateCertificate cf is)))) + +(defn- new-keystore + ^KeyStore [certpath] + (let [ca (new-ca certpath) + ks-type (KeyStore/getDefaultType)] + (doto (KeyStore/getInstance ks-type) + (.load nil nil) + (.setCertificateEntry "ca" ca)))) + +(defn- new-trustmanagerfactory + ^TrustManagerFactory [certpath] + (let [alg (TrustManagerFactory/getDefaultAlgorithm) + ks (new-keystore certpath)] + (doto (TrustManagerFactory/getInstance alg) + (.init ks)))) + +(defn new-ssl-context + " +Creates a new gRPC [SslContext](https://netty.io/4.0/api/io/netty/handler/ssl/SslContext.html) suitable for passing to the :ssl-context option of [[temporal.client.core/create-client]] + +Arguments: + +- `ca-path`: The path to a PEM encoded x509 Certificate Authority root certificate for validating the Temporal server. +- `cert-path`: The path to a PEM encoded x509 Certificate representing this client's identity, used for mutual TLS authentication. +- 'key-path': The path to a PEM encoded private key representing this client's identity, used for mutual TLS authentication. + +" + ^SslContext [{:keys [ca-path cert-path key-path] :as args}] + (-> (GrpcSslContexts/forClient) + (cond-> + (some? ca-path) (.trustManager (new-trustmanagerfactory ca-path)) + (and (some? cert-path) (some? key-path)) (.keyManager (io/file cert-path) (io/file key-path))) + (.build))) diff --git a/src/temporal/workflow.clj b/src/temporal/workflow.clj index 3f59211..932511f 100644 --- a/src/temporal/workflow.clj +++ b/src/temporal/workflow.clj @@ -1,12 +1,15 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.workflow "Methods for defining and implementing Temporal workflows" (:require [taoensso.nippy :as nippy] [taoensso.timbre :as log] + [promesa.core :as p] + [temporal.internal.exceptions :as e] [temporal.internal.utils :as u] - [temporal.internal.workflow :as w]) + [temporal.internal.workflow :as w] + [temporal.internal.child-workflow :as cw]) (:import [io.temporal.workflow DynamicQueryHandler Workflow] [java.util.function Supplier] [java.time Duration])) @@ -34,6 +37,13 @@ [^Duration duration] (Workflow/sleep duration)) +(def default-version (int Workflow/DEFAULT_VERSION)) + +(defn get-version + "Used to safely perform backwards incompatible changes to workflow definitions" + [change-id min max] + (Workflow/getVersion (u/namify change-id) min max)) + (defn register-query-handler! " Registers a DynamicQueryHandler listener that handles queries sent to the workflow, using [[temporal.client.core/query]]. @@ -50,10 +60,10 @@ Arguments: ```clojure (defworkflow stateful-workflow - [ctx {:keys [signals] {:keys [init] :as args} :args :as params}] + [{:keys [init] :as args}] (let [state (atom init)] (register-query-handler! (fn [query-type args] - (when (= query-type :answer) + (when (= query-type :my-query) (get-in @state [:path :to :answer])))) ;; workflow implementation )) @@ -71,19 +81,16 @@ Arguments: (defmacro defworkflow " -Defines a new workflow, similar to defn, expecting a 2-arity parameter list and body. Should evaluate to something +Defines a new workflow, similar to defn, expecting a 1-arity parameter list and body. Should evaluate to something serializable, which will become available for [[temporal.client.core/get-result]]. Arguments: -- `ctx`: Context passed through from [[temporal.client.worker/start]] -- `params`: A map containing the following - - `args`: Passed from 'params' to [[temporal.client.core/start]] or [[temporal.client.core/signal-with-start]] - - `signals`: Signal context for use with signal calls such as [[temporal.signals/ options))] + (log/trace "invoke:" workflow "with" params options) + (-> (.executeAsync stub u/bytes-type (u/->objarray params)) + (p/then (partial u/complete-invoke workflow)) + (p/catch e/slingshot? e/recast-stone) + (p/catch (fn [e] + (log/error e) + (throw e))))))) diff --git a/test/temporal/test/activity_info.clj b/test/temporal/test/activity_info.clj new file mode 100644 index 0000000..aa34c3e --- /dev/null +++ b/test/temporal/test/activity_info.clj @@ -0,0 +1,26 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.activity-info + (:require [clojure.test :refer :all] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow]] + [temporal.activity :refer [defactivity] :as a] + [temporal.test.utils :as t])) + +(use-fixtures :once t/wrap-service) + +(defactivity getinfo-activity + [ctx args] + (a/get-info)) + +(defworkflow getinfo-workflow + [args] + @(a/invoke getinfo-activity args)) + +(deftest the-test + (testing "Verifies that we can retrieve our activity-id" + (let [workflow (t/create-workflow getinfo-workflow)] + (c/start workflow {}) + (let [{:keys [activity-id]} @(c/get-result workflow)] + (is (some? activity-id)))))) + diff --git a/test/temporal/test/async.clj b/test/temporal/test/async.clj index 9613655..48701aa 100644 --- a/test/temporal/test/async.clj +++ b/test/temporal/test/async.clj @@ -1,13 +1,13 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.async - (:require [clojure.test :refer :all] + (:require [clojure.test :refer [deftest testing is use-fixtures]] [clojure.core.async :refer [go]] [taoensso.timbre :as log] - [temporal.client.core :as c] - [temporal.workflow :refer [defworkflow]] [temporal.activity :refer [defactivity] :as a] - [temporal.test.utils :as t])) + [temporal.client.core :as c] + [temporal.test.utils :as t] + [temporal.workflow :refer [defworkflow] :as w])) (use-fixtures :once t/wrap-service) @@ -20,11 +20,11 @@ (str "Hi, " name)))) (defworkflow async-greeter-workflow - [ctx {:keys [args]}] + [args] (log/info "greeter-workflow:" args) @(a/invoke async-greet-activity args {:retry-options {:maximum-attempts 1}})) -(deftest the-test +(deftest basic-async-test (testing "Verifies that we can round-trip with an async task" (let [workflow (t/create-workflow async-greeter-workflow)] (c/start workflow {:name "Bob"}) @@ -34,3 +34,32 @@ (c/start workflow {:name "Charlie"}) (is (thrown? java.util.concurrent.ExecutionException @(c/get-result workflow)))))) + +(defactivity async-child-activity + [ctx {:keys [name] :as args}] + (go + (log/info "async-child-activity:" args) + (if (= name "Charlie") + (ex-info "permission-denied" {}) + (str "Hi, " name)))) + +(defworkflow async-child-workflow + [{:keys [name] :as args}] + (log/info "async-child-workflow:" args) + @(a/invoke async-child-activity args {:retry-options {:maximum-attempts 1}})) + +(defworkflow async-parent-workflow + [args] + (log/info "async-parent-workflow:" args) + @(w/invoke async-child-workflow args {:retry-options {:maximum-attempts 1} :task-queue t/task-queue})) + +(deftest child-workflow-test + (testing "Verifies that we can round-trip with an async task" + (let [workflow (t/create-workflow async-parent-workflow)] + (c/start workflow {:name "Bob"}) + (is (= @(c/get-result workflow) "Hi, Bob")))) + (testing "Verifies that we can process errors in async mode" + (let [workflow (t/create-workflow async-parent-workflow)] + (c/start workflow {:name "Charlie"}) + (is (thrown? java.util.concurrent.ExecutionException + @(c/get-result workflow)))))) diff --git a/test/temporal/test/child_workflow.clj b/test/temporal/test/child_workflow.clj new file mode 100644 index 0000000..23310ed --- /dev/null +++ b/test/temporal/test/child_workflow.clj @@ -0,0 +1,44 @@ +(ns temporal.test.child-workflow + (:require [clojure.test :refer [deftest testing is use-fixtures]] + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow] :as w] + [temporal.activity :refer [defactivity] :as a] + [temporal.test.utils :as t])) + +(use-fixtures :once t/wrap-service) + +(defactivity child-greeter-activity + [ctx {:keys [name] :as args}] + (log/info "greet-activity:" args) + (str "Hi, " name)) + +(defworkflow child-workflow + [{:keys [names] :as args}] + (log/info "child-workflow:" args) + (for [name names] + @(a/invoke child-greeter-activity {:name name}))) + +(defworkflow parent-workflow + [args] + (log/info "parent-workflow:" args) + @(w/invoke child-workflow args {:retry-options {:maximum-attempts 1} :task-queue t/task-queue})) + +(deftest basic-child-workflow-test + (testing "Using a child workflow (with multiple activities) in a parent workflow works as expected" + (let [workflow (t/create-workflow parent-workflow)] + (c/start workflow {:names ["Bob" "George" "Fred"]}) + (is (= (set @(c/get-result workflow)) #{"Hi, Bob" "Hi, George" "Hi, Fred"}))))) + +(defworkflow parent-workflow-with-activities + [args] + (log/info "parent-workflow:" args) + (concat + @(w/invoke child-workflow args) + [@(a/invoke child-greeter-activity {:name "Xavier"})])) + +(deftest parent-workflow-with-mulitple-test + (testing "Using a child workflow (with multiple activities) in a parent workflow (with activities) works as expected" + (let [workflow (t/create-workflow parent-workflow-with-activities)] + (c/start workflow {:names ["Bob" "George" "Fred"]}) + (is (= (set @(c/get-result workflow)) #{"Hi, Bob" "Hi, George" "Hi, Fred" "Hi, Xavier"}))))) diff --git a/test/temporal/test/client_signal.clj b/test/temporal/test/client_signal.clj index 1b0a0b2..0f08f71 100644 --- a/test/temporal/test/client_signal.clj +++ b/test/temporal/test/client_signal.clj @@ -1,10 +1,10 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.client-signal (:require [clojure.test :refer :all] [taoensso.timbre :as log] [temporal.client.core :refer [>!] :as c] - [temporal.signals :refer [ (pt/all (map invoke (range 10))) (p/then (fn [r] (log/info "r:" r) r)))) -(deftest the-test +(deftest concurrency-with-all-test (testing "Verifies that we can launch activities in parallel" (let [workflow (t/create-workflow concurrency-workflow)] (c/start workflow {}) - (is (-> workflow c/get-result deref count (= 10)))))) \ No newline at end of file + (is (-> workflow c/get-result deref count (= 10)))))) + +(defactivity all-settled-activity + [ctx args] + (log/tracef "calling all-settled-activity %d" args) + args) + +(defn invoke-settled [x] + (a/invoke all-settled-activity x)) + +(defworkflow all-settled-workflow + [args] + (log/trace "calling all-settled-workflow") + @(-> (pt/all-settled (map invoke-settled (range 10))) + (p/then (fn [r] r)) + (p/catch (fn [e] (:args (ex-data e)))))) + +(defactivity error-prone-activity + [ctx args] + (log/tracef "calling error-prone-activity %d" args) + (when (= args 5) + (throw (ex-info "error on 5" {:args args}))) + args) + +(defn invoke-error [x] + (a/invoke error-prone-activity x)) + +(defworkflow error-prone-workflow + [args] + (log/trace "calling error-prone-workflow") + @(-> (pt/all-settled (map invoke-error (range 10))) + (p/then (fn [r] r)) + (p/catch (fn [e] (:args (ex-data e)))))) + +(deftest concurrency-with-all-settled-test + (testing "Testing that all-settled waits for all the activities to complete + just like `p/all` does in spite of errors" + (let [workflow (t/create-workflow all-settled-workflow)] + (c/start workflow {}) + (is (-> workflow c/get-result deref count (= 10))))) + (testing "Testing that all-settled waits for all the activities to complete + just like `p/all` and can still propogate errors" + (let [workflow (t/create-workflow error-prone-workflow)] + (c/start workflow {}) + (is (-> workflow c/get-result deref (= 5)))))) + +(defactivity doubling-activity + [ctx args] + (log/info "doubling-activity:" args) + (* args 2)) + +(defn invoke-doubling-activity [x] + (a/invoke doubling-activity x)) + +(defworkflow concurrent-child-workflow + [args] + (log/info "concurrent-child-workflow:" args) + @(-> (pt/all [(invoke-doubling-activity args)]) + (p/then (fn [r] (log/info "r:" r) r)))) + +(defn invoke-child-workflow [x] + (w/invoke concurrent-child-workflow x {:retry-options {:maximum-attempts 1} :task-queue t/task-queue})) + +(defworkflow concurrent-parent-workflow + [args] + (log/info "concurrent-parent-workflow:" args) + @(-> (pt/all (map invoke-child-workflow (range 10))) + (p/then (fn [r] (log/info "r:" r) r)))) + +(deftest child-workflow-concurrency-test + (testing "Using a child workflow instead of an ctivity works with the promise api" + (let [workflow (t/create-workflow concurrent-parent-workflow)] + (c/start workflow {}) + (is (-> workflow c/get-result deref count (= 10))) + (is (-> workflow c/get-result) (mapv #(* 2 %) (range 10)))))) diff --git a/test/temporal/test/conflict.clj b/test/temporal/test/conflict.clj index 855a451..4086b6c 100644 --- a/test/temporal/test/conflict.clj +++ b/test/temporal/test/conflict.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.conflict (:require [clojure.test :refer :all] diff --git a/test/temporal/test/exception.clj b/test/temporal/test/exception.clj new file mode 100644 index 0000000..90e896e --- /dev/null +++ b/test/temporal/test/exception.clj @@ -0,0 +1,36 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.exception + (:require [clojure.test :refer :all] + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow]] + [temporal.activity :refer [defactivity] :as a] + [temporal.test.utils :as t])) + +(use-fixtures :once t/wrap-service) + +(defactivity exception-activity + [ctx args] + (log/info "exception-activity:" args) + (throw (ex-info "test 1" {}))) + +(defworkflow indirect-exception-workflow + [args] + (log/info "indirect-exception-workflow:" args) + @(a/invoke exception-activity args {:retry-options {:maximum-attempts 1}})) + +(defworkflow direct-exception-workflow + [args] + (log/info "direct-exception-workflow:" args) + (throw (ex-info "test 2" {}))) + +(deftest the-test + (testing "Verifies that we can throw exceptions indirectly from an activity" + (let [workflow (t/create-workflow indirect-exception-workflow)] + (c/start workflow {}) + (is (thrown? Exception @(c/get-result workflow))))) + (testing "Verifies that we can throw exceptions directly from a workflow" + (let [workflow (t/create-workflow direct-exception-workflow)] + (c/start workflow {}) + (is (thrown? Exception @(c/get-result workflow)))))) diff --git a/test/temporal/test/heartbeat.clj b/test/temporal/test/heartbeat.clj index 0feff88..e3a0304 100644 --- a/test/temporal/test/heartbeat.clj +++ b/test/temporal/test/heartbeat.clj @@ -18,7 +18,7 @@ (throw (ex-info "heartbeat details not found" {}))))) (defworkflow heartbeat-workflow - [_ _] + [_] @(a/invoke heartbeat-activity {})) (deftest the-test diff --git a/test/temporal/test/legacy_workflow.clj b/test/temporal/test/legacy_workflow.clj new file mode 100644 index 0000000..822039f --- /dev/null +++ b/test/temporal/test/legacy_workflow.clj @@ -0,0 +1,25 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.legacy-workflow + (:require [clojure.test :refer :all] + [taoensso.timbre :as log] + [temporal.client.core :refer [>!] :as c] + [temporal.signals :refer [! workflow signal-name :foo) + (is (-> workflow c/get-result deref (= :foo)))))) diff --git a/test/temporal/test/local_activity.clj b/test/temporal/test/local_activity.clj index 6df5c80..a78807d 100644 --- a/test/temporal/test/local_activity.clj +++ b/test/temporal/test/local_activity.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.local-activity (:require [clojure.test :refer :all] @@ -16,7 +16,7 @@ (str "Hi, " name)) (defworkflow local-greeter-workflow - [ctx {:keys [args]}] + [args] (log/info "greeter-workflow:" args) @(a/local-invoke local-greet-activity args {:do-not-include-args true})) diff --git a/test/temporal/test/local_retry.clj b/test/temporal/test/local_retry.clj new file mode 100644 index 0000000..966a8a2 --- /dev/null +++ b/test/temporal/test/local_retry.clj @@ -0,0 +1,47 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.local-retry + (:require [clojure.test :refer :all] + [promesa.core :as p] + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow]] + [temporal.activity :refer [defactivity] :as a] + [temporal.test.utils :as t]) + (:import (io.temporal.client WorkflowFailedException) + [io.temporal.failure TimeoutFailure ActivityFailure] + [java.time Duration])) + +(use-fixtures :once t/wrap-service) + +(defactivity local-retry-activity + [ctx args] + (log/info "local-retry-activity") + (Thread/sleep 100000000)) + +(defworkflow local-retry-workflow + [args] + (log/info "local-retry-workflow:" args) + @(-> (a/local-invoke local-retry-activity {} (merge args {:do-not-include-args true + :start-to-close-timeout (Duration/ofMillis 500)})) + (p/catch ActivityFailure + :fail))) + +(defn exec [args] + (let [workflow (t/create-workflow local-retry-workflow)] + (c/start workflow args) + @(-> (c/get-result workflow) + (p/then (constantly :fail)) + (p/catch WorkflowFailedException + (fn [ex] + (if (instance? TimeoutFailure (ex-cause ex)) + :pass + :fail)))))) + +(deftest the-test + (testing "RetryPolicy defaults" + (is (= :pass (exec {})))) + (testing "Explicit unlimited" + (is (= :pass (exec {:retry-options {:maximum-attempts 0}})))) + (testing "Verify that setting maximum-attempts to a finite value is respected" + (is (= :fail (exec {:retry-options {:maximum-attempts 1}}))))) diff --git a/test/temporal/test/manual_dispatch.clj b/test/temporal/test/manual_dispatch.clj index 0d1847f..bdf77b2 100644 --- a/test/temporal/test/manual_dispatch.clj +++ b/test/temporal/test/manual_dispatch.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.manual-dispatch (:require [clojure.test :refer :all] @@ -36,12 +36,12 @@ ;;----------------------------------------------------------------------------- (defworkflow explicitly-registered-workflow - [ctx {:keys [args]}] + [args] (log/info "registered-workflow:" args) :ok) (defworkflow explicitly-skipped-workflow - [ctx {:keys [args]}] + [args] (log/info "skipped-workflow:" args) :ok) diff --git a/test/temporal/test/poll.clj b/test/temporal/test/poll.clj index 40db159..2607e98 100644 --- a/test/temporal/test/poll.clj +++ b/test/temporal/test/poll.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.poll (:require [clojure.test :refer :all] @@ -17,9 +17,9 @@ (cons m (lazy-signals signals))))) (defworkflow poll-workflow - [ctx {:keys [signals]}] + [_] (log/info "test-workflow:") - (doall (lazy-signals signals))) + (doall (lazy-signals (s/create-signal-chan)))) (deftest the-test (testing "Verifies that poll exits with nil when there are no signals" diff --git a/test/temporal/test/query.clj b/test/temporal/test/query.clj index ebb0948..50c784a 100644 --- a/test/temporal/test/query.clj +++ b/test/temporal/test/query.clj @@ -1,10 +1,9 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.query (:require [clojure.test :refer :all] - [taoensso.timbre :as log] [temporal.client.core :refer [>!] :as c] - [temporal.signals :refer [!] :as c] + [temporal.signals :refer [ @state 1))) + @state)) + +(deftest the-test + (testing "Verifies that we can handle raw signals" + (let [workflow (t/create-workflow raw-signal-workflow)] + (c/start workflow {}) + + (>! workflow signal-name {}) + (>! workflow signal-name {}) + (is (= 2 @(c/get-result workflow)))))) diff --git a/test/temporal/test/resolved_promises.clj b/test/temporal/test/resolved_promises.clj new file mode 100644 index 0000000..c6186d6 --- /dev/null +++ b/test/temporal/test/resolved_promises.clj @@ -0,0 +1,24 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.resolved-promises + (:require [clojure.test :refer :all] + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow]] + [temporal.promise :as pt] + [temporal.test.utils :as t])) + +(use-fixtures :once t/wrap-service) + +(defworkflow all-workflow + [args] + (log/info "workflow:" args) + @(pt/all [(pt/resolved true)]) + @(pt/race [(pt/resolved true)]) + :ok) + +(deftest the-test + (testing "Verifies that pt/resolved and pt/rejected are compatible with all/race" + (let [workflow (t/create-workflow all-workflow)] + (c/start workflow {}) + (is (-> workflow c/get-result deref (= :ok)))))) diff --git a/test/temporal/test/resources/ca.crt b/test/temporal/test/resources/ca.crt new file mode 100644 index 0000000..7de9830 --- /dev/null +++ b/test/temporal/test/resources/ca.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDDjCCAfagAwIBAgIQLLzSA48U8GQtqMFMgazHjzANBgkqhkiG9w0BAQsFADAh +MR8wHQYDVQQDExZZdWdhYnl0ZSBTZWxmc2lnbmVkIENBMB4XDTIzMTIyOTIwMzM0 +N1oXDTI0MDMyODIwMzM0N1owITEfMB0GA1UEAxMWWXVnYWJ5dGUgU2VsZnNpZ25l +ZCBDQTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoCggEBANJGsOx6lyoQfP2f +ByK6LAN1/CTdPoNNVMok7ibP3XKiqE0Dyrv9I/66pZqQrP1HYnv+uVXFDBPMqhXa +Bw5MyW0kjWQzruh2nTDWljZLl129XsvbPR4zkR+UH+6AoHxLIsfS1//jwE+y7OC8 +2vpE5PAsBsR2hPXGYPqWeZWscjDW0A0QpYI2b0q8UUnY36cr2fBv0tG/ZwDnhEBe +9E2kTnL4NIcRxoOZXN85NusRBN5oueGOhyVeSBNz2ym1o8D8mMT1L3YHZXsTMDBA +OTYpzCx+hgVtrp3cQUWWGp8R8G3FtrWDvJGxf34X5k/4txlRlaGrQ98w3AM8JdMr +0irWANMCAwEAAaNCMEAwDgYDVR0PAQH/BAQDAgKkMA8GA1UdEwEB/wQFMAMBAf8w +HQYDVR0OBBYEFBWjvPFAs3z0xzL+LSwcskkw1rcxMA0GCSqGSIb3DQEBCwUAA4IB +AQBXvO40RJ0Gy4Mcmn1R7CWDBKfRdqvSVoZdQwI0ZNQ0HXWzln4OksprohWRQJNJ +yqvw6DG7LroyfGEn8sfO1fzLcROymGK1akSu4PT0QSHMfmS377OPJvS6licFSvMB +2aFiD2pmQRlNa9mHAxAvGMPnAgrWhNALEPfePpVo9eR+06cJj17WXU/LGA3Fey1N +fAZ7W/LPpFBPPs98gbgJvhUIOJ8IvxPbNTG/21kQK8CfJ2dBnkNZOcm/FI5z+eu1 +O5swP6MRHuybvkHhyrhbU67f3iACpCgGzg1YUSjuXm9W+c4LB6qsGiZ/yGc2jJ8v +sbxxhViVNVJ58wbJ63C3OimS +-----END CERTIFICATE----- \ No newline at end of file diff --git a/test/temporal/test/resources/tls.crt b/test/temporal/test/resources/tls.crt new file mode 100644 index 0000000..8429c59 --- /dev/null +++ b/test/temporal/test/resources/tls.crt @@ -0,0 +1,19 @@ +-----BEGIN CERTIFICATE----- +MIIDBTCCAe2gAwIBAgIRAJHL+sXVDe5KtY39RLUHnZAwDQYJKoZIhvcNAQELBQAw +ITEfMB0GA1UEAxMWWXVnYWJ5dGUgU2VsZnNpZ25lZCBDQTAeFw0yMzEyMjkyMDMz +NTFaFw0yNDAzMjgyMDMzNTFaMBMxETAPBgNVBAMTCHl1Z2FieXRlMIIBIjANBgkq +hkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2Im+0RuHFVA1X7j1XNnr4B68ognJN1Dr ++/YtSiiN14QMRHENjlhUwi8pdvDwOvozyjQnufLchoOY7FpFlOgm99j5JjTD0F52 +vBhL1Xe0XB3NrI1e6FizqoOzv7WfBwTepptuF3p1U8RIiqHtxVHUUFsX3LrwKRqD +QgNGvQA4YJOdlOzAdpYmH/MDehFIrX0iZRfLc/Eb4uOc+CZxOHboMi8lAlNdnSFj +8coLYYI7PUSsc1l/XNwvm4p09cWmuDpwaEHMmVOfLh2WBChojDgzYbRcSBbX+VS4 +pW6EEnGthNx8/DyFXkiVKSBWMIhDHWhcsiG6PP3mlfuGqYOLSDZGbwIDAQABo0Yw +RDATBgNVHSUEDDAKBggrBgEFBQcDAjAMBgNVHRMBAf8EAjAAMB8GA1UdIwQYMBaA +FBWjvPFAs3z0xzL+LSwcskkw1rcxMA0GCSqGSIb3DQEBCwUAA4IBAQBCqRpJz0c/ +2bYVaUiH8bk+ubW0iDQH3kCpHS5uTLWNO/gsWE7H+Gp93AZueb3dWoiuZWAlahNL +qLLgKhwM0P527nnoEhBcCYlA3fEfArTWIq4CAE6SX4jUPYsVp4ZUMfQTICsivtPw +uL0lcupDnrpmmiDCYCiCcmT/kXuzUrSyO4Qu8pbznfvhHBa6fxdJdQ6bBFZ1zBox +jJe0UnS7emCFAUzv4yNPw1yFyMqMIwfwN2arVxcz1WQhbM1uLenK6gVjBglnnFDT +9ZsM/ZZXGUvqiIIRvGrREES9Ljuic2j2q/keUXJFg9917dy3ep884+5vv7cxoha1 +RNY6CYw4pDlU +-----END CERTIFICATE----- \ No newline at end of file diff --git a/test/temporal/test/resources/tls.key b/test/temporal/test/resources/tls.key new file mode 100644 index 0000000..14458b4 --- /dev/null +++ b/test/temporal/test/resources/tls.key @@ -0,0 +1,28 @@ +-----BEGIN PRIVATE KEY----- +MIIEvwIBADANBgkqhkiG9w0BAQEFAASCBKkwggSlAgEAAoIBAQDYib7RG4cVUDVf +uPVc2evgHryiCck3UOv79i1KKI3XhAxEcQ2OWFTCLyl28PA6+jPKNCe58tyGg5js +WkWU6Cb32PkmNMPQXna8GEvVd7RcHc2sjV7oWLOqg7O/tZ8HBN6mm24XenVTxEiK +oe3FUdRQWxfcuvApGoNCA0a9ADhgk52U7MB2liYf8wN6EUitfSJlF8tz8Rvi45z4 +JnE4dugyLyUCU12dIWPxygthgjs9RKxzWX9c3C+binT1xaa4OnBoQcyZU58uHZYE +KGiMODNhtFxIFtf5VLilboQSca2E3Hz8PIVeSJUpIFYwiEMdaFyyIbo8/eaV+4ap +g4tINkZvAgMBAAECggEBAMilQppSzqXyL7LmGP2TtJx0/seLF9dY9YIAh9DaqSxV +YGSe+Te4I7nXp61d7sxHgWvRTipgnvVJxY7kyusC/vDULXG4nOVcUttSDBrek9Jz +j1xfltznLHxJE2sF6TjAy2tIRQgeYc9f5vQGveMEQx6+eer/kYAU4CFwFcEWDid1 +jGHFuk68CTn/lRACUTIGGzbmevbdzhmtYhU5q8KZO4s1xfGQE+e+Ikw9KNVcBI2l +ox4gzhgfcNsE+RUjhUfACB45aBDGfWLY1uC7opIM5lT0I03TBHwhe7y/24qRtnZ3 +3J4kajfssf4rDvHm4MKroTVf3OM4ZM6q2rKDWFMGJaECgYEA4UYE8P6YNO704AtW +/O3kLMjno2q44YXO/HG34cyoPHRk5E60CDbeZ4JX2DlhsVWioGkZg6VVfedtwrlS +j8sFO/2wpnhF591kWwaa5AK6lhhb8bPfjSZG6kffDzKRP1PEN41hq2EqmofpfuLF +7YsSFE+6TeGxVq/SCiXPOpU9UlECgYEA9hK02iDECo1dkQBojPH9RtlCt9uWPW0h +qaQQdKr5RM638IhLZA24tMkfU0XrVp179c7aRdkrIsn6pGzmp2bufjetrLLGkrj8 +mujV0nwP4qAHTZjSgCAtoVEUaSYv+WcBsB3shHycB5ffTLZWAOY0WzCrDpsXpd7W +N/NaoRjlHL8CgYEA2DEpZtr+2bYF/coENoJbe3tnilZOjeirp2u/TAzr2/DcLps1 +fbiionXdth4DmnuTshyLJuMR892ZYcoW6Pau1E74LBq7A/VdbVoeZfoUdR11h7XX +Mg/s+MP21w/xgvPyGFovxJhgmaMbu/EIgJr5w9Jr+nhBh+7+RUzZ3uAA1LECgYEA +gCb/3vXfgytaRlDzIixI3qP5di07EmSKeoHCPDBqvyX1b6RbtxDaV/TChqjMRoCf +9UU0MdpG98g+63D3sskNfdhbb6xvdCw5Cigma4dG8pyrEQN85VNc0D2cpqJHq9i0 +bVc4PUt0KxQyLA5tvewl6jPvchzddPoXkG4BjhKcB5sCgYAVA1rHQy6kMaJuInRs +YPQ+jpKKoxSbDfRvXuhBGaAyisJy5lfI6qHDpl9t2QNHV2tnHq1R5AW8rkAGKVdF +dh/t5DvgfC20ZMVZlJcoPnfwdVaOvu0tnLPk1pes1pInH/TbBGNm6ngZTBdXVu3J +4qnsF0nRAwBqxlv1ldLOF/9i4w== +-----END PRIVATE KEY----- \ No newline at end of file diff --git a/test/temporal/test/retry_coherence.clj b/test/temporal/test/retry_coherence.clj new file mode 100644 index 0000000..f2126d0 --- /dev/null +++ b/test/temporal/test/retry_coherence.clj @@ -0,0 +1,40 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.retry-coherence + (:require [clojure.test :refer :all] + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow]] + [temporal.activity :refer [defactivity] :as a] + [temporal.test.utils :as t]) + (:import [java.time Duration])) + +(use-fixtures :once t/wrap-service) + +(defactivity retry-activity + [_ {:keys [mode]}] + (let [{:keys [activity-id]} (a/get-info)] + (log/info "retry-activity:" activity-id) + (if-let [details (a/get-heartbeat-details)] + (do + (log/info "original activity-id:" activity-id "current activity-id:" details) + (= activity-id details)) + (do + (a/heartbeat activity-id) + (case mode + :crash (throw (ex-info "synthetic crash" {})) + :timeout (Thread/sleep 2000)))))) + +(defworkflow retry-workflow + [args] + @(a/invoke retry-activity args {:start-to-close-timeout (Duration/ofSeconds 1)})) + +(deftest the-test + (testing "Verifies that a retriable crash has a stable activity-id" + (let [workflow (t/create-workflow retry-workflow)] + (c/start workflow {:mode :crash}) + (is (-> workflow c/get-result deref true?)))) + (testing "Verifies that a timeout retry has a stable activity-id" + (let [workflow (t/create-workflow retry-workflow)] + (c/start workflow {:mode :timeout}) + (is (-> workflow c/get-result deref true?))))) diff --git a/test/temporal/test/reuse_policy.clj b/test/temporal/test/reuse_policy.clj new file mode 100644 index 0000000..e7ac122 --- /dev/null +++ b/test/temporal/test/reuse_policy.clj @@ -0,0 +1,51 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.reuse-policy + (:require [clojure.test :refer :all] + [slingshot.slingshot :refer [try+ throw+]] + [taoensso.timbre :as log] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow] :as w] + [temporal.side-effect :as s] + [temporal.exceptions :as e] + [temporal.test.utils :as t]) + (:import [io.temporal.client WorkflowExecutionAlreadyStarted])) + +(use-fixtures :once t/wrap-service) + +(defworkflow reuse-workflow + [args] + (log/info "reuse-workflow:" args) + (case args + :sleep (w/await (constantly false)) + :crash (throw+ {:type ::synthetic-crash ::e/non-retriable? true}) + :exit (s/gen-uuid))) + +(defn invoke [{:keys [policy wid args] :or {wid ::wid args :exit policy :allow-duplicate}}] + (let [wf (c/create-workflow (t/get-client) reuse-workflow {:task-queue t/task-queue + :workflow-id wid + :workflow-id-reuse-policy policy})] + (c/start wf args) + @(c/get-result wf))) + +(deftest the-test + (testing "Verifies that :allow-duplicate policy works" + (dotimes [_ 2] + (is (some? (invoke {:wid ::allow-duplicate :policy :allow-duplicate}))))) + (testing "Verifies that :allow-duplicate-failed-only policy works" + (try+ + (invoke {:wid ::allow-duplicate-failed-only :args :crash}) + (catch [:type ::synthetic-crash] _ + :ok)) + (is (some? (invoke {:wid ::allow-duplicate-failed-only :policy :allow-duplicate-failed-only})))) + (testing "Verifies that :reject-duplicate policy works" + (let [result (invoke {:wid ::reject-duplicate :policy :reject-duplicate})] + (is (thrown? WorkflowExecutionAlreadyStarted (invoke {:wid ::reject-duplicate :policy :reject-duplicate}))) + (let [wf2 (c/create-workflow (t/get-client) ::reject-duplicate)] + (is (= result @(c/get-result wf2)))))) + (testing "Verifies that :terminate-if-running policy works" + (let [wf (c/create-workflow (t/get-client) reuse-workflow {:task-queue t/task-queue :workflow-id ::terminate-if-running})] + (c/start wf :sleep) + (is (some? (invoke {:wid ::terminate-if-running :policy :terminate-if-running}))))) + (testing "Verifies that a bogus reuse policy throws" + (is (thrown? IllegalStateException (invoke {:wid ::bogus :policy :bogus}))))) diff --git a/test/temporal/test/scale.clj b/test/temporal/test/scale.clj index 85ba703..481bcfe 100644 --- a/test/temporal/test/scale.clj +++ b/test/temporal/test/scale.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.scale (:require [clojure.test :refer :all] @@ -20,7 +20,7 @@ id)) (defworkflow scale-workflow - [ctx {:keys [args]}] + [args] (log/info "workflow:" args) @(a/invoke scale-activity args)) diff --git a/test/temporal/test/schedule.clj b/test/temporal/test/schedule.clj new file mode 100644 index 0000000..0200963 --- /dev/null +++ b/test/temporal/test/schedule.clj @@ -0,0 +1,146 @@ +(ns temporal.test.schedule + (:require [clojure.test :refer :all] + [temporal.client.schedule :as schedule] + [temporal.internal.schedule :as s] + [temporal.internal.workflow :as w] + [temporal.test.utils :as t] + [temporal.workflow :refer [defworkflow]]) + (:import [io.temporal.client.schedules ScheduleClient ScheduleHandle ScheduleUpdateInput ScheduleDescription] + [java.time Duration Instant])) + +(def workflow-id "simple-workflow") +(def schedule-id "simple-workflow-schedule") +(defworkflow simple-workflow [ctx args] args) + +(defn create-mocked-schedule-handle + [state] + (reify ScheduleHandle + (update [_ update-fn] + (swap! state update :update assoc :update-fn update-fn)) + (delete [_] + (swap! state update :delete (fnil inc 0))) + (describe [_] + (swap! state update :describe (fnil inc 0)) + nil) + (pause [_] + (swap! state update :pause (fnil inc 0))) + (unpause [_] + (swap! state update :unpause (fnil inc 0))) + (trigger [_ overlap-policy] + (swap! state update :trigger assoc :overlap-policy overlap-policy)))) + +(defn create-mocked-schedule-client + [state] + (reify ScheduleClient + (createSchedule [_ schedule-id schedule schedule-options] + (swap! state update :create assoc + :schedule-id schedule-id + :schedule schedule + :schedule-options schedule-options) + (create-mocked-schedule-handle state)) + (getHandle [_ schedule-id] + (swap! state update :handle assoc :schedule-id schedule-id) + (create-mocked-schedule-handle state)))) + +(defn- stub-schedule-options + [& {:keys [action spec policy state schedule]}] + {:action (merge {:arguments {:name "John Doe" :age 32} + :options {:workflow-id workflow-id + :task-queue t/task-queue} + :workflow-type simple-workflow} + action) + :spec (merge {:cron-expressions ["0 * * * * "] + :end-at (Instant/now) + :jitter (Duration/ofSeconds 1) + :start-at (Instant/now) + :timezone "US/Central"} + spec) + :policy (merge {:pause-on-failure? true + :catchup-window (Duration/ofSeconds 1) + :overlap :skip} + policy) + :state (merge {:paused? true + :note "note" + :limited-action? false} + state) + :schedule (merge {:trigger-immediately? true + :memo "memo"} + schedule)}) + +;; NOTE: Temporal Schedules are not supported in the Temporal test environment +;; hence the mocked ScheduleClient stubs for now + +(deftest schedule-workflow-test + (testing "scheduling a workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state)] + (is (some? (schedule/schedule client schedule-id (stub-schedule-options)))) + (is (= (get-in @state [:create :schedule-id]) schedule-id)) + (is (= (-> (get-in @state [:create :schedule]) .getAction .getWorkflowType) (w/get-annotated-name simple-workflow))) + (is (= (-> (get-in @state [:create :schedule]) .getAction .getWorkflowType) "simple-workflow")) + (is (= (-> (get-in @state [:create :schedule]) .getAction .getOptions .getWorkflowId) workflow-id)) + (is (= (-> (get-in @state [:create :schedule]) .getSpec .getCronExpressions) ["0 * * * * "])) + (is (-> (get-in @state [:create :schedule]) .getPolicy .isPauseOnFailure)) + (is (= (-> (get-in @state [:create :schedule]) .getState .getNote) "note")) + (is (-> (get-in @state [:create :schedule]) .getState .isPaused))))) + +(deftest unschedule-scheduled-workflow-test + (testing "unscheduling a scheduled workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state)] + (schedule/unschedule client schedule-id) + (is (= (:delete @state) 1))))) + +(deftest describe-scheduled-workflow-test + (testing "describing a scheduled workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state)] + (schedule/describe client schedule-id) + (is (= (:describe @state) 1))))) + +(deftest pause-scheduled-workflow-test + (testing "pauses a scheduled workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state)] + (schedule/pause client schedule-id) + (is (= (:pause @state) 1))))) + +(deftest unpause-scheduled-workflow-test + (testing "unpauses a scheduled workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state)] + (schedule/unpause client schedule-id) + (is (= (:unpause @state) 1))))) + +(deftest execute-scheduled-workflow-test + (testing "executes a scheduled workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state)] + (schedule/execute client schedule-id :skip) + (is (= (get-in @state [:trigger :overlap-policy]) + (s/overlap-policy-> :skip)))))) + +(deftest reschedule-scheduled-workflow-test + (testing "reschedules/updates a scheduled workflow is successful" + (let [state (atom {}) + client (create-mocked-schedule-client state) + update-options (stub-schedule-options :spec {:cron-expressions ["1 * * * *"]}) + schedule-update-input (ScheduleUpdateInput. + (ScheduleDescription. + schedule-id + nil + (s/schedule-> (stub-schedule-options)) + nil + nil + nil + nil))] + (schedule/reschedule client schedule-id update-options) + ;; validate the actual update function works + (is (= (-> (get-in @state [:update :update-fn]) + (.apply schedule-update-input) + (.getSchedule) + (.getSpec) + (.getCronExpressions)) + (-> (s/schedule-> update-options) + (.getSpec) + (.getCronExpressions))))))) diff --git a/test/temporal/test/search_attributes.clj b/test/temporal/test/search_attributes.clj new file mode 100644 index 0000000..59bbb3f --- /dev/null +++ b/test/temporal/test/search_attributes.clj @@ -0,0 +1,31 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.search-attributes + (:require [clojure.test :refer :all] + [temporal.client.core :as c] + [temporal.testing.env :as e] + [temporal.workflow :refer [defworkflow]]) + (:import [java.time Duration])) + +;; do not use the shared fixture, since we want to control the env creation + +(def task-queue ::default) + +(defworkflow searchable-workflow + [args] + :ok) + +(defn execute [] + (let [env (e/create {:search-attributes {"foo" :keyword}}) + client (e/get-client env) + _ (e/start env {:task-queue task-queue}) + workflow (c/create-workflow client searchable-workflow {:task-queue task-queue + :search-attributes {"foo" "bar"} + :workflow-execution-timeout (Duration/ofSeconds 1) + :retry-options {:maximum-attempts 1}})] + (c/start workflow {}) + @(c/get-result workflow))) + +(deftest the-test + (testing "Verifies that we can utilize custom search attributes" + (is (= (execute) :ok)))) \ No newline at end of file diff --git a/test/temporal/test/sequence.clj b/test/temporal/test/sequence.clj index 8d72a7f..dec2f30 100644 --- a/test/temporal/test/sequence.clj +++ b/test/temporal/test/sequence.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.sequence (:require [clojure.test :refer :all] @@ -18,7 +18,7 @@ (str "Hi, " args)) (defworkflow sequence-workflow - [ctx _] + [_] @(-> (pt/resolved true) (p/then (fn [_] (a/invoke sequence-activity "Bob"))) diff --git a/test/temporal/test/side_effect.clj b/test/temporal/test/side_effect.clj index ecd13ed..2c8bc6c 100644 --- a/test/temporal/test/side_effect.clj +++ b/test/temporal/test/side_effect.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.side-effect (:require [clojure.test :refer :all] @@ -12,7 +12,7 @@ (use-fixtures :once t/wrap-service) (defworkflow side-effect-workflow - [ctx {:keys [args]}] + [args] (log/info "workflow:" args) (side-effect/now)) diff --git a/test/temporal/test/signal_timeout.clj b/test/temporal/test/signal_timeout.clj index b8cb3bc..aab0ed9 100644 --- a/test/temporal/test/signal_timeout.clj +++ b/test/temporal/test/signal_timeout.clj @@ -1,10 +1,10 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.signal-timeout (:require [clojure.test :refer :all] [taoensso.timbre :as log] [temporal.client.core :refer [>!] :as c] - [temporal.signals :refer [ (invoke traced-workflow) (= :ok))))) diff --git a/test/temporal/test/types.clj b/test/temporal/test/types.clj index eb94a2d..9833119 100644 --- a/test/temporal/test/types.clj +++ b/test/temporal/test/types.clj @@ -1,56 +1,56 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.types (:require [clojure.test :refer :all] - [temporal.client.core :as client] - [temporal.internal.workflow :as workflow] - [temporal.client.worker :as worker]) - (:import [java.time Duration] + [temporal.client.worker :as worker] + [temporal.client.options :as o] + [temporal.internal.workflow :as w] + [temporal.internal.schedule :as s] + [temporal.internal.child-workflow :as cw]) + (:import [java.time Duration Instant] [io.grpc Grpc InsecureChannelCredentials Metadata] [io.grpc.netty.shaded.io.grpc.netty GrpcSslContexts])) (deftest workflow-options (testing "Verify that our workflow options work" - (let [x (workflow/wf-options-> {:workflow-id "foo" - :task-queue "bar" - :workflow-execution-timeout (Duration/ofSeconds 1) - :workflow-run-timeout (Duration/ofSeconds 1) - :workflow-task-timeout (Duration/ofSeconds 1) - :retry-options {:maximum-attempts 1} - :cron-schedule "* * * * *" - :memo {"foo" "bar"} - :search-attributes {"foo" "bar"}})] + (let [x (w/wf-options-> {:workflow-id "foo" + :task-queue "bar" + :workflow-execution-timeout (Duration/ofSeconds 1) + :workflow-run-timeout (Duration/ofSeconds 1) + :workflow-task-timeout (Duration/ofSeconds 1) + :retry-options {:maximum-attempts 1} + :cron-schedule "* * * * *" + :memo {"foo" "bar"} + :search-attributes {"foo" "bar"}})] (is (-> x (.getWorkflowId) (= "foo"))) (is (-> x (.getTaskQueue) (= "bar")))))) (deftest client-options (testing "Verify that our stub options work" - (let [x (client/stub-options-> {:channel (-> (Grpc/newChannelBuilder "foo:1234" (InsecureChannelCredentials/create)) - (.build)) - :ssl-context (-> (GrpcSslContexts/forClient) - (.build)) - :target "foo:1234" - :enable-https false - :rpc-timeout (Duration/ofSeconds 1) - :rpc-long-poll-timeout (Duration/ofSeconds 1) - :rpc-query-timeout (Duration/ofSeconds 1) - :backoff-reset-freq (Duration/ofSeconds 1) - :grpc-reconnect-freq (Duration/ofSeconds 1) - :headers (Metadata.) - :enable-keepalive true - :keepalive-time (Duration/ofSeconds 1) - :keepalive-timeout (Duration/ofSeconds 1) - :keepalive-without-stream true})] + (let [x (o/stub-options-> {:channel (-> (Grpc/newChannelBuilder "foo:1234" (InsecureChannelCredentials/create)) (.build)) + :ssl-context (-> (GrpcSslContexts/forClient) (.build)) + :target "foo:1234" + :enable-https false + :rpc-timeout (Duration/ofSeconds 1) + :rpc-long-poll-timeout (Duration/ofSeconds 1) + :rpc-query-timeout (Duration/ofSeconds 1) + :backoff-reset-freq (Duration/ofSeconds 1) + :grpc-reconnect-freq (Duration/ofSeconds 1) + :headers (Metadata.) + :enable-keepalive true + :keepalive-time (Duration/ofSeconds 1) + :keepalive-timeout (Duration/ofSeconds 1) + :keepalive-without-stream true})] (is (-> x (.getTarget) (= "foo:1234"))))) (testing "Verify that our client options work" - (let [x (client/client-options-> {:identity "test" - :namespace "test"})] + (let [x (o/workflow-client-options-> {:identity "test" + :namespace "test"})] (is (-> x (.getIdentity) (= "test"))) (is (-> x (.getNamespace) (= "test"))))) (testing "Verify that mixed client/stub options work" (let [options {:target "foo:1234" :namespace "default"}] - (is (some? (client/stub-options-> options))) - (is (some? (client/client-options-> options)))))) + (is (some? (o/stub-options-> options))) + (is (some? (o/workflow-client-options-> options)))))) (deftest worker-options (testing "Verify that our worker-options work" @@ -66,3 +66,93 @@ :max-taskqueue-activities-per-second 1.0 :max-workers-activities-per-second 1.0})] (is (-> x (.isLocalActivityWorkerOnly) false?))))) + +(deftest schedule-client-options + (testing "Verify that our stub options work" + (let [x (o/stub-options-> {:channel (-> (Grpc/newChannelBuilder "foo:1234" (InsecureChannelCredentials/create)) (.build)) + :ssl-context (-> (GrpcSslContexts/forClient) (.build)) + :target "foo:1234" + :enable-https false + :rpc-timeout (Duration/ofSeconds 1) + :rpc-long-poll-timeout (Duration/ofSeconds 1) + :rpc-query-timeout (Duration/ofSeconds 1) + :backoff-reset-freq (Duration/ofSeconds 1) + :grpc-reconnect-freq (Duration/ofSeconds 1) + :headers (Metadata.) + :enable-keepalive true + :keepalive-time (Duration/ofSeconds 1) + :keepalive-timeout (Duration/ofSeconds 1) + :keepalive-without-stream true})] + (is (-> x (.getTarget) (= "foo:1234"))))) + (testing "Verify that our client options work" + (let [x (o/schedule-client-options-> {:identity "test" + :namespace "test"})] + (is (-> x (.getIdentity) (= "test"))) + (is (-> x (.getNamespace) (= "test"))))) + (testing "Verify that mixed client/stub options work" + (let [options {:target "foo:1234" :namespace "default"}] + (is (some? (o/stub-options-> options))) + (is (some? (o/schedule-client-options-> options)))))) + +(deftest schedule-options + (testing "Verify that a schedule can be constructed with the action, spec, policy, and state" + (let [action {:arguments {:value 1} + :options {:workflow-id "my-workflow-execution" + :task-queue "queue"} + :workflow-type "my-workflow"} + spec {:cron-expressions ["0 * * * * "] + :end-at (Instant/now) + :jitter (Duration/ofSeconds 1) + :start-at (Instant/now) + :timezone "US/Central"} + policy {:pause-on-failure? true + :catchup-window (Duration/ofSeconds 1) + :overlap :skip} + state {:paused? true + :note "note" + :limited-action? false} + schedule (s/schedule-> {:action action + :policy policy + :spec spec + :state state})] + (is (some? (s/schedule-action-start-workflow-> action))) + (is (some? (s/schedule-spec-> spec))) + (is (some? (s/schedule-policy-> policy))) + (is (some? (s/schedule-state-> state))) + (is (some? schedule)) + (is (= "my-workflow" (-> schedule .getAction .getWorkflowType))) + (is (= "my-workflow-execution" (-> schedule .getAction .getOptions .getWorkflowId))) + (is (= ["0 * * * * "] (-> schedule .getSpec .getCronExpressions))) + (is (-> schedule .getPolicy .isPauseOnFailure)) + (is (= "note" (-> schedule .getState .getNote))) + (is (-> schedule .getState .isPaused))))) + +(deftest child-workflow-options + (testing "Verify that a `ChildWorkflowOptions` instance can be built properly" + (let [options {:workflow-id "foo" + :task-queue "bar" + :workflow-execution-timeout (Duration/ofSeconds 1) + :workflow-run-timeout (Duration/ofSeconds 2) + :workflow-task-timeout (Duration/ofSeconds 3) + :retry-options {:maximum-attempts 1} + :cron-schedule "* * * * *" + :memo {"foo" "bar"} + :workflow-id-reuse-policy :terminate-if-running + :parent-close-policy :terminate + :cancellation-type :abandon} + child-workflow-options (cw/child-workflow-options-> options)] + (is (some? child-workflow-options)) + (is (= "foo" (-> child-workflow-options .getWorkflowId))) + (is (= "bar" (-> child-workflow-options .getTaskQueue))) + (is (= (Duration/ofSeconds 1) (-> child-workflow-options .getWorkflowExecutionTimeout))) + (is (= (Duration/ofSeconds 2) (-> child-workflow-options .getWorkflowRunTimeout))) + (is (= (Duration/ofSeconds 3) (-> child-workflow-options .getWorkflowTaskTimeout))) + (is (= 1 (-> child-workflow-options .getRetryOptions .getMaximumAttempts))) + (is (= "* * * * *" (-> child-workflow-options .getCronSchedule))) + (is (= {"foo" "bar"} (-> child-workflow-options .getMemo))) + (is (= io.temporal.api.enums.v1.WorkflowIdReusePolicy/WORKFLOW_ID_REUSE_POLICY_TERMINATE_IF_RUNNING + (-> child-workflow-options .getWorkflowIdReusePolicy))) + (is (= io.temporal.api.enums.v1.ParentClosePolicy/PARENT_CLOSE_POLICY_TERMINATE + (-> child-workflow-options .getParentClosePolicy))) + (is (= io.temporal.workflow.ChildWorkflowCancellationType/ABANDON + (-> child-workflow-options .getCancellationType)))))) diff --git a/test/temporal/test/utils.clj b/test/temporal/test/utils.clj index d6eb8d0..299e6cf 100644 --- a/test/temporal/test/utils.clj +++ b/test/temporal/test/utils.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.utils "Utilities common to all tests" @@ -19,6 +19,9 @@ ;;----------------------------------------------------------------------------- ;; Utilities ;;----------------------------------------------------------------------------- +(defn get-worker [] + (get @state :worker)) + (defn get-client [] (get @state :client)) @@ -30,17 +33,18 @@ ;;----------------------------------------------------------------------------- (defn create-service [] (let [env (e/create) - client (e/get-client env)] - (e/start env {:task-queue task-queue}) + client (e/get-client env) + worker (e/start env {:task-queue task-queue})] (swap! state assoc :env env + :worker worker :client client))) (defn destroy-service [] (swap! state (fn [{:keys [env] :as s}] (e/stop env) - (dissoc s :env :client)))) + (dissoc s :env :worker :client)))) (defn wrap-service [test-fn] (create-service) diff --git a/test/temporal/test/uuid_test.clj b/test/temporal/test/uuid_test.clj index eb542cd..eca4c2b 100644 --- a/test/temporal/test/uuid_test.clj +++ b/test/temporal/test/uuid_test.clj @@ -1,4 +1,4 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.uuid-test (:require [clojure.test :refer :all] @@ -11,7 +11,7 @@ (use-fixtures :once t/wrap-service) (defworkflow uuid-workflow - [ctx args] + [args] (log/info "workflow:" args) (gen-uuid)) diff --git a/test/temporal/test/versioning.clj b/test/temporal/test/versioning.clj new file mode 100644 index 0000000..a6c8d3a --- /dev/null +++ b/test/temporal/test/versioning.clj @@ -0,0 +1,62 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.versioning + (:require [clojure.test :refer :all] + [taoensso.timbre :as log] + [mockery.core :refer [with-mock]] + [temporal.client.core :as c] + [temporal.workflow :refer [defworkflow] :as w] + [temporal.activity :refer [defactivity] :as a] + [temporal.test.utils :as t])) + +(use-fixtures :once t/wrap-service) + +(def mailbox (atom nil)) + +;; Only serves to generate events in our history +(defactivity versioned-activity + [ctx args] + (reset! mailbox args) + args) + +(defn workflow-v1 + [] + (log/info "versioned-workflow v1") + @(a/invoke versioned-activity :foo) + @(a/invoke versioned-activity :v1)) + +(defn workflow-v2 + [] + (log/info "versioned-workflow v2") + @(a/invoke versioned-activity :foo) + (let [version (w/get-version ::test w/default-version 1)] + (cond + (= version w/default-version) @(a/invoke versioned-activity :v1) + (= version 1) @(a/local-invoke versioned-activity :v2)))) + +(defworkflow versioned-workflow + [args] + (workflow-v1)) + +(deftest the-test + (let [client (t/get-client) + worker (t/get-worker) + wf (c/create-workflow client versioned-workflow {:task-queue t/task-queue :workflow-id "test-1"})] + (testing "Invoke our v1 workflow" + (c/start wf {}) + (is (= @(c/get-result wf) :v1)) + (is (= @mailbox :v1))) + (with-mock _ + {:target ::workflow-v1 + :return workflow-v2} ;; emulates a code update by dynamically substituting v2 for v1 + (testing "Replay the workflow after upgrading the code" + (reset! mailbox :slug) + (let [history (.fetchHistory client "test-1")] + (.replayWorkflowExecution worker history)) + (is (= @mailbox :slug))) ;; activity is not re-executed in replay, so the :slug should remain + (testing "Invoke our workflow fresh and verify that it takes the v2 path" + (reset! mailbox nil) + (let [wf2 (c/create-workflow client versioned-workflow {:task-queue t/task-queue :workflow-id "test-2"})] + (c/start wf2 {}) + (is (= @(c/get-result wf2) :v2)) + (is (= @mailbox :v2))))))) diff --git a/test/temporal/test/vthreads.clj b/test/temporal/test/vthreads.clj new file mode 100644 index 0000000..2a304e4 --- /dev/null +++ b/test/temporal/test/vthreads.clj @@ -0,0 +1,62 @@ +;; Copyright © Manetu, Inc. All rights reserved + +(ns temporal.test.vthreads + (:require [clojure.string :as string] + [clojure.test :refer :all] + [promesa.core :as p] + [promesa.exec :refer [vthreads-supported?]] + [taoensso.timbre :as log] + [temporal.activity :refer [defactivity] :as a] + [temporal.client.core :as c] + [temporal.testing.env :as e] + [temporal.workflow :refer [defworkflow]]) + (:import [java.time Duration])) + +(def task-queue ::default) + +(defactivity collect-platform-threads [_ _] + (->> (Thread/getAllStackTraces) + (keys) + (map Thread/.getName) + (into #{}))) + +(defworkflow vthread-workflow + [_args] + {:virtual-worker-thread? (.isVirtual (Thread/currentThread)) + :platform-threads @(a/invoke collect-platform-threads nil)}) + +(defn execute [backend-opts worker-opts] + (let [env (e/create backend-opts) + client (e/get-client env) + _ (e/start env (merge {:task-queue task-queue} worker-opts)) + workflow (c/create-workflow client vthread-workflow {:task-queue task-queue :workflow-execution-timeout (Duration/ofSeconds 1) :retry-options {:maximum-attempts 1}}) + _ (c/start workflow {})] + @(-> (c/get-result workflow) + (p/finally (fn [_ _] (e/synchronized-stop env)))))) + +(defn- substring-in-coll? [substr coll] + (boolean (some (fn [s] (string/includes? s substr)) coll))) + +(deftest the-test + (testing "Verifies that we do not use vthreads by default" + (is (false? (:virtual-worker-thread? (execute {} {}))))) + (testing "Verifies that we do not use vthreads if we specifically disable them" + (is (false? (:virtual-worker-thread? (execute {:worker-factory-options {:using-virtual-workflow-threads false}} + {}))))) + (if-not vthreads-supported? + (log/info "vthreads require JDK >= 21, skipping tests") + (testing "Verifies that we can enable vthread support" + (is (true? (:virtual-worker-thread? + (execute {:worker-factory-options {:using-virtual-workflow-threads true}} + {})))) + + (testing "Verifies that Poller and Executor threads can be turned into vthreads using worker-options" + (let [pthreads (:platform-threads (execute {} {:using-virtual-threads true}))] + (is (not-any? #(substring-in-coll? % pthreads) + ["Workflow Executor" "Activity Executor" + "Workflow Poller" "Activity Poller"]))) + + (let [pthreads (:platform-threads (execute {} {:using-virtual-threads false}))] + (is (every? #(substring-in-coll? % pthreads) + ["Workflow Executor" "Activity Executor" + "Workflow Poller" "Activity Poller"]))))))) diff --git a/test/temporal/test/workflow_signal.clj b/test/temporal/test/workflow_signal.clj index 9f47afd..b923673 100644 --- a/test/temporal/test/workflow_signal.clj +++ b/test/temporal/test/workflow_signal.clj @@ -1,10 +1,10 @@ -;; Copyright © 2022 Manetu, Inc. All rights reserved +;; Copyright © Manetu, Inc. All rights reserved (ns temporal.test.workflow-signal (:require [clojure.test :refer :all] [taoensso.timbre :as log] [temporal.client.core :as c] - [temporal.signals :refer [!]] + [temporal.signals :refer [!] :as s] [temporal.workflow :refer [defworkflow]] [temporal.test.utils :as t])) @@ -13,12 +13,13 @@ (def signal-name ::signal) (defworkflow wfsignal-primary-workflow - [ctx {:keys [signals] :as args}] + [args] (log/info "primary-workflow:" args) - (! workflow-id signal-name msg))