2013年12月20日金曜日

キャッシュ層と永続化層、Oracle Coherence と Riak の連携(概要編)

前のブログが Riak Source Code Reading の記事だったのですが、気がつけば Basho に入社してました。

この記事はみんなでやる Riak Advent Calendar 2013 の 20日目の記事です。

今回は Riak と外部プロダクトの連携を考えてみます。
時間がなかったので、詳細はまた別途書きます。

Oracle Coherenceとの連携

Oracle Coherenceは 分散キャッシュ製品(オラクルの言い方だと分散データグリッド)です。キャッシュなので、一般的にRDBなどの前段に置いて、高速化目的で使われます。Coherenceは分散キャッシュであるためスケールアウトもできます。アプリの性能要件に対応しCoherenceを使って高速化。これはよくあるシナリオなのです。

ここで、OLTP系のシステムなどデータの更新が多いシステムの場合を考えてみましょう。Coherenceはさばいた更新量を、その下の永続化層であるDBがさばききれない可能性がありますね。もちろん本来ならここで、RDBのチューニングやら、Coherenceの永続化周りのオブジェクト設計をしっかり考えるのですが、ここで第3の選択肢を考えてみましょう。

今回はRiakのアドベントカレンダーなのでRiakを使ってみます。CoherenceもKVS、RiakもKVS、つまりCoherenceに格納されたデータを "そのまま" の形式でRiakに永続化できそうです。しかもRiakはスケールアウトが容易ですので、Coherenceの更新量に負けないだけのRiakサーバーを用意すればOKです。CoherenceとRiakは実は相性が良いのです。(Oracle RAC もスケールアウトするし!とか野暮な突っ込みは勘弁してください。)


というわけで試してみました。


Oracle Coherence の内部形式のままRiakに保存する

Coherenceのキャッシュにデータを格納する際、データすなわちKeyとValueはシリアライズされて格納されます。CoherenceではPOFと呼ばれるCoherenceが提供する方式でのシリアライズを推奨しています。アプリからCoherenceに格納する際はシリアライズして転送&格納、データを取り出す際は取り出したあとにデシリアライズといった具合になります。
本来CoherenceからRDBに永続化する際は、永続化のタイミングでもデシリアライズが必要です。しかしRiakに格納する場合はどうでしょうか?RiakはKeyとValueをバイナリで格納します。つまりデシリアライズなんて必要ないのです。Coherenceが使う内部形式であるPOFのままRiakに書き込むことが可能です。これはCoherenecで通常用いるCacheStoreではなく、BinaryEntryStoreと呼ばれるものを使えば実現できます。(なお、厳密な意味でCoherenceの内部形式はPOFをさらにラップした形式なのですが、ここでは内部形式=POFと表現しておきます。)


試しに作ったもの

というわけでさくっと作ってみました。
当然各製品はそれぞれ必要です。
・Oracle Coherence 12c 
・Riak 1.4.2 

