📝Manifoldとは

非同期プログラミングのための部品を提供.

url: https://github.com/clj-commons/manifold

2つの抽象がポイント.

  • deferreds
  • streams

deferredsとは聞き慣れないが, 延期, 先延ばしの意味. 遅延評価ということかな? streamは並行プログラミングにおけるStreamsの概念.

ManifoldのStreamを利用して作られた通信ライブラリがaleph. alephの方にもManifoldに関するドキュメントはある.

Base Concepts

deferred, またはdeferred valuesは遅延された値という意味. 評価されていない値. <<… >>というような二重カッコで表現される. asynchronous valueともいわれる. Clojureのpromiseにcallbackを設定できるような拡張機能.

deferred valueになにかがbindされると realized という表現をつかっている. deferredの対概念がrealized.

一般的に stream とは, 末端がunboud valueのリストであり, manifoldにおけるstreamの概念ははsink streamとsource streamのduplex stream(二重)となる. 言い換えると, streamの両端がderefferd valueなlist.

  • sink
    • 情報を消費(consume)するstream.
    • put!する.
    • closedになる.
  • source は情報を生み出す(produce)stream.
    • take!する.
    • drainedになる.

このストリームの一般的なパターンは🎓Pipelineパターンにまとめた.

deferred: データフロー変数

https://github.com/clj-commons/manifold/blob/master/doc/deferred.md

deferred valueは@だったりderefで読む.

deferred valuesとはいわばデータフロー変数である(🤔Clojureのpromise/deliverはデータフロー変数のこと).

d/success! でデータフロー変数に値をbindingする. これはpromiseにおけるdeliverと同じと理解していい. もしくは, d/error! でデータフロー変数に例外をbindingすることもできる.

success!/error!が発動されてはじめてrealizedという状態になり, on-realizedに登録されたcallback関数がデータフロー変数に対して発動する. しかし, READMEで “no one should ever need to use on-realized”と記述されているように, 実際にcallbackを登録する関数はd/chainが使われる.

chain/let-flow: 決定性データフローの構築

chain をつかうと, 遅延実行関数(deferred valueがbindされたときのcallback関数)を関数合成(composing)できる. データフロー変数を引数にとるThreading Macrosが構築できる.

d/chainは一つのデータフロー変数に対してcallback関数を連ねていくことができる. 興味深い性質は, 途中でdeferred valueが関数の中で戻らされる場合は途中で処理が止まりその時点のdeferred valueが戻り値で戻る.

これはchainの途中で実行に時間がかかる処理が挟まっていたり, なにかの計算結果を待ち合わせているときに, その先のフロー処理が進まずに途中結果が戻ってくるようなもの.

chain returns a deferred representing the return value of the right-most callback. If any of the functions returns a deferred or a value that can be coerced into a deferred, the chain will be paused until the deferred yields a value.

データフロー実行の合成がchain. データフロー変数の合成がlet-flowと解釈することができる.

let-flowはletとchainのsyntax sugerといえる.

(defn deferred-sum []
  (let [a (call-service-a)
        b (call-service-b)]
    (chain (zip a b)
           (fn [[a b]]
             (+ a b)))))
 
(defn deferred-sum []
  (let-flow [a (call-service-a)
             b (call-service-b)]
    (+ a b)))

streams: 遅延ストリーム

ref. https://github.com/clj-commons/manifold/blob/master/doc/stream.md

streamの型をtypeで調べると, SplicedStreamとなる. spliceは余りきかない英単語だが, 連結されたの意.


なにも取り出せない状態, 空のsourceは drained (枯渇)という状態になる.

このとき, streamからtake!するとdeferred されたnilが帰ってくる(<< nil >> みたいな感じ). nilが意図して入力された値(valid message)なのか, 枯渇を示すものなのかを明確に区別したいときは, take!のoption引数で::drainedを渡す.

> @(s/take! s ::drained)
::drained
 
;; サンプルではidentical?で比較していた.
(when (identical? ::drained msg)
  :a)

