Stormのテストコード実行

テストの実行

Stormのビルドはmaven-clojure-pluginを利用して行われるようになったが、以下のように特定のユニットテストだけを実行することができない様子。

  $ mvn test -Dtest=TestHoge  

REPLを開いてclojure.test/run-testを実行すると、特定のテストモジュールだけを実行することができる。

  $ cd storm-core
  $ mvn compile
  $ mvn clojure:repl
  user=> (use 'clojure.test)
  user=> (require 'backtype.storm.supervisor-test)
  user=> (run-tests 'backtype.storm.supervisor-test)

また、deftestで定義された関数を直接呼び出せば、特定のテストケースだけを実行することもできる。

  user=> (use 'backtype.storm.supervisor-test :reload-all)
  user=> (launches-assignment)

ソースコードを修正した後、useを:reload-all付きで呼び出せば、その場で反映させることができるため、print文デバッグをするときにも便利。

時間の制御

backtype.storm.utile.Timeで以下のようなロジックが定義され、Stormのソースコード中でThread.sleepやSystem.currentTimeMillisの代わりに利用されている。simulatingの値がtrueの場合には、advanceTimeを呼び出さないと時間が進まない状態になるが、これをテストコードの中でタイミングを制御したり、短時間で時間が経過した状態を模擬するために利用している。

    public static void sleep(long ms) throws InterruptedException {
        sleepUntil(currentTimeMillis()+ms);
    }

    public static long currentTimeMillis() {
        if(simulating.get()) {
            return simulatedCurrTimeMs.get();
        } else {
            return System.currentTimeMillis();
        }
    }

    public static int currentTimeSecs() {
        return (int) (currentTimeMillis() / 1000);
    }

    public static void advanceTime(long ms) {
        if(!simulating.get()) throw new IllegalStateException("Cannot simulate time unless in simulation mode");
        simulatedCurrTimeMs.set(simulatedCurrTimeMs.get() + ms);
    }

simulatingがtrueの状態で処理を実行するために、with-simulated-timeというマクロが利用されている。

  (defmacro with-simulated-time
    [& body]
    `(do
       (start-simulating-time!)
       (let [ret# (do ~@body)]
         (stop-simulating-time!)
         ret#)))

以下はテストコードの中で使われる場合の例。with-simulated-time-local-clusterの内側ではsimulatingがtrueで、advance-cluster-timeを呼び出すと、時間が進む。

  (deftest heartbeats-to-nimbus
    (with-simulated-time-local-cluster [cluster :supervisors 0
      :daemon-conf {SUPERVISOR-WORKER-START-TIMEOUT-SECS 15
                    SUPERVISOR-HEARTBEAT-FREQUENCY-SECS 3}]
      (letlocals
        (bind sup1 (add-supervisor cluster :id "sup" :ports [5 6 7]))
        (advance-cluster-time cluster 4)
        ...

ロジックの置き換え

with-var-rootsというマクロを利用して、一時的に特定の関数を置き換えることができる。

  (defmacro with-var-roots
    [bindings & body]
    (let [settings (partition 2 bindings)
          tmpvars (repeatedly (count settings) (partial gensym "old"))
  	vars (map first settings)
          savevals (vec (mapcat (fn [t v] [t v]) tmpvars vars))
          setters (for [[v s] settings] `(set-var-root ~v ~s))
          restorers (map (fn [v s] `(set-var-root ~v ~s)) vars tmpvars)]
      `(let ~savevals
         ~@setters
         (try
           ~@body
           (finally
             ~@restorers)))))

以下は利用例。テストコードの中で、common/storm-task-infoとnimbus/compute-new-topology->executor->node+portを置き換え、nimbusのタスク割り当てロジックをバイパスして、引数として指定された値がそのまま割り当て内容としてセットされるようにしている。

  (defn submit-mocked-assignment
    [nimbus storm-name conf topology task->component executor->node+port]
    (with-var-roots [common/storm-task-info (fn [& ignored] task->component)
                     nimbus/compute-new-topology->executor->node+port (mocked-compute-new-topology->executor->node+port
                                                                        storm-name
                                                                        executor->node+port)]
                    (submit-local-topology nimbus storm-name conf topology)))

HTraceによるHBaseのトレーシング

(この記事は、Hadoop Advent Calendar 2013の10日目の記事です)

並列分散処理の解析

HadoopやHBaseのような並列分散環境では、処理が多数のノードをまたがって実行されるために、トレーシングや性能のプロファイリングが難しくなります。各ノードで動作するサーバプロセスのロジックでは、他のノードにRPCで処理を依頼して結果を非同期に待つというパターンが多いため、CPUやネットワークのディスクのI/O帯域といったリソースを使い切る状況にはなりにくく、oprofileのようなプロファイラを利用して測定してみても、ボトルネックの手がかりがなかなか見つかりません。
そのような並列分散処理のトレーシングに利用しているツールとしてGoogleが発表したのがDapperで、例によってそれを参考にして作られたオープンソース実装としてHTraceがあります。HBaseのtrunkにはHTraceを利用してトレーシングを行う機能(HBASE-6449)が入っています。
例えば、HBaseシェルで以下のように実行してみます。

> trace 'start'
> create 'test', 'f'
> trace 'stop'

これによって取得されたトレーシング情報により、以下の画像にあるような情報をみることができます。

「クライアントがHMasterのテーブル作成のためのAPIを呼び出し、HMasterがその処理のためにHRegionServerのAPIを呼び出す」というような複数のノードをまたがる処理の流れと、要した時間を分かりやすく見ることができます。テーブルのcreateに続けて、putを実行した場合のトレーシング情報は、以下の画像のようになります。

> trace 'start'
> put 'test', 'row1', 'f:', 'val1'
> trace 'stop'

このグラフ表示自体はHTraceのもっている機能ではなく、HTraceと同様にDapperを参考にしてTwitter社で開発されたZipkinというツールを利用して表示したものです。HTraceには、取得したトレーシングデータをZipkinの形式に変換して送信するためのモジュールが付属しています。
HDFS-5274では、HDFS層にもHTraceによるトレーシング機能を追加するというパッチが既に開発中で、「HLogのhsyncの部分に妙に時間がかかっている」というような、HBaseの先にあるHDFS層も含めた解析ができるようになるものと期待されます。

トレーシングの実装

HTraceを利用したトレーシングは、以下のような使いやすい特徴を持っているというふれこみです。

  • 追加でトレーシング用のモジュールをインストールする必要がない
  • トレーシングを利用するために設定を変更してサーバを再起動する必要がない
  • トレーシング時の性能への影響が小さい

どのようにしてこれを実現しているのか、HBaseに追加されたトレーシング用のコードを簡単に確認してみます。
クライアント側のコードには、RPCを実行する際にサーバに渡すデータに、トレーシング用のデータを埋め込むためのコードが入っています。このコードは、クライアントアプリケーションがトレーシングを実行するためのAPIを呼び出した結果として、「トレーシング区間」にいる場合のみ実行されるような条件分岐の中にあります。
o.a.h.hbase.ipc.RpcClient#writeRequest::

    protected void writeRequest(Call call) {
      if (shouldCloseConnection.get()) return;
      try {
        RequestHeader.Builder builder = RequestHeader.newBuilder();
        builder.setCallId(call.id);
        if (Trace.isTracing()) {
          Span s = Trace.currentSpan();
          builder.setTraceInfo(RPCTInfo.newBuilder().
            setParentId(s.getSpanId()).setTraceId(s.getTraceId()));
        }

RPCで渡されるトレーシング情報は、呼び出しの親子関係を判断するためのIDだけという、小さなものです。
Tracing.proto::

message RPCTInfo {
  optional int64 trace_id = 1;
  optional int64 parent_id = 2;
}

これがリクエストヘッダのoptionalなフィールドとして定義されています。
RPC.proto::

message RequestHeader {
  // Monotonically increasing call_id to keep track of RPC requests and their response
  optional uint32 call_id = 1;
  optional RPCTInfo trace_info = 2;
  optional string method_name = 3;
  // If true, then a pb Message param follows.
  optional bool request_param = 4;
  ...

RPCで呼ばれた処理をサーバ側で実行するための共通部品にも、リクエスト中にトレーシング情報が存在すれば、トレーシング用のロジックを初期化するコードがあります。これによってサーバサイドでの処理の実行時刻についての情報が取得されます。また、サーバ側のロジックがさらにRPCで別ノードの処理を呼び出す場合にも、上記のトレーシング用呼び出し元情報(RPCTinfo)がリクエストに付加されて下流に伝わっていくことになります。
o.a.h.hbase.ipc.CallRunner#run::

      try {
        ...
        if (call.tinfo != null) {
          traceScope = Trace.startSpan(call.toTraceString(), call.tinfo);
        }
        ...
      } finally {
        if (traceScope != null) {
          traceScope.close();
        }

HBaseの各サーバプロセス(HMasterやHRegionServer)内では、トレーシング情報を保存するためのReceiverと呼ばれるモジュールが動作しています。このReceiverが、各ノードのトレーシング情報をファイルに書き出すなり、Zipkinに送信するなりします。トレーシング情報は、プロセス内のキュー経由でReceiverに渡されて非同期に処理されるので、性能に影響を与えにくくなっています。
o.a.h.hbase.regionserver.HRegionServer::

    private SpanReceiverHost spanReceiverHost;
    ...
    spanReceiverHost = SpanReceiverHost.getInstance(getConfiguration());

さらに、トレーシングを実行する頻度や契機を制御するための、Samplerというモジュールをトレーシングの際に利用することができます。

    public static TraceScope startSpan(String description, Sampler<TraceInfo> s, TraceInfo tinfo) {
       Span span = null;
       if (isTracing() || s.next(tinfo)) {
         span = new MilliSpan(description, tinfo.traceId, tinfo.spanId,
             random.nextLong(), Tracer.getProcessId());
       }
       return continueSpan(span);
     } 

まとめると、小さなトレーシング情報をRPCで受け渡されるデータのオプショナルなフィールドとして定義し、トレーシング情報が付いているリクエストが来た時だけトレーシングのための条件分岐に入るという作りが、使いやすさとオーバーヘッドの小ささを実現するポイントとなっているようです。

HTraceを使ってみる

ここでは参考として、実際にHTraceとZipkinを動かしてトレーシング情報を見てみるための手順を紹介します。以下の手順は、Amazon EC2RHELインスタンス(RHEL-6.4_GA-x86_64-10-Hourly2 (ami-5769f956))を利用して試した際のものです。
HTraceが追加されているのは0.96系以降のバージョンなので、trunk版のソースコードをビルドして実行します。まず、ビルドに必要なツール類をインストールします。(ここでは手っ取り早くRHELに付属のOpenJDK 7を利用していますが、実際の運用にはまだOracle JDKのほうが無難だと思います。)

$ sudo yum install git gcc gcc-c++ java-1.7.0-openjdk-devel

$ tar zxf apache-maven-3.0.5-bin.tar.gz
$ export PATH=/opt/apache-maven-3.0.5/bin:$PATH

$ wget http://protobuf.googlecode.com/files/protobuf-2.5.0.tar.bz2
$ tar jxf protobuf-2.5.0.tar.bz2 
$ cd protobuf-2.5.0
$ ./configure && make
$ sudo make install

次に、HBaseのソースコードをチェックアウトして、以下のdiffのように一部のファイルを修正します。

$ git clone https://github.com/apache/hbase
$ cd hbase
$ vi conf/hbase-site.xml 
$ vi pom.xml 
$ vi hbase-server/pom.xml 
$ git diff
diff --git a/conf/hbase-env.sh b/conf/hbase-env.sh
index 91aca76..2597970 100644
--- a/conf/hbase-env.sh
+++ b/conf/hbase-env.sh
@@ -26,7 +26,7 @@
 # into the startup scripts (bin/hbase, etc.)
 
 # The java implementation to use.  Java 1.6 required.
-# export JAVA_HOME=/usr/java/jdk1.6.0/
+export JAVA_HOME=/usr/lib/jvm/java-1.7.0-openjdk.x86_64
 
 # Extra Java CLASSPATH elements.  Optional.
 # export HBASE_CLASSPATH=
diff --git a/conf/hbase-site.xml b/conf/hbase-site.xml
index c516ac7..51b98aa 100644
--- a/conf/hbase-site.xml
+++ b/conf/hbase-site.xml
@@ -21,4 +21,8 @@
  */
 -->
 <configuration>
+  <property>
+    <name>hbase.trace.spanreceiver.classes</name>
+    <value>org.cloudera.htrace.impl.ZipkinSpanReceiver</value>
+  </property> 
 </configuration>
diff --git a/hbase-server/pom.xml b/hbase-server/pom.xml
index f5723b2..555488b 100644
--- a/hbase-server/pom.xml
+++ b/hbase-server/pom.xml
@@ -447,6 +447,10 @@
       <groupId>org.cloudera.htrace</groupId>
       <artifactId>htrace-core</artifactId>
    </dependency>
+    <dependency>
+      <groupId>org.cloudera.htrace</groupId>
+      <artifactId>htrace-zipkin</artifactId>
+    </dependency>
   </dependencies>
   <profiles>
     <!-- Skip the tests in this module -->
diff --git a/pom.xml b/pom.xml
index 77c97b7..f785c4c 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1353,6 +1353,11 @@
         <artifactId>htrace-core</artifactId>
         <version>${htrace.version}</version>
       </dependency>
+      <dependency>
+        <groupId>org.cloudera.htrace</groupId>
+        <artifactId>htrace-zipkin</artifactId>
+        <version>${htrace.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>
   <!-- Dependencies needed by subprojects -->

書き換えたら、HBaseをビルドします。

$ mvn package -DskipTests

ビルドが終わったら、そのままソースツリーの中からサーバを実行することができます。デフォルトの設定の場合、ZooKeeperサーバもHMasterもHRegionServerも全部、1つのプロセス中で実行するローカルモードで起動します。

$ bin/hbase master start

つぎに、トレーシング情報を表示するために利用するZipkinをダウンロードします。

$ git clone https://github.com/twitter/zipkin
$ cd zipkin

Zipkinはソースツリーを取得してコマンドを実行すると、その場でまずビルドが行われ、サーバプロセスが起動してきます。とりあえずターミナルを3つ開いて、以下のコマンドを実行します。

(ターミナル1)$ bin/collector
(ターミナル2)$ bin/query
(ターミナル3)$ bin/web

準備が整ったら、HBase Shellを起動し、トレーシングを実行してみます。

$ bin/hbase master start
> trace 'start'
> create 'test', 'f'
> trace 'stop'

ZipkinのWeb UIはデフォルト設定では8080番ポートで待ち受けているため、Webブラウザでそこにアクセスして、トレーシング情報を確認します。

ローカルモードで確認できたら、HBaseを疑似分散モードで起動するように設定変更して試してみるとよいでしょう。上の方で貼付けた画像は、疑似分散モードで試した際のものです。

Stormのソースコードを読むためのメモ

環境

GNU Global

GNU GLOBALはソースコードタギングツール。Emacs/Vimと組み合わせて使う。対応しているプログラミング言語はC, C++, Yacc, Java, PHP4 and assemblyだけだが、タグテーブル作成にctagsを利用するモードを使うとかなりの言語をカバーできる。
StormはプリミティブなパーツをJavaで定義し、それらをClojureでサーバデーモンとして組み上げ、Pythonスクリプトを利用して起動するという感じで言語を併用しているが、それらのソースコード間を横断的にタグジャンプしていくことができる。同じように、スクリプト言語Java、各種JVM言語を組み合わせて利用するプロダクトは多いので、応用しやすい気がする。
また、gtagsでは探したい文字列パターンを探して候補を全部表示してくれるので、Javaソースコードの関数定義を探すときにも、インタフェースだけではなく各実装の定義部分に直接飛べて便利だと感じる。
インストール方法は以下。

$ wget http://tamacom.com/global/global-6.2.9.tar.gz
$ tar zxf global-6.2.9.tar.gz
$ cd global-6.2.9
$ ./configure --prefix=/usr/local && make && sudo make install

インストールしたら、ホームディレクトリに設定ファイルをコピーし、修正する。Exuberant Ctagsを使う場合の設定のところで、Lispに対応するファイルの拡張子として.cljを追加する。

$ cp /usr/local/share/gtags/gtags.conf ~/.globalrc
$ vim ~/.globalrc
$ diff /usr/local/share/gtags/gtags.conf ~/.globalrc
77c77
<       :langmap=Lisp\:.cl.clisp.el.l.lisp.lsp:\
---
>       :langmap=Lisp\:.cl.clisp.el.l.lisp.lsp.clj:\

ソースツリーのトップでgtagsコマンドを実行し、タグテーブルを作成する。--gtagslabelオプションでctagsを使うモードを指定する。

$ git clone https://github.com/nathanmarz/storm.git
$ cd storm
$ gtags --gtagslabel=exuberant-ctags

すると、ソースコードのファイル名リストとタグテーブルが作成される。

$ ls G*
GPATH  GRTAGS  GTAGS
Emacs

Emacsからgtagsを利用するために、.emacsに以下のような内容を追加する。clojure-modeについては後述。

(setq load-path (cons "/usr/local/share/gtags" load-path))
(setq gtags-suggested-key-mapping t)
(setq gtags-path-style 'relative)
(when (locate-library "gtags") (require 'gtags))
(add-hook 'c-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'c++-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'java-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'perl-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'python-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'ruby-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'clojure-mode-hook '(lambda () (gtags-mode 1)))
(add-hook 'erlang-mode-hook '(lambda () (gtags-mode 1)))

タグテーブルを作成済みの状態で、gtags-modeをONにした各プログラミング言語用モードを開くと、自動的にタグテーブルを見つけてくれる。以下のようなキーバインディングを利用して、gtagsの機能を利用する。

M-.
タグテーブルからシンボル名の定義を検索
C-c g
ソースコードファイルから文字列をgrepで検索
M-*
ひとつ前のバッファに戻る。

カーソルを探したいシンボル名の上にもっていき、 M-. や C-c g を押すと、そこにあるシンボル名を検索することができる。(ミニバッファで手で入力することもできる)。キーを押すと、自動的に候補となるファイル名と該当行の一部が一覧表示されるので、そのなかからお目当てと思しき行を選んでリターンキーを押すと、そこを開くことができる。
特にClojureソースコードの場合、関数の定義場所をタグテーブルでみつけられないことが多いので、grepを多用することになる。gtags-modeの中からgrepを使うと、(GPATHを利用して)ソースコードだけを対象として検索することができるので、意外と速くて便利。
たぶんVimでも同じようなことができるはず。

clojure-mode

EmacsClojureモード。githubから取得する。

$ cd
$ git clone https://github.com/clojure-emacs/clojure-mode

clojure-mode用の設定として、.emacsに以下の設定を追加する。

(setq load-path (cons "~/clojure-mode" load-path))
(require 'clojure-mode)
(add-hook 'clojure-mode-hook
          '(lambda () (setq gtags-symbol-regexp "[A-Za-z_][A-Za-z_0-9\-\!\?]*")))

gtags-modeで現在のカーソル位置にあるシンボル名を検索しようとしても、Clojureの単語をハイフンでつないだ関数名がシンボルパターンとして認識されないので、パターンを修正をしている。
たぶんVimでも同じようなことができるはず。

予備知識

依存プロダクト

ストームが利用しているパーツであるところの、JZMQ、LMAX DisruptorやNetflix Curatorのツールがどんなものか知っておくとよい。これらのAPIを呼び出す部分から先はとりあえずブラックボックスとしておく。

JZMQ
ZeroMQのJavaバインディング
LMAX Disruptor
Worker内のスレッド間キュー
Netflix Curator
ZooKeeperのラッパー
Thrift

おなじみのRPCのフレームワーク。StormではおもにクライアントがNimbusにアクセスするためのAPIのために利用されている。
storm-core/src/storm.thriftにthrift固有の文法でデータ型とサービスが定義されていて、storm-core/src/genthrift.shをサーバ/クライアントのコードが生成される。生成されたコードはstorm-core/src/jvm/backtype/generatedにあるが、これは人間が読んでもあまり意味はない。

misc
  • ディレクトリ構造はcljとjvmの2つに分かれている。cljにはClojureの、jvmにはJavaのコードが置かれている。
  • パッケージ名のbacktypeはStorm開発者が所属していた会社の名前。Twitter社に買収された。
  • Clojureで分からないことがあったら、チートシートで調べるとよい。
  • clojureの関数の定義が見つけにくいことがある。
    • defnだけではなくdefnkとかdefserverfnみたいなマクロを作って利用していることもある。
    • letのところで関数を返す関数を呼び出してバインドしていることも多い。
  • letのbinding-form部分はパターンマッチングができるので、単純なシンボル名だけが書かれているとは限らない。
  • reifyはJavaの無名クラスと同じような感じで、インタフェースの実装しつつインスタンスを返す。
  • Clojureのコードはマクロが絡むとすこし難しいので、最初は無理に追わなくてもよいのでは。

RabbitMQ tutorialsのErlangクライアントのサンプルコードを1行ずつ実行する

https://github.com/rabbitmq/rabbitmq-tutorials/tree/master/erlangにあるやり方で、Erlangクライアントのサンプルはすぐ動かせる。

  $ git clone https://github.com/rabbitmq/rabbitmq-tutorials
  $ cd rabbitmq-tutorials/erlang
  
  $ wget http://www.rabbitmq.com/releases/rabbitmq-erlang-client/v2.7.0/rabbit_common-2.7.0.ez
  $ unzip rabbit_common-2.7.0.ez
  $ ln -s rabbit_common-2.7.0 rabbit_common
  
  $ wget http://www.rabbitmq.com/releases/rabbitmq-erlang-client/v2.7.0/amqp_client-2.7.0.ez
  $ unzip amqp_client-2.7.0.ez
  $ ln -s amqp_client-2.7.0 amqp_client

サーバはもう起動しているものとして、以下を実行する。

  $ ./send.erl
   [x] Sent 'Hello World!'
  
  $ ./receive.erl
   [*] Waiting for messages. To exit press CTRL+C
   [x] Received <<"Hello World!">>
  ^C

このサンプルコードをerlang shellから1行ずつ対話的に実行したいが、.hrlファイルを-include_libする部分はシェルからでは実行できない。

#!/usr/bin/env escript
%%! -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin

-include_lib("amqp_client/include/amqp_client.hrl").

そこで、カレントディレクトリにuser_default.erlというファイルを作り、以下のような内容を記述する。

  $ cat <<EOF > user_default.erl
  -module(user_default).
  -compile(export_all).
  -include_lib("amqp_client/include/amqp_client.hrl").
  EOF

user_default.erlをコンパイルしておくと、次にerlang shellを起動したときに、user_defaultが自動的にロードされる。

  $ erlc user_default.erl

必要なオプションを与えてerlang shellを起動し、tutorialsのコードの内容を1行ずつ実行してみる。

  $ erl -pz ./amqp_client ./rabbit_common ./amqp_client/ebin ./rabbit_common/ebin
  ...
  1> {ok, Connection} =
  1>         amqp_connection:start(#amqp_params_network{host = "localhost"}).
  {ok,<0.46.0>}
  2> {ok, Channel} = amqp_connection:open_channel(Connection).
  {ok,<0.59.0>}

RabbitMQの挙動の調べかた

Erlangのお勉強も兼ねて。

まず、Erlangをインストールする。

$ sudo apt-get install erlang

ソースコードを読むとき用の仕込みとして、.emascに以下のような記述を追加する。

(setq load-path (cons "/usr/lib/erlang/lib/tools-2.6.7/emacs " load-path))
(setq erlang-root-dir "/usr/lib/erlang")
(require 'erlang-start)
(add-hook 'erlang-mode-hook '(lambda () (gtags-mode 1)))

rabbitmq-serverのビルドはドキュメントにあるとおりにやればできた。

$ hg clone http://hg.rabbitmq.com/rabbitmq-codegen
$ hg clone http://hg.rabbitmq.com/rabbitmq-server
$ cd rabbitmq-server
$ make

exuberant-ctagsがErlangに対応していたので、ソース読み用にそれを利用する。

$ gtags --gtagslabel=exuberant-ctags

makeのrunターゲットを実行すると、serverが起動する。データファイルやログは/tmp下に出力される。

$ make run

起動したerlang shellで関数を実行することで、状態を確認できる。

1> regs().
...(省略)
rabbit                <0.103.0>    application_master:start_        45055    0
rabbit_alarm          <0.146.0>    gen_event:init_it/6                704    0
rabbit_alarm_sup      <0.145.0>    supervisor:rabbit_restart           99    0
rabbit_amqqueue_sup   <0.188.0>    supervisor2:init/1                  98    0
rabbit_direct_client_ <0.200.0>    supervisor2:init/1                  54    0
rabbit_disk_monitor   <0.150.0>    rabbit_disk_monitor:init/      2734286    0
rabbit_disk_monitor_s <0.149.0>    supervisor:rabbit_restart           99    0
rabbit_event          <0.142.0>    gen_event:init_it/6                609    0
rabbit_event_sup      <0.141.0>    supervisor:rabbit_restart           99    0
rabbit_guid           <0.174.0>    rabbit_guid:init/1                 110    0
rabbit_guid_sup       <0.173.0>    supervisor:rabbit_restart         1002    0
rabbit_log            <0.144.0>    rabbit_log:init/1                  851    0
...(省略)

トレースのためのモジュールを呼び出してみる。dbg:tpはトレース対象の指定で、モジュール、関数、アリティ、引数、返り値などのパターンを、match_specにしたがって指定できる。また、dbg:pはどのプロセスをトレース対象とするかの指定で、allだと全部。詳細はdbgのドキュメントで確認できる。

1> dbg:start().
{ok,<0.206.0>}

2> Tracer = dbg:tracer().
{ok,<0.206.0>}

3> Tracer.
{ok,<0.206.0>}

4> dbg:tp(rabbit_variable_queue, '_', []).
{ok,[{matched,rabbit@x121e,36}]}

5> dbg:p(all, c).

トレースを仕掛けた状態で、別コンソールからチュートリアルをダウンロードして実行してみる。

$ cd ~/srcs
$ git clone https://github.com/rabbitmq/rabbitmq-tutorials.git
$ cd rabbitmq-tutorials/erlang/
$ wget http://www.rabbitmq.com/releases/rabbitmq-erlang-client/v3.0.4/rabbit_common-3.0.4.ez
$ unzip rabbit_common-3.0.4.ez
$ ln -s rabbit_common-3.0.4 rabbit_common
$ wget http://www.rabbitmq.com/releases/rabbitmq-erlang-client/v3.0.4/amqp_client-3.0.4.ez
$ unzip amqp_client-3.0.4.ez
$ ln -s amqp_client-3.0.4 amqp_client

$ ./send.erl 
 [x] Sent 'Hello World!'

$ ./receive.erl 
 [*] Waiting for messages. To exit press CTRL+C
 [x] Received <<"Hello World!">>

トレースを仕掛けた方のコンソールを見ると、トレースした結果がわらわらと出力されている。

(<0.233.0>) call rabbit_variable_queue:init({amqqueue,{resource,<<"/">>,queue,<<"hello">>},
          false,false,none,[],<0.233.0>,[],[],undefined,[]},false,#Fun<rabbit_amqqueue_process.5.115664289>)
(<0.233.0>) call rabbit_variable_queue:drain_confirmed({vqstate,
    {0,{[],[]}},
    {0,{[],[]}},
    {delta,undefined,0,undefined},
    {0,{[],[]}},
    {0,{[],[]}},
    0,
    {0,nil},
    {0,nil},
    {qistate,"/tmp/rabbitmq-rabbit-mnesia/queues/850UOO636QPB9FAFP10MQ2OM",
        {{dict,0,16,16,8,80,48,
             {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
             {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
         []},
        undefined,0,65536,#Fun<rabbit_variable_queue.2.87551502>,
        {0,nil}},
    {undefined,
        {client_msstate,msg_store_transient,
            <<239,174,50,42,106,128,27,95,229,82,194,50,42,156,133,246>>,
            {dict,0,16,16,8,80,48,
                {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},
                {{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}}},
            {state,200769,"/tmp/rabbitmq-rabbit-mnesia/msg_store_transient"},
            rabbit_msg_store_ets_index,
            "/tmp/rabbitmq-rabbit-mnesia/msg_store_transient",<0.180.0>,
            204866,196665,208963,213060}},
    false,0,0,0,infinity,0,0,0,0,0,

トレースを止めて、設定した条件をクリアするためには以下を実行する。

> dbg:stop_clear().

PuppetのSSL関連のエラーについて

puppetをmaster/agent構成で利用していると、agentからmasterへの通信でSSLが使われるため、証明書関連のエラーではまりがち。証明書はmasterやegentを起動した際に自動的に作ってくれるし、master側がCAとして署名してくれるので設定自体の手間はないが、その証明書のSubjectのCNと、ネットワーク的に逆解決できる名前が一致しないと?エラーになってしまう。

問題への対処法のひとつとして、masterやagentの起動時に --certnameオプションを指定することで、証明書のSubjectを任意の値に変えることができる。

$ sudo puppet agent --certname=hoge ...
...
$ sudo openssl x509 -in /var/lib/puppet/ssl/certs/hoge.pem -text | grep Subject
        Subject: CN=hoge

この場合、マニフェストのnode定義に、--certnameオプションで指定した値を書くと、マッチするようになる。

node hoge {
  ...

ただし、masterのサーバ証明書のSubjectを変更した場合には、その名前とagent実行時に--serverオプションで指定する接続先のホスト名が一致しなければ、以下のようなエラーになるため、/etc/hostsで対応をとるなどする必要がある。

Error: Failed to apply catalog: hostname does not match the server certificate
Error: Could not send report: hostname does not match the server certificate

SSLをoffにする方法がないかと思ったが、コードのいろいろな部分でSSLの利用が前提になっているようにみえる。
agentのコードでは、必ず証明書を取得しにいく。Puppet::Application::Agent#setup_host::

  def setup_host
    @host = Puppet::SSL::Host.new
    waitforcert = options[:waitforcert] || (Puppet[:onetime] ? 0 : Puppet[:waitforcert])
    cert = @host.wait_for_cert(waitforcert) unless options[:fingerprint]
  end

managerへのアクセスのためのモジュールでは、常ににuse_sslがtrueで呼び出されている。Puppet::Network::HTTP::Connection#initialize::

    class Connection
      include Puppet::Network::Authentication
  
      def initialize(host, port, use_ssl = true)
        @host = host
        @port = port
        @use_ssl = use_ssl
      end

puppetがバージョン2の頃は、master側だけは--servertype=mongrelとすることでSSLをoffにできたが、バージョン3になってからはMongrelはサポートされなくなっていた。

そもそもmaster/agent構成を使わず、各ノードにマニフェストが格納された/etc/puppet以下をコピーし、各ノードのローカルでpuppet applyコマンドを実行して適用するのが、多くの場合に無難な気もする。
マニフェストの配置はrsyncを使うか、subversionやgitを利用してチェックアウトすればよい。psshなどを利用すれば、puppet applyコマンドの実行も含めて、割と楽にできる。

puppet applyコマンドでも--certnameオプションは指定可能で、nodeのマッチングも変わるようなので、EC2のようなホスト名が変わりやすい環境でも、役に立つかもしれない。

(上記はpuppet-3.1.1で調べた結果にもとづく。)

ClojureのREPLで動きを解析

バイナリでznodeに格納されているデータを読み出してみた場合の例。

$ cd storm

$ lein repl

user=> (use '(backtype.storm config))
nil

user=> (use '(backtype.storm zookeeper))
nil

user=> (import '(backtype.storm.utils Utils))
backtype.storm.utils.Utils

user=> (let [zk (let [conf (read-storm-config)] (mk-client conf ["localhost"] 2181))] (Utils/deserialize (get-data zk "/storm/storms/mytopology-4-1361812014", false)))
1496860 [Thread-1] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
#backtype.storm.daemon.common.StormBase{:storm-name "mytopology", :launch-time-secs 1361812014, :status {:type :active}, :num-workers 2, :component->executors {"split" 1, "__acker" 1, "count" 1, "sentence-parametrized" 1, "__system" 0, "sentence" 1}}
1496870 [Thread-1-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none

user=> (let [zk (let [conf (read-storm-config)] (mk-client conf ["localhost"] 2181))] (Utils/deserialize (get-data zk "/storm/assignments/mytopology-4-1361812014", false)))
1499720 [Thread-1] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
#backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local/nimbus/stormdist/mytopology-4-1361812014", :node->host {"2de24c3a-9537-4522-a70b-117646d3bc44" "centos6"}, :executor->node+port {[3 3] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703], [5 5] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703], [4 4] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6702], [2 2] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6702], [1 1] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703]}, :executor->start-time-secs {[1 1] 1361812014, [5 5] 1361812014, [3 3] 1361812014, [2 2] 1361812014, [4 4] 1361812014}}
1499740 [Thread-1-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none

rebalanceをコマンドを打ってから

$ bin/storm rebalance mytopology -w 60 -n 4 -e sentence=2 -e sentence-parametrized=2

反映されるまでの間と、

user=> (let [zk (let [conf (read-storm-config)] (mk-client conf ["localhost"] 2181))] (Utils/deserialize (get-data zk "/storm/storms/mytopology-4-1361812014", false)))
1570454 [Thread-1] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
#backtype.storm.daemon.common.StormBase{:storm-name "mytopology", :launch-time-secs 1361812014, :status {:type :rebalancing, :delay-secs 60, :old-status {:type :active}, :num-workers 4, :executor-overrides #<HashMap {sentence-parametrized=2, sentence=2}>}, :num-workers 2, :component->executors {"split" 1, "__acker" 1, "count" 1, "sentence-parametrized" 1, "__system" 0, "sentence" 1}}
1570480 [Thread-1-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none

user=> (let [zk (let [conf (read-storm-config)] (mk-client conf ["localhost"] 2181))] (Utils/deserialize (get-data zk "/storm/assignments/mytopology-4-1361812014", false)))
1576881 [Thread-1] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
#backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local/nimbus/stormdist/mytopology-4-1361812014", :node->host {"2de24c3a-9537-4522-a70b-117646d3bc44" "centos6"}, :executor->node+port {[3 3] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703], [5 5] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703], [4 4] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6702], [2 2] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6702], [1 1] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703]}, :executor->start-time-secs {[1 1] 1361812014, [5 5] 1361812014, [3 3] 1361812014, [2 2] 1361812014, [4 4] 1361812014}}
1576888 [Thread-1-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none

反映された後。

user=> (let [zk (let [conf (read-storm-config)] (mk-client conf ["localhost"] 2181))] (Utils/deserialize (get-data zk "/storm/storms/mytopology-4-1361812014", false)))
1660965 [Thread-1] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
#backtype.storm.daemon.common.StormBase{:storm-name "mytopology", :launch-time-secs 1361812014, :status {:type :active}, :num-workers 4, :component->executors {"split" 1, "__acker" 1, "count" 1, "sentence-parametrized" 2, "__system" 0, "sentence" 2}}
1660991 [Thread-1-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none

user=> (let [zk (let [conf (read-storm-config)] (mk-client conf ["localhost"] 2181))] (Utils/deserialize (get-data zk "/storm/assignments/mytopology-4-1361812014", false)))
1664021 [Thread-1] INFO  com.netflix.curator.framework.imps.CuratorFrameworkImpl  - Starting
#backtype.storm.daemon.common.Assignment{:master-code-dir "storm-local/nimbus/stormdist/mytopology-4-1361812014", :node->host {"2de24c3a-9537-4522-a70b-117646d3bc44" "centos6"}, :executor->node+port {[3 3] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6701], [5 5] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703], [4 4] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6700], [2 2] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6702], [1 1] ["2de24c3a-9537-4522-a70b-117646d3bc44" 6703]}, :executor->start-time-secs {[3 3] 1361812175, [1 1] 1361812175, [5 5] 1361812175, [4 4] 1361812175, [2 2] 1361812175}}
1664050 [Thread-1-EventThread] INFO  backtype.storm.zookeeper  - Zookeeper state update: :connected:none

rebalanceコマンドにはexecutorの数についてのオプションも用意されているが、指定しても数は変わらなかった。というのは、topology起動時に決まるタスク数の数は変えられないからだった。
https://github.com/nathanmarz/storm/issues/54
起動時に1 taskごとに1 executorの割りあてがある状態では、それ以上はexecutorスレッドは増やしようがない。