さて、Riak側はデフォルトの設定のまま起動してもらっても構いません。そしてCoherenceですが、今回のサンプルアプリを作るのに用意したファイルはたったの3つ。
・Coherenceの設定ファイル(coherence-cache-config.xml
・CoherenceのデータをRiakに格納する処理(RiakCacheStore.java
・Coherenceにデータを読み書きするただのサンプル(TestClient.java


実行してみた

RiakとCoherenceを起動した後、サンプルアプリである、TestClientを実行してみます。 このアプリは、最初に50件データをputした後、Coherenceのキャッシュを一度クリアーします。その後最初に格納したデータと同じKeyでgetを行います。

TestClient.main() - put data to the TestCache
TestClient.main() - Clear TestCache
TestClient.main() - Get key=Testkey0, value=value0
TestClient.main() - Get key=Testkey1, value=value1
TestClient.main() - Get key=Testkey2, value=value2
・・・略
TestClient.main() - Get key=Testkey45, value=value45
TestClient.main() - Get key=Testkey46, value=value46
TestClient.main() - Get key=Testkey47, value=value47
TestClient.main() - Get key=Testkey48, value=value48
TestClient.main() - Get key=Testkey49, value=value49

裏では、Coherenceのputの際、CoherenceのBinaryEntryStoreを通じて、Riakにputが実行されています。またcacheをクリアーした後のgetでは、Coherence上でcache missした場合、やはりCoherenceのBinaryEntryStoreを通じて、Riakからのfetchが実行されています。


CoherenceでReadWriteBackingMapと呼ばれるキャッシュ設定を使った場合、Coherenceのputに対応してstoreメソッドが、get時cache missに対応してloadメソッドが実行されます。実質以下のコードが今回の全てです。
 public void load(BinaryEntry bEntry) {
  String riakKey = new String(bEntry.getBinaryKey().toByteArray());
  System.out.println("RiakCacheStore.load() - from Riak to Coherence [key=" + riakKey +"]");
  try {
   IRiakObject myData = myBucket.fetch(riakKey).execute();
   bEntry.updateBinaryValue(new Binary(myData.getValue()));
  } catch (RiakException e) {
   throw ensureRuntimeException(e);
  }
 }
 
 public void store(BinaryEntry bEntry){
  String riakKey = new String(bEntry.getBinaryKey().toByteArray());
  System.out.println("RiakCacheStore.store() - from Coherence to Riak [key=" + riakKey +"]");
  try {
   myBucket.store(riakKey, bEntry.getBinaryValue().toByteArray()).execute();
  } catch (RiakException e) {
   throw ensureRuntimeException(e);
  }
 }
ほら簡単ですよね。 今回は時間が無いのでこの辺りで。かなり端折って書いたので、次回は、CoherenceとRiakの説明を交えつつ詳細に書きたいと思います。

2013年2月20日水曜日

Erlang導入からRiakのクラスター参加まで

Erlang/OTP上で動く分散データベース Riak のソースコードを読む勉強会、 Riak Source Code Reading@東京 で、 ついに担当がまわってきてしまったので、重い腰をあげることになりました。 てかソースコードリーディングの初回から参加しているのに、 忙しさを理由に、未だに Riak 起動させたこともなければ、Erlangもまともに読み書きしてこなかったので反省。 というわけでまずは Riak を動かしてみます。どの環境で動かそうかなと思ったけど、 どうも Riak は Windows をサポートしてないようなんで、Mac 上に環境を導入してみました。

Erlangの導入

homebrew使ってる人は、簡単に導入出来ます。 Riakを動かす際のバージョンは、R15B01 推奨らしいので、そのバージョンをいれてみます、
$ brew versions erlang
R15B03-1 git checkout 168742f /usr/local/Library/Formula/erlang.rb
R15B03 git checkout 311472d /usr/local/Library/Formula/erlang.rb
R15B02 git checkout 44e09dd /usr/local/Library/Formula/erlang.rb
R15B01 git checkout 6b8d25f /usr/local/Library/Formula/erlang.rb
R15B git checkout 497b13a /usr/local/Library/Formula/erlang.rb
R14B04 git checkout aedacdf /usr/local/Library/Formula/erlang.rb
R14B03 git checkout 9332ca9 /usr/local/Library/Formula/erlang.rb
R14B02 git checkout b782d9d /usr/local/Library/Formula/erlang.rb
R14B01 git checkout 0476235 /usr/local/Library/Formula/erlang.rb
R14B git checkout 7871a99 /usr/local/Library/Formula/erlang.rb
R13B04 git checkout 31f1fab /usr/local/Library/Formula/erlang.rb
R13B03 git checkout 729f3fa /usr/local/Library/Formula/erlang.rb
R13B02-1 git checkout 0d673c6 /usr/local/Library/Formula/erlang.rb
brewだと、以下の手順で任意のバージョンを導入可能です。
$ cd /usr/local/Library/Formula/
$ git checkout 6b8d25f /usr/local/Library/Formula/erlang.rb
バージョンを確認して、インストール。
$ brew info erlang
erlang: stable R15B01 (bottled), HEAD
http://www.erlang.org
...

$ brew install erlang
==> Downloading https://downloads.sf.net/project/machomebrew/Bottles/erlang-R15B01.mountainlion.bottle.tar.gz
...
Erlangのシェルを起動してみます。R15B01が入りました。
$ erl
Erlang R15B01 (erts-5.9.1) [source] [64-bit] [smp:4:4] [async-threads:0] [hipe] [kernel-poll:false] [dtrace]
Eshell V5.9.1 (abort with ^G)

Riakのインストール

Riak のインストールは非常に簡単です。すでにバイナリが提供されており、自分の環境にあったものをダウンロードするだけです。 また、Homebrewからでも導入可能です。手順はともに Riak のドキュメントに書かれている。
$ curl -O http://downloads.basho.com.s3-website-us-east-1.amazonaws.com/riak/1.2/1.2.1/osx/10.4/riak-1.2.1-osx-x86_64.tar.gz
$ tar xzvf riak-1.2.1-osx-x86_64.tar.gz

・・・が、今回はソースコードリーディングなわけで、それぐらいソースからビルドしなくてどうする! ということでソースコード持ってきましょう。とはいっても大差ないぐらい簡単だけど。 ドキュメントはこれ
$ curl -O http://downloads.basho.com.s3-website-us-east-1.amazonaws.com/riak/1.2/1.2.1/riak-1.2.1.tar.gz
$ tar zxvf riak-1.2.1.tar.gz
$ cd riak-1.2.1
$ make rel
ビルド中に github につなぎに行って、依存している leveldb を落としにいくようで、インターネットに繋がらない環境でやる場合は注意。 さらに、make deverel とやると、dev1, dev2, dev3, dev4 と4つの Riak 環境を作ってくれます。
$ make devrel
$ cd dev/
$ ls
dev1 dev2 dev3 dev4
先ほどのディレクトリ配下の dev というディレクトリ内に、4つの環境ができています。 それぞれに riak コマンドが入っていて、個別に riak を起動出来ます。 こうすると、ローカルの1つのOS上で、複数の riak を起動して、クラスターを組ませたりできます。


Riakの起動

Riakを起動してプロセスを見てみます。riak start コマンドで起動できます。
$ ./riak start

$ ps -ef | grep riak
  501 802 1 0 3:50PM ?? 0:00.01 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/epmd -daemon
  501 819 1 0 3:50PM ?? 0:00.01 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/run_erl -daemon /tmp//workspace/riak-1.2.0/dev/dev1/ /workspace/riak-1.2.0/dev/dev1/log exec /workspace/riak-1.2.0/dev/dev1/bin/riak console
  501 821 819 0 3:50PM ttys004 0:02.48 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/beam.smp -K true -A 64 -W w -- -root /workspace/riak-1.2.0/dev/dev1 -progname riak -- -home /Users/taka -- -boot /workspace/riak-1.2.0/dev/dev1/releases/1.2.0/riak -embedded -config /workspace/riak-1.2.0/dev/dev1/etc/app.config -pa ./lib/basho-patches -name dev1@127.0.0.1 -setcookie riak -- console
epmd、run_erl、beam.spm の3つのプロセスが上がってますね。 PIDを見ると epmd は独立していて、run_erl 経由で beam.spm が動いているようです。 止めてみます。riak stop で止まります。
$ ./riak stop
ok

$ ps -ef | grep riak
  501 802 1 0 3:51PM ?? 0:00.01 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/epmd -daemon
epmdってなんだろう? これはRiakじゃなくてErlangがそもそも提供しているものっぽい。Erlangのドキュメントを見てみる。
Erlang Port Mapper Daemon
This daemon acts as a name server on all hosts involved in distributed Erlang computations.
なるほど、nameserver的な動きをするらしい。 dev1,dev2,dev3 の 3個起動してみると、epmdは1個だけだ。
$ ps -ef | grep riak
  501 802 1 0 3:50PM ?? 0:00.02 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/epmd -daemon
  501 2020 1 0 3:57PM ?? 0:00.01 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/run_erl -daemon /tmp//workspace/riak-1.2.0/dev/dev1/ /workspace/riak-1.2.0/dev/dev1/log exec /workspace/riak-1.2.0/dev/dev1/bin/riak console
  501 2157 1 0 3:57PM ?? 0:00.01 /workspace/riak-1.2.0/dev/dev2/erts-5.9.1/bin/run_erl -daemon /tmp//workspace/riak-1.2.0/dev/dev2/ /workspace/riak-1.2.0/dev/dev2/log exec /workspace/riak-1.2.0/dev/dev2/bin/riak console
  501 2592 1 0 3:59PM ?? 0:00.01 /workspace/riak-1.2.0/dev/dev3/erts-5.9.1/bin/run_erl -daemon /tmp//workspace/riak-1.2.0/dev/dev3/ /workspace/riak-1.2.0/dev/dev3/log exec /workspace/riak-1.2.0/dev/dev3/bin/riak console
  501 2022 2020 0 3:57PM ttys004 0:01.96 /workspace/riak-1.2.0/dev/dev1/erts-5.9.1/bin/beam.smp -K true -A 64 -W w -- -root /workspace/riak-1.2.0/dev/dev1 -progname riak -- -home /Users/taka -- -boot /workspace/riak-1.2.0/dev/dev1/releases/1.2.0/riak -embedded -config /workspace/riak-1.2.0/dev/dev1/etc/app.config -pa ./lib/basho-patches -name dev1@127.0.0.1 -setcookie riak -- console
  501 2159 2157 0 3:57PM ttys005 0:02.04 /workspace/riak-1.2.0/dev/dev2/erts-5.9.1/bin/beam.smp -K true -A 64 -W w -- -root /workspace/riak-1.2.0/dev/dev2 -progname riak -- -home /Users/taka -- -boot /workspace/riak-1.2.0/dev/dev2/releases/1.2.0/riak -embedded -config /workspace/riak-1.2.0/dev/dev2/etc/app.config -pa ./lib/basho-patches -name dev2@127.0.0.1 -setcookie riak -- console
  501 2594 2592 0 3:59PM ttys006 0:01.62 /workspace/riak-1.2.0/dev/dev3/erts-5.9.1/bin/beam.smp -K true -A 64 -W w -- -root /workspace/riak-1.2.0/dev/dev3 -progname riak -- -home /Users/taka -- -boot /workspace/riak-1.2.0/dev/dev3/releases/1.2.0/riak -embedded -config /workspace/riak-1.2.0/dev/dev3/etc/app.config -pa ./lib/basho-patches -name dev3@127.0.0.1 -setcookie riak -- console


Riakのクラスター参加

現在1つのマシン上で、dev1, dev2, dev3 の名前で、それぞれ riak start して、riakが起動している状態です。 これらはまだ独立して起動しているだけで、クラスターを組んでいません。 クラスター制御は、riak-admin コマンドで行います。 今回は、dev2とdev3をdev1のクラスターに参加させるイメージなので、 dev2とdev3でそれぞれ、./riak-admin cluster join dev1@127.0.0.1 を実行します。

$ ./riak-admin cluster join dev1@127.0.0.1
Success: staged join request for 'dev3@127.0.0.1' to 'dev1@127.0.0.1'

ここでメッセージに注目すると、"staged join request" となっています。 Riakのドキュメントを読むと、以下のように書かれていました。
As of version 1.2, Riak provides a multi-phased approach to cluster administration that allows changes to be staged and reviewed before being committed. This approach to cluster administration allows multiple changes to be grouped together, such as adding multiple nodes at once, or adding some nodes while removing others.
joinするまでには段階があって、stagedな状態になったあと、管理者がreviewしてcommitするまでは反映されないようですね。 riak-admin cluster plan を実行することで、今回 dev1,dev2,dev3でクラスタを組んだ場合に、 どのようなRingの配置になるか確認します。

$ ./riak-admin cluster plan
=============================== Staged Changes ================================
Action Nodes(s)
-------------------------------------------------------------------------------
join 'dev2@127.0.0.1'
join 'dev3@127.0.0.1'
-------------------------------------------------------------------------------


NOTE: Applying these changes will result in 1 cluster transition

###############################################################################
                         After cluster transition 1/1
###############################################################################

================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 100.0% 34.4% 'dev1@127.0.0.1'
valid 0.0% 32.8% 'dev2@127.0.0.1'
valid 0.0% 32.8% 'dev3@127.0.0.1'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0

WARNING: Not all replicas will be on distinct nodes

Transfers resulting from cluster changes: 42
  21 transfers from 'dev1@127.0.0.1' to 'dev3@127.0.0.1'
  21 transfers from 'dev1@127.0.0.1' to 'dev2@127.0.0.1'


どうやら、現在dev1が100%持っているRingの配分が、dev1:dev2:dev3=34.4:32.8:32.8になるようですね。 問題なければ riak-admin cluster commit。

$ ./riak-admin cluster commit
Cluster changes committed
riak-admin memver-status で現在の状態が確認できます。plan通りの配置になりました。 ちなみに 21 transfers from 'dev1@127.0.0.1' to 'dev3@127.0.0.1' とか書かれていたとおり、 転送に若干時間がかかるので、commit した1秒後とかに member-status を見ると、 未転送分は Pending のところに%で表示されます。

$ ./riak-admin member-status
================================= Membership ==================================
Status Ring Pending Node
-------------------------------------------------------------------------------
valid 34.4% -- 'dev1@127.0.0.1'
valid 32.8% -- 'dev2@127.0.0.1'
valid 32.8% -- 'dev3@127.0.0.1'
-------------------------------------------------------------------------------
Valid:3 / Leaving:0 / Exiting:0 / Joining:0 / Down:0



おもむろにパケットダンプ。Wiresharkはプロトコルがepmdだと認識してますね。すごい。dev2って名前のErlangノード誰?ってリクエストとに対して、返事返ってるのがわかります。


実際にRiakが投げるメッセージとかは、erldpというプロトコルで飛ばされている様子がわかります。Erlang Distribution Protocol の略みたい。次は、こっちの視点からコードを追いかけてみたいな。


コードからの確認

次は、cluster参加するさいのコードがどうなっているのかを確認します。 まずはどのソースコードなのか特定します。 riakのコードは、riak_core, riak_kv, riak_console, riak_pb, ... など機能ごとに細かく分かれています。 どうやって探そうかな、Erlangだとどうするかいまいち分からないので、ソースコードを先ほど出ていたログ "Success: staged join request" で grep してみると、riak_kv_console.erl の中にあるっぽい。 ここは riak_kv のレイヤーで、実際の処理は rika_core のレイヤーにあるみたい。
staged_join([NodeStr]) ->
    Node = list_to_atom(NodeStr),
    join(NodeStr, fun riak_core:staged_join/1,
         "Success: staged join request for ~p to ~p~n", [node(), Node]).
実際のソースコードリーディングの時には、「riak-adminコマンドはシェルスクリプトなんで、その中見ると、riak-adminの各コマンドが何を呼び出しているか分かるよ」とコメントを頂きました。 riak_core:staged_join はこんな感じ。Node には、"dev1@127.0.0.1" 的な文字列が入ってくるはず。 ここからnodeの名前を取り出して実際の処理にうつっているっぽい。
%% @doc Join the remote cluster without automatically claiming ring
%%      ownership. Used to stage a join in the newer plan/commit
%%      approach to cluster administration. See {@link riak_core_claimant}
staged_join(Node) ->
    join(Node, false).

join(NodeStr, Auto) when is_list(NodeStr) ->
    join(riak_core_util:str_to_node(NodeStr), Auto);
join(Node, Auto) when is_atom(Node) ->
    join(node(), Node, Auto).
riak_core_gossip:legacy_gossip() が呼び出されている。追っていくとどうも、 プロトコルに legacy_gossip って書いてある
join(Node, Node, _) ->
    {error, self_join};
join(_, Node, Auto) ->
    join(riak_core_gossip:legacy_gossip(), node(), Node, false, Auto).
Riakが使っているgossipのプロトコルには複数バージョンがあるっぽい。 コードを追っていくと、極力下位互換性を保とうとする考えが伺えますね。 実際にRingの処理のところでも、Ringのバージョンが LEGACY_RING_VSN なのか、 CURRENT_RING_VSN なのかで、処理を分けているところがあった。

ちなみにこれはまだ staged join の段階のコードなので、実際の commit 時は、また別の処理が行われているみたい。さすがに今回は追いかけ切れそうにないので、今日はここまで。