connect/connect-via/map/filter: 変換器とパイプライン構築

connectをつかってstream通しを連結すると一連の繋がりがパイプラインになる.

間に変換処理を挟むとそれはtransducerと呼ばれる変換器になる. connect-viaに渡す無名関数だったり, map/filterの関数が用意されてる(いろいろある).


connect-viaに渡す関数は引数としてsource->x->sink間の一つの値をもらう. 関数のなかでsinkにputする必要がある. #(s/put! b (inc %)).

connect-viaにはしばしは, backpressure という言葉が使われる. これは背圧と訳されてstreamの文脈では, streamにデータをput出来なくする機能. deferred valueが戻るとその値がrealizedされるまで他の値が渡されない.

Event-Bus: Pub/Sub拡張

🎓Publisher-Subscriberパターンを実現るすための機能.

あまり情報がないが, 以下の記事は詳しい.

A Tour of Manifold’s Deferred, Stream and Event Bus API | by Functional Human | Medium

IFはシンプルであり普通のpub/subのパターンなのでcore.asyncの pub/subからも類推できる.

Zach Tellman - Everything Will Flow - YouTube

もしくはコードを読む.

https://github.com/clj-commons/manifold/blob/master/src/manifold/bus.clj

やっていることは, subscribersのstreamを受け取ってそれを管理して, 上流から流れてきたものを管理化のsubscribersにながし続けるので, connectが進化したようなもの. 考え方が正しいかはわからないけどPub/Subのパターンからはみ出たcore.asyncのmult/tapの代替として使えるかも.

event-busの実体はCuncurrentHashMap+custom methodsのデータ構造なのでcloseとかは不要.

unsubscribe

unsubscribeするにはsubscribeで返されたstreamに対してclose!すればいい.

