こんにちは、上坂(@takashiuesaka)です。
前回に引き続き、EventHubsのお話です。今回は受信側のお話をしましょう。英語の文章をひたすら読んでいたせいか、どこか日本語が翻訳っぽい気がしますが大丈夫でしょうか。大丈夫かって聞かれても困りますね、すいません気にしないでください。
今回も↓こちらを読み込んで実際にテストして私が把握した内容のまとめです。どこまで正しいかは私にはわからないので正確な情報はこちらをご覧下さい。
Azure Event Hubs の概要
イベント ストリーム指向のシステムの台頭
さて、前回はPartitionにイベントが投入されるまでの話をしました。次は受信側です。まず大前提として認識しておかなければならないのが、 EventHubsに格納されたイベントは、自分で取りに行くのではなく、通知されるプッシュ型である、ということです。なので、最初にセッションを張っ て、以降はひたすらイベント(という名の文字列)が到着するのを待つ感じになります。
これは、イベントの受信に使用するAMQPというプロトコルがそういう仕様だからようです。ちなみに送信時も.NETのSDKを使うとAMQPでイベントを投げます。RESTも投げられるようですけど試していません。でも受信側はAMQP一択のようです。
(ちなみにAMQPをプッシュ型、というとかなり語弊があるかと思います。実際には双方向のようなので。でもプッシュされる、と思ったほうがEventHubsの場合はわかりやすいかなぁと思うので、敢えて。)
そして前回でも書きましたが、とても重要なのは1つのPartitionに接続できる受信クライアントは1つだけ、ということなんですが、それだと 1つのイベントは1つの受信クラアントしか処理できないことになります。それじゃあとても不便だね、ということで用意されているのが ConsumerGroupというものです。
EventHubsには1つのパーティションに1つずつ用意されたイベントの読み取り器がある、と思って下さい。だからPartitionが8つだったら読み取り器も8つ。で、受信クライアントはその読み取り器に接続しにいくわけです。
この読み取り器のことをドキュメントではリーダーと表現していたりします。さて、この8つの読み取り器をまとめてConsumerGroup、と呼 んでいるんです。そしてConsumerGroupは増やすことができます。(管理ポータルでも、コードで増やすこともできる)増やせば、当然読み取り器 も増えます。
つまりConsumerGroupごとに各Partitionへの読み取り器が用意される、つまり同じイベントに対して異なる処理を行う受信クライアントを用意できます。
さて、EventHubsに格納されたイベントは外側から消すことはできません。保持期間が過ぎるのをじっと待つだけです。受信クライアントがずっ と立ちあげっぱなしの場合はイベントがプッシュされるのを待っているので、特に問題を感じませんが、一度でも受信クライアントが停止してしまったらどうな るでしょう。Partitionに格納されている一番古いイベントからイベントが全部プッシュされてしまいます。困りますね。どうしましょうか。
各イベントを受信すると、offsetというものが一緒に渡されます。数字なんですけど、このoffsetが格納されたイベントの位置を示していま す。CousumerGroupの読み取り器にに対して接続を行う時に、offsetを渡すと、読み取り開始位置をoffsetまでずらすことができるよ うになっています。だから受信クライアントは渡されたoffsetを記録しておかなければなりません。そしてoffsetは当然ですがPartition ごとに保持する必要があります。
ドキュメントを読むと、チェックポイント機能というのがあります。これが意味することは
- 受信クライアント側で「このイベントのoffsetを記録する!」というメソッドを叩くように実装すること(PartitionContext.CheckpointAsyncメソッドを叩くということ)
- メソッドが叩かれたら、offsetを記録するクラスのメソッドを呼ぶから、そこで保存するように実装すること(ICheckpointManager.CheckpointAsyncのこと)
ということなんです。わかんないですね。イベントを受信するクライアント側ではそのイベントを処理したらoffsetを保存してそのイベントはもう処理しないようにしたいわけです。そのために用意されているメソッドがあるので、それを叩きなさい、というだけのことです。
次に、offsetをどこにどうやって記録するのかは、完全に受信クライアントの実装に任されています。ファイルだろうがDBだろうが、何に記録し ようが勝手です。そのクラスはICheckpointManagerインタフェースを実装する必要があるんです。このインターフェースは CheckpointAsyncというメソッド1つだけを持っています。その内部でoffsetを記録すれば良いのです。
えーいちいち実装するのめんどくさーいと思いますよね。(やってみるとそんなに面倒でもないのですが)そこであらかじめ用意されているのがEventProcessorHostクラスというやつで、検索するとまずこいつの使い方しか説明がないわけです。
このクラスのコンストラクタにはなぜかAzureStorageの接続情報を渡さないといけないのですが、EventProcessorHostクラスの内部実装でPartitionIdごとのoffset値をAzureStorageに記録しているからなんですね。
どの位置のイベントから受信するのかは100%クライアント側の責任であって、EventHubsはそんなこと知らねぇ!というスタンスをとっていることはよく覚えておく必要がありますね。
以上、EventHubsの概要まとめでした。面倒なのでテストコードは載せませんでしたが、いかがだったでしょうか。参考になったら嬉しいです!