From 209a5b2d77bad17f81a9c46569cb4e14dd7581a6 Mon Sep 17 00:00:00 2001 From: Ben Sless Date: Thu, 23 Dec 2021 10:40:07 +0200 Subject: [PATCH 1/3] Welcome dataflow namespace Formerly correlli --- doc/dataflow.org | 188 +++++++++++++ src/main/clojure/more/async/dataflow.clj | 104 +++++++ .../clojure/more/async/dataflow/buffer.clj | 25 ++ .../clojure/more/async/dataflow/channel.clj | 56 ++++ src/main/clojure/more/async/dataflow/node.clj | 253 ++++++++++++++++++ .../clojure/more/async/dataflow/specs.clj | 14 + 6 files changed, 640 insertions(+) create mode 100644 doc/dataflow.org create mode 100644 src/main/clojure/more/async/dataflow.clj create mode 100644 src/main/clojure/more/async/dataflow/buffer.clj create mode 100644 src/main/clojure/more/async/dataflow/channel.clj create mode 100644 src/main/clojure/more/async/dataflow/node.clj create mode 100644 src/main/clojure/more/async/dataflow/specs.clj diff --git a/doc/dataflow.org b/doc/dataflow.org new file mode 100644 index 0000000..bdc40be --- /dev/null +++ b/doc/dataflow.org @@ -0,0 +1,188 @@ +* Dataflow + +This namespace is designed to compose simple core.async processes. +It's probably suitable for micro-services. + +The configuration and data model are inspired by [[https://github.com/onyx-platform/onyx][Onyx]]. + +** Motivation + +There is usually little need to go through the logistics and ceremony, when +using core.async, to manage the channels, puts and takes, and in most instances, +lifecycle, as it naturally emerges from the topology. + +Processes' topologies are usually an emergent phenomenon and not explicitly stated. +There is a mix between topology, business logic, and low level async APIs. + +The idea is to separate the topology of the process from logic as much as +possible by providing a data language to describe the data flow, and functions +and other vars are to be resolved when "compiling" the model. 
+ +** Data Model + + - Edges: core.async channels + - Vertices: processing units, connected by channels. Can be pipes, drivers, sinks. + + The graph is described in terms of two collections: + + - Edges: data describing only the channels, including buffer types, buffer functions, size and transducers. + - Nodes: data describing a pipeline between two channels, mult, producer or consumer. + +*** Buffers + +#+begin_src clojure + {::buffer/type ::buffer/blocking + ::buffer/size 8} + + {::buffer/type ::buffer/sliding + ::buffer/size 8} + + {::buffer/type ::buffer/dropping + ::buffer/size 8} +#+end_src + +*** Channels + +#+begin_src clojure + {::chan/name :a + ::chan/type ::chan/simple} + + {::chan/name :b + ::chan/type ::chan/sized + ::chan/size 8} + + {::chan/name :c + ::chan/type ::chan/buffered + ::chan/buffer {::buffer/type ::buffer/blocking + ::buffer/size 8}} +#+end_src + +** Extension + +*** Buffers + +#+begin_src clojure + (defmethod buffer/-type ::your-new-type [_] ::spec-for-your-type) + + (defmethod buffer/-compile ::your-new-type + [{:keys [:buffer/arg1 :buffer/arg2]}] + (your-buffer-fn arg1 arg2)) + + ;;; Example from buffer namespace + + (defmethod -compile ::dropping [{:keys [::size]}] (a/dropping-buffer size)) +#+end_src + +*** Channels + +#+begin_src clojure + (defmethod chan/-type ::your-new-type [_] ::spec-for-your-type) + + (defmethod chan/-compile ::your-new-type + [{:keys [:chan/arg1 :chan/arg2]}] + (your-chan-fn arg1 arg2)) + + ;; Example from channel namespace + + (defmethod -compile ::buffered [{:keys [::buffer]}] (a/chan (buffer/-compile buffer))) +#+end_src + +*** Worker nodes + +Worker node compilers also take an environment argument, which contains the channels. + +#+begin_src clojure + (defmethod node/-type ::your-new-type [_] ::spec-for-your-type) + + (defmethod node/-compile ::your-new-type + [{:keys [:node/arg1 :node/arg2]} env] + (your-node-fn arg1 arg2)) + + ;; Example from node namespace + + (defmethod -compile ::pipeline-blocking +
[{{to ::to from ::from size ::size xf ::xf} ::pipeline} env] + (a/pipeline-blocking size (env to) xf (env from))) +#+end_src + +** Usage + +*** Require dataflow namespaces + +#+begin_src clojure + (require '[more.async.dataflow.node :as node] + '[more.async.dataflow.channel :as chan] + '[more.async.dataflow.buffer :as buffer] + '[more.async.dataflow :as flow]) +#+end_src + +*** Define a model + +- Define model with channels and nodes (can be verified using spec). +- Define the required vars. +- Validate the model using the ~::flow/model~ spec. +- Try compiling the model using ~flow/compile-model~. + +*** Example + +#+begin_src clojure + (def model + {::flow/channels + [{::chan/name :in + ::chan/type ::chan/sized + ::chan/size 1} + {::chan/name :out + ::chan/type ::chan/sized + ::chan/size 1}] + ::flow/nodes + [ + + {::node/name :producer + ::node/type ::node/produce + ::node/produce + {::node/to :in + ::node/async? false + ::node/fn (let [a (atom 0)] + (fn drive [] + (Thread/sleep 1000) + (swap! a inc)))}} + + {::node/name :pipeline + ::node/type ::node/pipeline-blocking + ::node/pipeline + {::node/from :in + ::node/to :out + ::node/size 4 + ::node/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}} + + {::node/name :consumer + ::node/type ::node/consume + ::node/consume + {::node/from :out + ::node/fn (fn [x] (println :OUT x)) + ::node/async? true}}]}) + + (s/valid? ::flow/channels (::flow/channels model)) + + (s/valid? ::flow/nodes (::flow/nodes model)) + + (s/valid? ::flow/model model) + + (s/valid? ::flow/connected model) + + (def system (flow/compile-model model)) + + (a/close! (:in (::flow/channels system))) +#+end_src + +** Status + +Experimental. Looking for user reports. + +** Roadmap + +- [ ] Tests +- [ ] Analyze the topology to find any dangling channels or disconnected pipes before instancing the pipes. +- [ ] Implement ~select~ based on ~alt!~ and/or ~alts!~. +- [ ] Find an idiomatic way to connect a web handler as driver.
+- [ ] Refine specs, currently have no way to differentiate transducers from regular functions. diff --git a/src/main/clojure/more/async/dataflow.clj b/src/main/clojure/more/async/dataflow.clj new file mode 100644 index 0000000..4e4fa44 --- /dev/null +++ b/src/main/clojure/more/async/dataflow.clj @@ -0,0 +1,104 @@ +(ns more.async.dataflow + (:require + [more.async.dataflow.node :as node] + [more.async.dataflow.channel :as chan] + [clojure.spec.alpha :as s] + [clojure.core.async :as a])) + +(defn connected? + [node chans] + (every? + #(contains? chans (::node/name %)) + (node/ports node))) + +(defn connected-model? + [{::keys [channels nodes]}] + (let [chans (into #{} (map ::chan/name) channels)] + (every? #(connected? % chans) nodes))) + +(s/def ::connected connected-model?) + +(s/def ::node (s/multi-spec node/-type ::node/type)) +(s/def ::channel (s/multi-spec chan/-type ::chan/type)) + +(s/def ::channels (s/+ ::channel)) +(s/def ::nodes (s/+ ::node)) + +(s/def ::model (s/keys :req [::channels ::nodes])) + +(s/def ::correct-model (s/and ::connected)) + +(defn- build + [k f specs] + (reduce + (fn [m spec] + (assoc m (get spec k) (f spec))) + {} + specs)) + +(defn -env + [chans] + (fn [lookup] + (if-some [ch (get chans lookup)] + ch + (throw (ex-info (format "Channel %s not found" lookup) chans))))) + +(defn compile-model + [{::keys [channels nodes env] + :or {env -env}}] + (let [chans (build ::chan/name chan/-compile channels) + env (env chans) + workers (build ::node/name #(node/-compile % env) nodes)] + {::channels chans ::nodes workers})) + +(s/fdef compile-model + :args (s/cat :model ::model)) + +(comment + (def model + {::channels + [{::chan/name :in + ::chan/type ::chan/sized + ::chan/size 1} + {::chan/name :out + ::chan/type ::chan/sized + ::chan/size 1}] + ::nodes + [ + + {::node/name :producer + ::node/type ::node/produce + ::node/produce + {::node/to :in + ::node/async? false + ::node/fn (let [a (atom 0)] + (fn drive [] + (Thread/sleep 1000) + (swap!
a inc)))}} + + {::node/name :pipeline + ::node/type ::node/pipeline-blocking + ::node/pipeline + {::node/from :in + ::node/to :out + ::node/size 4 + ::node/xf (map (fn [x] (println x) (Thread/sleep 2500) x))}} + + {::node/name :consumer + ::node/type ::node/consume + ::node/consume + {::node/from :out + ::node/fn (fn [x] (println :OUT x)) + ::node/async? true}}]}) + + (s/valid? ::channels (::channels model)) + + (s/valid? ::nodes (::nodes model)) + + (s/valid? ::model model) + + (s/valid? ::connected model) + + (def system (compile-model model)) + + (a/close! (:in (::channels system)))) diff --git a/src/main/clojure/more/async/dataflow/buffer.clj b/src/main/clojure/more/async/dataflow/buffer.clj new file mode 100644 index 0000000..6193e69 --- /dev/null +++ b/src/main/clojure/more/async/dataflow/buffer.clj @@ -0,0 +1,25 @@ +(ns more.async.dataflow.buffer + (:require + [clojure.core.async :as a] + [clojure.spec.alpha :as s])) + + +(s/def ::size int?) + +(s/def ::fixed-buffer (s/keys :req [::size])) + +(defmulti -type ::type) +(defmethod -type ::blocking [_] (s/keys :req [::size])) +(defmethod -type ::sliding [_] (s/keys :req [::size])) +(defmethod -type ::dropping [_] (s/keys :req [::size])) + +(defmulti -compile ::type) +(defmethod -compile ::blocking [{:keys [::size]}] (a/buffer size)) +(defmethod -compile ::sliding [{:keys [::size]}] (a/sliding-buffer size)) +(defmethod -compile ::dropping [{:keys [::size]}] (a/dropping-buffer size)) + +(comment + (-compile {::type ::blocking + ::size 1}) + (s/explain-data ::type {::type ::simple + ::name :in})) diff --git a/src/main/clojure/more/async/dataflow/channel.clj b/src/main/clojure/more/async/dataflow/channel.clj new file mode 100644 index 0000000..51a799c --- /dev/null +++ b/src/main/clojure/more/async/dataflow/channel.clj @@ -0,0 +1,56 @@ +(ns more.async.dataflow.channel + (:require + [more.async.dataflow.buffer :as buffer] + [clojure.spec.alpha :as s] + [clojure.core.async :as a] + [clojure.data])) + +(s/def ::name 
(s/or :keyword keyword? + :string string? + :number number? + :symbol symbol?)) + +(s/def ::buffer (s/multi-spec buffer/-type ::buffer/type)) + +(s/def ::buffered-chan (s/keys :req [::name ::buffer])) + +(defmulti -type ::type) + +(defmethod -type ::simple [_] (s/keys :req [::name])) +(defmethod -type ::sized [_] (s/keys :req [::size ::name])) +(defmethod -type ::buffered [_] (s/keys :req [::name ::buffer])) + +(s/def ::chan (s/multi-spec -type ::type)) + +(defmulti -compile ::type) + +(defmethod -compile ::simple [_] (a/chan)) +(defmethod -compile ::sized [{:keys [::size]}] (a/chan size)) +(defmethod -compile ::buffered [{:keys [::buffer]}] (a/chan (buffer/-compile buffer))) + +(comment + (-compile {::type ::buffered + ::buffer {::buffer/type ::buffer/sliding + ::buffer/size 2}})) + +(comment + + (s/explain-data ::chan {::type ::simple + ::name :in}) + + (s/explain-data ::type {::type ::sized + ::size 1 + ::name :in}) + + (s/explain-data ::type {::type ::sized + ::name :in + ::size 1}) + + (s/explain-data ::buffer {::buffer/size 1 + ::buffer/type ::buffer/blocking}) + + (s/explain-data ::type {::name :out + ::type ::buffered + ::buffer + {::buffer/size 1 + ::buffer/type ::buffer/blocking}})) diff --git a/src/main/clojure/more/async/dataflow/node.clj b/src/main/clojure/more/async/dataflow/node.clj new file mode 100644 index 0000000..5e3be35 --- /dev/null +++ b/src/main/clojure/more/async/dataflow/node.clj @@ -0,0 +1,253 @@ +(ns more.async.dataflow.node + (:require + [clojure.spec.alpha :as s] + [clojure.core.async :as a] + [more.async :as ma] + [clojure.data])) + +(s/def ::name (s/or :keyword keyword? + :string string? + :number number? + :symbol symbol?)) + +(s/def ::direction #{::in ::out}) + +(defmulti -type ::type) + +(defmulti -compile (fn [node _env] (get node ::type))) + +(defmulti ports ::type) + +(s/def ::to ::name) +(s/def ::to* (s/* ::to)) +(s/def ::to-map (s/map-of any? ::to)) +(s/def ::from ::name) +(s/def ::size nat-int?) +(s/def ::xf fn?) 
+(s/def ::rf fn?) +(s/def ::fn fn?) +(s/def ::init-fn fn?) +(s/def ::async? boolean?) +(s/def ::timeout pos-int?) + +;;; PIPELINE + +(s/def ::pipeline (s/keys :req [::to ::from ::size ::xf])) +(defmethod -type ::pipeline-blocking [_] (s/keys :req [::name ::pipeline])) +(defmethod -type ::pipeline-async [_] (s/keys :req [::name ::pipeline])) +(defmethod -type ::pipeline [_] (s/keys :req [::name ::pipeline])) + +(defmethod -compile ::pipeline + [{{to ::to from ::from size ::size xf ::xf} ::pipeline} env] + (a/pipeline size (env to) xf (env from))) + +(defmethod -compile ::pipeline-blocking + [{{to ::to from ::from size ::size xf ::xf} ::pipeline} env] + (a/pipeline-blocking size (env to) xf (env from))) + +(defmethod -compile ::pipeline-async + [{{to ::to from ::from size ::size af ::xf} ::pipeline} env] + (a/pipeline-async size (env to) af (env from))) + +(doseq [t [::pipeline ::pipeline-blocking ::pipeline-async]] + (defmethod ports t + [{{to ::to from ::from} ::pipeline}] + #{{::name from ::direction ::in} + {::name to ::direction ::out}})) + +(comment + (s/explain-data + ::pipeline + {::type ::pipeline + ::from "from" + ::to "to" + ::size 4 + ::xf identity})) + +;;; BATCH + +(s/def ::batch + (s/keys :req [::from ::to ::size ::timeout] + :opt [::rf ::init-fn ::async?])) + + +(defmethod -type ::batch [_] + (s/keys :req [::name ::batch])) + +(defmethod -compile ::batch + [{{from ::from + to ::to + size ::size + timeout ::timeout + rf ::rf + init ::init-fn + async? ::async? + :or {rf conj init (constantly [])}} + ::batch} env] + (let [from (env from) + to (env to)] + (if async? + (ma/batch! from to size timeout rf init) + (a/thread (ma/batch!! 
from to size timeout rf init))))) + +(defmethod ports ::batch + [{{to ::to from ::from} ::batch}] + #{{::name from ::direction ::in} + {::name to ::direction ::out}}) + +(comment + (s/explain-data + ::batch + {::type ::batch + ::from "from" + ::to "to" + ::size 4 + ::timeout 4})) + +;;; MULT + + +(s/def ::mult (s/keys :req [::from] :opt [::to*])) +(defmethod -type ::mult [_] (s/keys :req [::name ::mult])) + +(defmethod -compile ::mult + [{{from ::from to ::to*} ::mult} env] + (let [mult (a/mult (env from))] + (doseq [ch to] (a/tap mult (env ch))) + mult)) + +(defmethod ports ::mult + [{{to ::to* from ::from} ::mult}] + (into + #{{::name from ::direction ::in}} + (map (fn [to] {::name to ::direction ::out})) + to)) + +(comment + (s/explain-data + ::mult + {::from :in + ::to* [:cbp-in :user-in]}) + (ports + {::type ::mult + ::name :cbm + ::mult {::from :in ::to* [:cbp-in :user-in]}})) + +;;; TODO ::mix + +;;; PUBSUB + +(s/def ::pubsub (s/keys :req [::pub ::topic-fn] :opt [::sub])) +(s/def ::pub ::name) +(s/def ::sub (s/* (s/keys :req [::topic ::name]))) +(s/def ::topic any?) +(s/def ::topic-fn ifn?) + +(comment + (s/explain-data + ::pubsub + {::pub :in + ::topic-fn (constantly 0) + ::sub [{::topic 0 ::name ::out}]})) + +(defmethod -type ::pubsub [_] (s/keys :req [::name ::pubsub])) + +(defmethod -compile ::pubsub + [{{pub ::pub sub ::sub tf ::topic-fn} ::pubsub} env] + (let [p (a/pub (env pub) tf)] + (doseq [{topic ::topic chan ::name} sub] + (a/sub p topic (env chan))) + p)) + +(defmethod ports ::pubsub + [{{to ::sub from ::pub} ::pubsub}] + (into + #{{::name from + ::direction ::in}} + (map (fn [{ch ::name}] {::name ch ::direction ::out})) + to)) + +;;; PRODUCER + +(s/def ::produce (s/keys :req [::to ::fn] :opt [::async?])) + +(defmethod -type ::produce [_] (s/keys :req [::name ::produce])) + +(defmethod -compile ::produce + [{{ch ::to f ::fn async? ::async?} ::produce} env] + (let [ch (env ch)] + (if async? + (ma/produce-call! ch f) + (a/thread (ma/produce-call!!
ch f))))) + +(defmethod ports ::produce + [{{to ::to} ::produce}] + #{{::name to ::direction ::out}}) + +;;; CONSUMER + +(s/def ::checked? boolean?) +(s/def ::consume (s/keys :req [::from ::fn] :opt [::async? ::checked?])) + +(defmethod -type ::consume [_] (s/keys :req [::name ::consume])) + +(defmethod -compile ::consume + [{{ch ::from f ::fn async? ::async? checked? ::checked?} ::consume} env] + (let [ch (env ch)] + (if async? + ((if checked? + ma/consume-checked-call! + ma/consume-call!) ch f) + (a/thread ((if checked? + ma/consume-checked-call!! + ma/consume-call!!) ch f))))) + +(defmethod ports ::consume + [{{from ::from} ::consume}] + #{{::name from ::direction ::in}}) + +;;; SPLIT + +(s/def ::split (s/keys :req [::from ::to-map ::fn] :opt [::dropping?])) +(s/def ::dropping? boolean?) + +(defmethod -type ::split [_] (s/keys :req [::name ::split])) + +(defmethod -compile ::split + [{{from ::from to ::to-map f ::fn + dropping? ::dropping?} ::split} env] + ((if dropping? ma/split?! ma/split!) f (env from) (into {} (map (fn [[k ch]] [k (env ch)])) to))) + +(defmethod ports ::split + [{{to ::to-map from ::from} ::split}] + (into + #{{::name from ::direction ::in}} + (map (fn [to] {::name to ::direction ::out})) + (vals to))) + +;;; REDUCTIONS + +(s/def ::reductions + (s/keys :req [::from ::to ::rf ::init-fn] + :opt [::async?])) + +(defmethod -type ::reductions [_] + (s/keys :req [::name ::type ::reductions])) + +(defmethod -compile ::reductions + [{{from ::from + to ::to + rf ::rf + init ::init-fn + async? ::async?} ::reductions} env] + (let [from (env from) + to (env to)] + (if async? + (ma/reductions! rf init from to) + (a/thread + (ma/reductions!!
rf init from to))))) + +(defmethod ports ::reductions + [{{to ::to from ::from} ::reductions}] + #{{::name from ::direction ::in} + {::name to ::direction ::out}}) diff --git a/src/main/clojure/more/async/dataflow/specs.clj b/src/main/clojure/more/async/dataflow/specs.clj new file mode 100644 index 0000000..7f46c45 --- /dev/null +++ b/src/main/clojure/more/async/dataflow/specs.clj @@ -0,0 +1,14 @@ +(ns more.async.dataflow.specs + (:require + [clojure.spec.alpha :as s])) + +(defn kw->fn + [kw] + (when kw (resolve (symbol kw)))) + +(defn kfn? [kw] (ifn? (kw->fn kw))) + +(s/def ::name (s/or :keyword keyword? + :string string? + :number number? + :symbol symbol?)) From 989ee10eabe07874ce4b5753fc1ace6a249034ac Mon Sep 17 00:00:00 2001 From: Ben Sless Date: Thu, 23 Dec 2021 10:58:04 +0200 Subject: [PATCH 2/3] Add finally-fn to batch node --- src/main/clojure/more/async/dataflow/node.clj | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/main/clojure/more/async/dataflow/node.clj b/src/main/clojure/more/async/dataflow/node.clj index 5e3be35..06869d0 100644 --- a/src/main/clojure/more/async/dataflow/node.clj +++ b/src/main/clojure/more/async/dataflow/node.clj @@ -66,9 +66,11 @@ ;;; BATCH +(s/def ::finally-fn fn?) + (s/def ::batch (s/keys :req [::from ::to ::size ::timeout] - :opt [::rf ::init-fn ::async?])) + :opt [::rf ::init-fn ::async? ::finally-fn])) (defmethod -type ::batch [_] @@ -81,14 +83,15 @@ timeout ::timeout rf ::rf init ::init-fn + finally ::finally-fn async? ::async? - :or {rf conj init (constantly [])}} + :or {rf conj init (constantly []) finally identity}} ::batch} env] (let [from (env from) to (env to)] (if async? - (ma/batch! from to size timeout rf init) - (a/thread (ma/batch!! from to size timeout rf init))))) + (ma/batch! from to size timeout rf init finally) + (a/thread (ma/batch!! 
from to size timeout rf init finally))))) (defmethod ports ::batch [{{to ::to from ::from} ::batch}] From 86021eb81e4e8a7b63f271caf3c0f0bf01906e66 Mon Sep 17 00:00:00 2001 From: Ben Sless Date: Thu, 23 Dec 2021 11:00:34 +0200 Subject: [PATCH 3/3] Add dataflow link in README --- README.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/README.md b/README.md index 118077e..bca2f8e 100644 --- a/README.md +++ b/README.md @@ -153,6 +153,10 @@ channel. Produces an intermediate state only when consuming from output channel. Useful for maintaining internal state while processing a stream. +### Dataflow + +See [dataflow](./doc/dataflow.org) + ## Usage ### Dependency