(require '[manifold.bus :as b])
(require '[manifold.stream :as s])
 
(def sub (b/subscribe bus :test))
(s/close! sub)

close!をしても, 元のstreamが閉じることはない. subscriberのstreamのみ閉じる.

closed?

s/closed?ではなく, s/drained?で確認する.

buffer設定

(event-bus #(stream 1e3))のように記述すると, event-busに対するsubscribeで生成したstreamにbufferを設定することができる.

for debug

デバックの理由により, websocket経由で情報を受信するときはたとえ利用シーンが⚙Producer-Consumerパターンの場合でも, PubSubパターンのevent-busをつかったほうがいいかも.

というのも, メインのstreamとともにデバッグ用のstreamを別でつくることでwebsocketに流れる情報を覗き見することができるから.

Executors

Manifoldのexecutorsとスレッドプールの管理について.

ref. https://github.com/clj-commons/manifold/blob/master/doc/execution.md

3つのスレッドプールを管理している.

  • wait-pool
    • manifold.stream/connect with java.util.BlockingQueue
    • manifold.deferred/chain with java.util.concurrent.Future
  • execute-pool
    • manifold.deferred/future
  • scheduler-pool
    • manifold.time/in
    • manifold.time/every
    • manifold.stream/periodically

wait-poolとexecute-poolは instrumented poolと表現されていて, 統計情報をもとに数が決まるようだ. 表示用callback関数(prnとか)を登録すると状況が見れる(register-execute-exec-stats-callback).

これ以外のデフォルト動作(aka. default executor)はmanifold.stream/put!したスレッドで動作する. ここでややこしくなるのが, 複数スレッドがひとつのstreamに対してput!すると, takeしたときはput! したスレッドごとに動作して統一性がなくなるところ.

解決策として, core.asyncではgoroutineで囲まれた処理はmain thread poolにbindingされて逐次実行される. manifoldでは, main thread poolとは別の固定数スレッドプールを fixed-thread-executor で生成して, onto メソッドでそのスレッドプールで実行することを指定するという改良の制御が可能.


Manifold Topics

streamの状態を調べる関数

いろいろあるが, descriptionという関数で全部Mapで情報が取れる. これをつかわないと, ただstreamを評価しただけでは文字列になる.

  • drained?
  • sink?
  • source?
  • closed?
  • stream?

core.asyncとの連携をするには?(core.async Interop)

core.asyncのchannelはmanifoldのstreamに変換できる(core.async channel-> manifold stream).

(def c (async/chan))
(def s (s/->source c))
(def s (s/->sink c))

connectをつかって連結することもできる(manifold stream->core.async channel)

(s/connect s c)

streamからtakeで取り出し続けるには?

loop-recurの再帰のなかでtakeを呼び出し続けることで取り出す.

d/loop関数はclojure.core/loopを拡張した遅延ループのマクロでこの中でdefered streamからtake!することができる.

s/comsumeはstreamから値を取り出し続けcallback関数をひたすら当てるloopを書ける.

使いこなしているわけではないが, comsumeが簡単に書ける記法でより詳しく動作をカスタマイズしたいならばloop/recurをつかう印象.

s/consume

consume はstreamに入っている値を全部取り出して一つずつcallback関数を適用する.

以下を評価すると, deferred valueが即時に返ってくるが, 取り出しのloop/recurが中で発生し続けるため, sockから流れてくるmsgは絶えずprintlnし続ける動作となる. これを止めるにはsockを閉じる.

(s/consume #(println %) sock)
 
(s/close! sock)

s/consumeはsourceがdrainedになるとtrueを返す仕様.

loop/recur

take! をで同じことをするには, loop/recurで一つずつ取り出した上で関数を適用する必要がある.

(defn my-consume [sock]
  (d/loop []
    (d/chain (s/take! sock ::drained)
             (fn [msg]
               (if (identical? ::drained msg)
                 ::drained
                 (println msg)))
 
             (fn [result]
               (when-not (identical? ::drained result)
                 (d/recur))))))

接続がされているときはdrained?はfalseになる. これがalephの機能なのかManifoldの機能なのかは調査中. 少なくとも上の例だと無限ループになる(i.e. 次のchainのcallback前で止まる). 永遠に待ち続けるのを回避したい場合は接続タイムアウトをtimeoutオプションで指定する.

core.async goとの連携

Manifold streamはcore.asyncのgoroutineのような軽量スレッドなわけではない. Manifoldは遅延ストリームがメイン機能で, CSP並行モデルではない. take!したときはput!したスレッドの延長で動作する.

こっちの機能も必要ならば, defered streamをcore.asyncのchannelに変換してgo-loopをつかう.

chain + doto

chainは入力されたdeferredをつないでいく. dotoは入力xは途中でなにをしても戻り値でxを返す. 以下のように書けるというTips.

(d/chain d
  #(doto % println)
  #(doto % println))

d/chainの中で例外が発生するとどうなる?

d/chainの中で例外が発生すると, 以後の関数は全てスキップして戻り値としてdeferredされた例外オブジェクトが返る.

(def d (d/deferred))
(def z (-> d
           (d/chain
              dec
              #(/ 1 %))))
(d/success! d 1)
 
z
;; deferred された例外オブジェクト.
;;=> << ERROR: #error (.. )
 
@z
;; 例外発動
;;=> java.lang.ArithmeticException

さらにd/catchのマクロをつかうと例外に対するcatchをつかったハンドリング処理をchainの処理と組み合わせることができる.

(-> d
   (d/chain dec #(/ 1 %))
   (d/catch Exception #(println "whoops, that didn't work:" %)))

Threading Macroは手続きの途中で例外が発生したときのハンドリングが課題だが, deferredというコンセプトはこれを解決するためのよい方法のように思える.

Railway oriented programming(鉄道指向プログラミング)

streamのcloseとそのハンドリング

streamはs/close!で明示的にcloseできる. s/closed?で状態を確認できる.

その他, stream処理のなかのcallback関数なのかで例外が発生してThrowable Objectをcatchしないとstreamをcloseするつくりのようだ(ドキュメントにないけどコードを読んだ).

なのでstreamを閉じる前の後処理(ログとか)が必要な場合はchainとともにcatchを書く.

core.async/go vs manifold.go-off

go-offはcore.asyncのgoのconceptをコピーしている.

core.async/goroutine 内ではループする構造はスレッドよりも効率的な軽量スレッドという実行単位になる. go-offはdeferred valueをbinidngされたdefault executorで実行する.

core.async/goとgo-offの違いは, core.async/goはClojureのmain thread poolを利用するのだが, manifoldは自分で定義した or 統計情報を元に構築された固定長スレッドプールを指定できる. なので, 8多重の並列でそれ以上は逐次平行というようなより細かい制御ができる.

go-offでexecutorを引数で指定する関数として, go-off-withという関数もある. go-offはgo-off-withのラッパーマクロで, execute-poolを利用する. これは利用の統計状況もとに構築された固定長スレッドプールなのでcore.asyncよりもやや賢い.

注意するべきところは, take!を利用していればput!したスレッドの延長で動作するところ.

別の違いはエラー処理. goとgo-offの違いは, <!? というシンタックス. 通常goの内部で例外をcatchする記述をしわすれるとnilが返ってなんのエラーが発生したのかわからない. <!?は例外(Throwable)が返るとそれを投げ直す(throwする. その結果をtry-catchでハンドリング).

namespaceが独立している. そして使うにはcore.asyncのinstallも必要.

(require '[manifold.go-off :refer [go-off]])

go-offはdeferred valueを返すので, @すると実行される.

let-flow vs chain

go-offのdocstringにかんたんな解説を発見した.

https://github.com/clj-commons/manifold/blob/master/src/manifold/go_off.clj

deferred/let-flow presumes that every deferrable needs to be resolved. This prevents more complex handling of parallelism or being able to pass deferreds into other functions from within the `let-flow` block.

deferred/chain only works with single deferreds, which means having to write code in unnatural ways to handle multiple deferreds.”

つまり, let-flowは複数のdeferred valueとその合成を前提にしているが, chainはひとつのdeferred valueに対する複数の操作を前提にしている.


let-flowは Barrier pattern.

let-flowはzipのシンタックスシュガーと解釈するとよい. つまり, 全てのdeferredがrealizedされるのを待ち合わせる.

(defn deferred-sum []
  (let [a (call-service-a)
        b (call-service-b)]
    (chain (zip a b) ;; ★ ここ
           (fn [[a b]]
             (+ a b)))))

tips: ある条件に一致したものだけを通すには?

stream/filterをつかう.

(->> (b/subscribe bus :executions)
              (s/buffer 1)
              (s/map :price)
              (s/filter #(= 0 (mod (int %) 2))))

偶数の数しかおしりから出てこないストリームができる.

tips: タイムアウトによってフローの挙動を変更するには?

d/timeout!をつかってtimeoutしたかどうかの変数を定義. d/realized?によってif文で判定する.

(def timeout-d (d/timeout! (d/deferred) 5000 :timeout))
 
(if (d/realized? timeout-d)
    (do-something)
    (do-else))

この方法はちょっと便利. これがないと一時変数にtimestammpを保存して現在時刻と保存時刻の差分で時間経過を判定していた.

Manifold Insights

🤔遅延実行関数合成という時空を超えた決定論的な操作抽象

Manifold chainが面白いのは, 遅延実行関数が連続しているとき.

以下の1行目, 2行目, 3行目はどのタイミングでrealizedされるかは決まっていないものの, 時空を超えて値を順番通りに操作できる. これがデータフロー実行, 遅延実行関数合成の面白さ. 1行目のdがrealizedされるのが10年後だろうが, 2行目のfutureの実行に100万年かかろうが, シンプルなスレッディングマクロで処理をまとめられる. データフロー変数は並行処理をシンプルにする.

(d/chain d
    #(future (inc %))
    #(future (inc %))
    #(println "the future returned" %))

これをすなわち, 📝決定性データフロープログラミングという.

📝決定論

🔗References

See also