📝Clojure: alephとは

非同期通信のためのライブラリ.

📝ManifoldのStreamをBaseにした並行制御とNettyによる非同期通信をBaseにしている(JavaのNettyライブラリラッパー).

参考記事.

Aleph HTTP Client

Clojure: Ringのプロトコル及び, clj-httpの拡張オプションに準拠. READMEによると, HTTP通信はclj-httpのmimicを目指す. mimicとはなんだ?調べた模倣という意味らしい.

ドキュメントに乏しいのだが, requestのパラメータでなにを指定するか迷ったらring のdocを見ろということ.

同期リクエスト

http/getでリクエストを出すと, Java InputStreamが返ってくるのでそれをstringに変換してつかう.

(require '[aleph.http :as http])
(require '[clj-commons.byte-streams :as bs])
(-> @(http/get "https://google.com/")
    :body
    bs/to-string)

非同期リクエスト

d/chainと連携させると get requestからのコールバック関数を登録しつつ, 呼び出し元スレッドは別動作できるような非同期リクエストができる. d/chainはdeferred valueを変えすので, @で読み出さない限り呼び出し元をblockingはしない.

@(d/chain (http/get "https://google.com/")
          :body
          bs/to-string)

関数コールをした時点でデフォルトで非同期リクエストが走っているように見える. 例えばhttp/postを出すと, 関数は即時評価され, derefで読み出さなくても相手側にリクエストは届いていた(経験則). なので突き放し処理はあえてderefでdeferred valueを確認しなくても, ログをするような関数をコールバックに登録しておけばいい.

Aleph Websocket Clinet

クライアント側のWebsocket実装例.

(def ws-url "wss://ws.lightstream.bitflyer.com/json-rpc")
(def sock @(websocket-client ws-url))
 
(let [msg
      {:method "subscribe"
       :params {:channel "lightning_ticker_FX_BTC_JPY"}}]
  (s/put! sock (generate-string msg)))

aleph Topics

aleph接続きれる問題

どうもalephには途中で接続が途切れるようなことが見受けられる.

][Aleph - can it handle client disconnect? : Clojure]]

このredditには作者本人登場してworkaroundを解説.

Managing Websocket Stability issues with Aleph? : Clojure

clj-slack-clientでのreconnect例.


senteを使えという意見も? 推測だが, websocketがそういうものでありそれを改良したものがSocket.IOなのかも(socket.ioのclojure libraryを探し中).

データが到着するとManifoldのd/success!が呼ばれる?

ライブラリを呼び出すとストリームが返される. 内部設計の仕組みとして, データが到着すると, success!をすることによってstream(deferred valuesのリスト)に値を放り込んでいるように見えた.

tips: socketからのデータ受信時の異常を捕まえる

go-offと<!?を活用する.

socket中での例外が発生すると, <!?を活用することでdeferred された例外は取り出すときに投げ直してくれるので, try-catchで捕まえることが出来る.

(defmacro go-loop* [s & callback]
  `(go-off
    (try
      (loop []
        (if-let [msg# (<!? ~s)]
          (do
            (~@callback msg#)
            (recur))
          (s/close! ~s)))
      (catch Exception e#
        (log/error "Exception occured, " (print-stack-trace e#))
        (s/close! ~s)))))

tips: シーケンスを待ち合わせて次に進める

d/chainでシーケンスを定義したときに, 終わりに(fn [_] ws)みたいな処理挟むと, chainで定義した一連のシーケンス処理を完了させたらwsを返すみたいな処理が書ける.

(d/chain ws
         (lib/auth! ex ws)
         (fn [ws] (println "authed"))
         (fn [_] ws))

これによりchainの一連の処理のおしりで先頭に入れたdeferred valueが戻り, この処理と別の処理をchainすることができる. たいていalephを使うときは外部との通信, つまり副作用が発生するので, その待ち合わせ処理をデータフローで上手くハンドリングできる.

tips: 無限ループ実装

s/periodicallyをつかう. これで1秒ごとに1を生成するstreamが生成されるため, loopやs/consumeでtakeし続ける.

periodicallyの第2引数にmsecをいれると, initial-delayを設定できる. そうでなければ, stream生成してすぐに動き出す.

(def s (s/periodically 1000 (constantly 1)))
 
(s/consume (fn [x] (prn x)) periodic-s)
(s/close! periodic-s)
 
;; initial delay設定
(def s2 (s/periodically 1000 3000 (constantly 1)))

tips: メモリリーク調査

jvmの起動オプションで, Dio.netty.leakDetection.levelを設定する. leak検出でERRORログがでる.

-Dio.netty.leakDetection.level=advanced

ref. https://netty.io/wiki/reference-counted-objects.html#leak-detection-levels

memo: websocketがCPU負荷により切断される

けっこう調査に時間がかかった問題のメモ. alephのwebsocket接続が数時間で切れることが続いた. 調べてみるとCPU負荷が100%になるタイミングでaleph-netty-event-pool-xというスレッドが死んでいた.

さらに調べるとCPU負荷は1秒毎にガベージコレクションが動き続けていることが原因だった. ガベージコレクションによってメモリ領域がまったく解放されずにGCが動き続けていた.

メモリの使用量はプログラムを動作させるとともに増加していった. そしてその原因は, 無限シーケンスに値を追加し続けていたり, atomをloopのなかで使っていたりしたことによるものだった.

ココで驚いたことは, alephもmanifoldも関係なく, たんなるメモリ管理が原因だったという話.