2015年2月27日金曜日

Apache Kafkaを試してみた

概要

KafkaはScalaで書かれたpub/subモデルの分散メッセージングプラットフォームです
有名なサービスだとLinkedInが導入しているようです
同じようなミドルウェアにKestrel(ケストレル)やRabbitMQがあります
そもそも、KafkaさんがどんなものなのかよくわかったなかったのでとりあえずQuickStartをやってみたので紹介します

環境

  • Mac OS X 10.8.5
  • kafka 0.8.2.0 (Scala 2.11)

インストール

公式で配布されているバイナリをダウンロードしてインストールしてみたいと思います

cd ~/Downloads
wget http://supergsego.com/apache/kafka/0.8.2.0/kafka_2.11-0.8.2.0.tgz
tar zvxf kafka_2.11-0.8.2.0.tgz
mv kafka_2.11-0.8.2.0 /usr/local/
cd /usr/local
ln -s kafka_2.11-0.8.2.0/ kafka

MacであればHomebrewでもインストールできそうです
インストールは上記で完了です
インストールが完了したら早速試してみましょう

試してみる

kafkaの起動

zookeeperの起動

cd /usr/local/kafka
sh bin/zookeeper-server-start.sh config/zookeeper.properties

kafkaサーバの起動

cd /usr/local/kafka
sh bin/kafka-server-start.sh config/server.properties

先にzookeeperのプロセスを起動してください
それぞれ別ターミナルで実行してください

シェルを叩いているだけですが、内部的にはjarファイルが実行されてJavaのプロセスが立ち上がっています

トピックの作成

メッセージを送信するためのトピックを作成します

cd /usr/local/kafka
sh bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test

testというトピックを作成しました
bin/kafka-topics.sh --zookeeper localhost:2181の部分はテンプレ的な感じです
その他パラメータの説明は以下の通りです

  • create・・・トピックを作成するためのオプションです、このオプションを変更することでトピックの一覧を表示したり削除したりすることができます
  • replication-factor・・・kafkaクラスタ内に存在するkafkaブローカーの何台にトピックを作成するか指定できます、なのでkafkaブローカーが1つしかない場合は1以上は指定できません
  • partitions・・・トピックをいくつのパーティションに分割するか指定します、この辺はちょっと自身がないですが1つのバーティション上にトピックを作成するとアクセス効率が悪いので複数のバーティションに分割することでアクセスを分散するためだと思います(The number of partitions for the topic being created or altered (WARNING: If partitions are increased for a topic that has
    a key, the partition logic or ordering of the messages will be affected)
  • topic・・・作成するトピック名を指定します

といった感じです
この辺のオプション周りは詳しくは調べていないので詳細は公式ドキュメントを見ていただければと思います

作成したトピックを確認するにはlistを使います

cd /usr/local/kafka
sh bin/kafka-topics.sh --list --zookeeper localhost:2181

作成したトピックの一覧が表示されると思います

更に各トピックごとの詳細を知りたい場合は「–describe」を利用します

cd /usr/local/kafka
sh bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

以下のような感じでパーティションの情報やレプリケーションの情報を確認することができます

Topic:test      PartitionCount:1        ReplicationFactor:1     Configs:
        Topic: test     Partition: 0    Leader: 0       Replicas: 0     Isr: 0

トピックの削除

ここで注意が必要ですが今回使っているkafka0.8.2.0ではトピックを削除するのに「–delete」というオプションを使えばいいのですが、release版の0.8.1.1では「–delete」オプションがありません
そのため作成したトピックは削除することができないようです

0.8.2.0では以下のようにしてトピックを削除します

cd /usr/local/kafka
echo 'delete.topic.enable=true' >> config/server.properties

まず「–delete」オプションを有効にする設定を追記します
そして再度kafkaサーバを起動し以下のコマンドを実行しま

cd /usr/local/kafka
bin/kafka-topics.sh --delete --topic test --zookeeper localhost:2181

これで作成したトピックの削除を実施することができます

メッセージを送信

作成したtestトピックにメッセージを送信してみましょう

cd /usr/local/kafka
sh bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

メッセージを送信する際のポートは9092になります

上記を実行すると入力待ち状態になります
メッセージを入力してEnterを押すとメッセージを送信することができます
連続で何個もメッセージを送信することができます
終了する場合はCtrl+cで抜けます

メッセージを受信

では送信したメッセージを受信してみましょう
以下のコマンドで受信することができます

cd /usr/local/kafka
sh bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

先ほど送信したメッセージの一覧が表示されると思います
メッセージの受信も送信時と同じように待ち状態になります
この状態でメッセージの送信を別のターミナルで実行してみるとリアルタイムにメッセージを受信できると思います

「–from-beginning」を指定するとトピックに送信されたメッセージを最初からすべて取得することができます
接続した時点からのメッセージのみを取得したい場合は「–from-beginning」オプションを削除して受信してみてください

最後に

Kafkaはあくまでもメッセージを管理するための基盤で、スケーラビリティとか分散に特化したミドルウェアです
Kafka自体にはMQTTやWebSocket、HTTPなどのインタフェースを持っていなくて本当にメッセージを管理するだけの基盤になります
いろいろ調べるとkafkaと連携してMQTTインタフェースを持たせるブリッジ的なものを作成している方もいたので、インタフェース周りはエコシステム的に作られていくのかなと思いました

今回は基本的な部分しか触れていませんがQuickStartはこの後Kafkaクラスタを構築する手順になっています
本気でやろうとするとKafkaクラスタを構築するのはおそらく必須条件になると思います

参考サイト

0 件のコメント:

コメントを投稿