📝Clojure: core.async

非同期プログラミングのためのClojureライブラリ.

https://github.com/clojure/core.async

理論としては📝CSPモデルというものが使われている.

デファクトスタンダードなものの, デフォルトではインストールされてないので自分でインストールが必要.

なんかネットで情報を検索してもまとまった情報がないのは少し敷居が高い. 一応メモのおしりに参考ブックマークはまとめた.

Basic Concepts

  • チャネル(channel): 並列に動くタスク間で値を受け渡す通信路
    • バッファを持ったキューのようなもの.
  • 軽量スレッド(協調スレッド)

Basic Syntax(API)

  • chan: channel生成
  • goマクロ: 非同期ブロック生成
    • go-loop: goとloopの糖衣構文マクロ.
      • recurと合わせて利用するがrecurはClojure標準関数.
  • 送信
    • !: channelに非同期で書く

    • !!: channelに同期で書く

  • 受信
    • <!: channelを非同期で読む
    • <!!: channelを同期で読む
  • 待機
    • alts!: 複数チャネルから非同期処理で早い者勝ちで並行で待機する.

チャネル連携

よく, 小さなチャネルをレゴブロックのようにくっつけるという比喩が使われる. イメージで理解したい. レゴブロックというよりプラレールかな?

  • pipe: 一対一
  • mult/tap: 一対多
  • pub/sub: 一対多
  • mix/admin: 多対一

core.async: mult/tap

multはmultipleの略.

通常チャネルに入った値は一つの出口しか取り出せない. multをすると, 複数取り出せるようになるが, 取り出せる出口を指定するにはtapをつかってチャネルを指定する必要がある.

core.async: pub/sub

pubは(pub)licationの略, subは(sub)scriptionの略.

いわゆるPub/Subパターン. core.asyncのchannelのデフォルトの挙動はproducer-comsumerパターンなので, 💡Pub/SubパターンとProd/Consパターンの違いを意識したい. pub/subメソッドよって機能拡張するイメージ.

🤔pub/subはmultimethodの非同期進化系?!


examples

やはり俺のスプレッドシート管理はまちがっている。

問題設定がシンプルかつ, よい事例. ネットワークやファイルのIOをcore.asyncで解決.

バッファ制御

bufferは有限サイズのキューのようなものなので, いっぱいになったときの制御を指定する必要がある.

  • dropping-buffer
    • いっぱいになったら追加しようとした値を切捨て
  • sliding-buffer
    • いっぱいになったら最初に追加した値を切捨て

core.async Topics

💡put!/take!と<!/>!の違い

Asynchronously puts a val into port, calling fn1 (if supplied) when complete.

ref. put! - clojure.core.async | ClojureDocs

go-blockの中で呼ぶ条件はなく, さらにcallback関数を設定できる.

ref. Clojure - Go Block Best Practices

best practiceとしてput!を推奨している?

💡parkingという軽量スレッドの概念

ブロッキング(blocking)はよくきく概念だが, core.asyncではその対概念としてParking(待機)という概念がでてくる. これは, channel読み書き出来ない場合はスレッドを解放して止まっているという状態.

core.asyncのシンタックス的には <!, >!が非同期読み書きのparking待機, >!!, <!!が同期読み書きのblocking待機になる.

以下の記事の(Blocking and Parking)はわかりやすい説明.

https://www.braveclojure.com/core-async/

💡全てのchannelの結果を待ち合わせるには?

alts!/alts!!はいずれかのチャネルの到着を待ち合わせて早いものがちで処理する.

非同期処理が全て終了するまで並行で待機するにはmapvをつかう.

(mapv #(async/<!! %) channels)

clojure - Waiting for n channels with core.async - Stack Overflow

💡core.asyncはGo LangのchannelのClojure版

Goでいうchannelの概念らしいが, Go を知らないのでピンとこない.

ただ, Goと並行処理についての情報はClojureよりも検索で多くみつかる(とくに日本語)ので, 参考になる.

💡reloadable core.async

Reloaded Workflowをcore.asyncと組み合わせようとした時, 閉じたチャネルからloopが無限にnilを取り出し続けることを回避するTips.

ポイントは, channelかreadしたmsgを受け取るときにif-let, when-letをつかってnil checkをするところ.

(go-loop []
  (if-let [msg (<! incoming-msgs)]
    (do
      (println "Received " msg)
      (recur))
    (println "Done.")))

マリアナ海溝よりニッチすぎる落とし穴かもしれないが, Emacs CIDERでintegrant.repl とasync/go-loopを連携させたときに標準出力がREPLに表示されなかった.

いろいろ調べたらこれはCIDERが悪そう. core.async関係ないじゃないか… とハマってブチ切れている誰かのためのメモ.

💡cider-ns-refresh-show-logのワナ

💡atomの状態をcore.asyncでupdate

正しい方法かわからないけどコードリーディングメモ.

https://github.com/weissjeffm/gdax-bot/blob/master/src/coinbase_api/core.clj#L61

(defn keep-current-price-updated [match-ch price-atom]
  (a/go-loop []
    (when-let [match (a/<! match-ch)]
      (swap! price-atom assoc (:product_id match)
             (-> match :price read-string))
      (recur))))

引数としてatomとchannelをもらって, channelから新しい値が到着するたびにatomをupdateする. こういう小さなupdate処理がgo-loopで作成した軽量スレッドでCPUが暇なときに実行できるのはいいかもしれない.

またatomやchannelの受取はintegrantとかに連携をまかせてもいいかもしれない.

Clojure: agent がatomよりも適しているかもしれないがどうなんだろう. atomとagentの違いは更新が同期か非同期かという違いがある. しかしgo-blockのなかでは更新をキックするかどうかが非同期なので, その先の同期/非同期はそこまで問題ではないかもしれないし非同期処理の延長の同期という意味でatomがいいのかも. go-blockをつかわないならばagentも検討の余地あり.

💡core.asyncにpeekはない

core.asyncにはchannelの先頭をみるようなAPIはない. take!してみるしかない(peek(queue))


複数の処理で先頭を参照したいときは, core.async: pub/subをつかうのか?

💡atomの状態をcore.asyncでupdate

これはよくないかも. とくに大きなデータ構造の場合はClojureはpersistentなため, atomをupdateするたびにcopyが作成されるのでメモリ効率は悪い. 並列スレッドでの変数共有ならいい.

ループ停止用のstop channelを導入

go-loopのベストプラクティス. stop channlesというものを導入する.

ref. Reloadable core.async code

alts!を使って, 処理したいchannel とともにこのstop channelからの情報もwatchして, 条件分岐でstop-chからの情報を受け取ったらloopを抜ける. 以下, ブログ記事からの引用スニペット.

(defn fetch-msgs
  [stop-ch incoming-ch]
  (go
    (loop []
      (let [msg (transport/recv-msg
                 {:maxWait 200 :queue "system/incoming"})]
        (when msg
          (let [[_ ch] (alts! [[incoming-ch msg] stop-ch])]
            (condp = ch
              incoming-ch (recur)
              stop-ch :complete)))))
    (println "fetch-msgs stopping.")))

安全なloopの停止とともに, デバッグでも活用できる.

core.asyncとスレッド名

推測含む. async-dispatch-n というのがgoroutineで動作しているスレッド.

core.async Insights

🤔atom vs (async/chan 1)比較

(async/chan 1)とはbufferingしないということ. chanのサイズを引数で指定できるが, これがsize=1ということはバッファリングしないということを意味する.

一方atomは単一の値を保持する状態. atomとchannelを比較してみる.

atomは状態であるため, そこに保存してある情報を何度でもreadすることができる. 一方channelは, 一回取り出したらそれを何度も参照するためには取り出した値をキャッシュしておく必要がある. キャッシュするということはもはや状態である.

channelから送られてくる最新情報を取得しようとたたときに, 最新情報がなかなか到着しないとキャッシュしてある情報を何度も参照する利用シーンが想定される. このときatomが必要となる.

まとめると, 何度も参照するならばatom, 1度しか参照しないならばchannelをつかう.

🤔channel closed?を確認するには

ドキュメントがない隠し機能なのでこれが正しいのかはわからない.

@(.closed ch)

🤔producer-consumer patternとcore.async

わたしの拙い記憶だと, 非同期処理をJava やPythonでうまく処理するには, 同期キューを用いたProducer-Consumer Patternをつかうのが鉄板だが, それをcore.asyncをつかうことによってかんたんに実装できる(素晴らしい).


一応過去記事をみると共有メモリ方式ではなく📝決定性データフローモデル?(忘れた).


2010年のまだ, core.asyncがなかった時代にblocking queueでchannelの概念を実装したという記事.

Channels in Clojure · Dave Jack

🤔core.asyncにおける登場人物は末尾再帰するなにか

並行プログラミングの課題は複数のスレッドが協調しながら非同期でやり取りする仕組みがイシューとなる.

しかし, Clojureの, さらにいえばcore.asyncの話題のなかではThreadは本物のスレッドではなく, 軽量スレッドというものになる. これはThreadとはちょっと違うwhile-loopするなにか.

(async/go-loop []
  (do-something!)
  (recur))

そうすると, 今まで並列プログラミング=スレッドプログラミングだと思っていた自分の中の思い込みが破れることになる. これはこれで面白いことだ.

loopするなにかを深堀りしていきたい.

🤔pub/subはmultimethodの非同期進化系?!

Clojure: pub/sub

pubの関数呼び出しにおいてキーワードが使われるのが混乱する. キーワードの実体は関数である. (get m :key)の省略表現. そしてsubの関数の引数はキーワード.

  • (pub ch topic-fn)
  • (sub p topic ch)

このややこしさはどこかで経験している. Clojure マルチメソッドのdefmultiとdefmethodの関係にとても似ていることに気づいた.

もっと見方をかえてしまえば, もはやpub/subはマルチスレッドの非同期の進化系である. さらにいえば, multimethodとは条件分岐をクールに進化させた見方があるので, pub/subとは条件分岐をクールに非同期化したものと見える.

channelはmultしないと一つの情報は一つしか取り出せない

これらを両方評価したあとにREPLからchannelに値を入れると, 交互に動く.

(a/go-loop [] (prn (a/<! in-ch)) (recur))
(a/go-loop [] (println (a/<! in-ch)) (recur))

REPLで同期送信がハングしたときの回避方法

Emacs 限定?? a/>!!でハングしたときの回避方法はこちら.

✅REPLで無限ループを評価してハングからの復旧(cider-interrupt)

🔗Refenreces

Online

Books

わたしが愛読しているプログラミングClojureにはcore.asyncの話題は載ってない, Joy of Clojureにも..

Movies

See also