DMMグループの一番深くておもしろいトコロ。
テクノロジー

Redis Streamsを活用したイベントドリブンアーキテクチャの構築事例

DMMグループの一番深くておもしろいトコロ。

はじめに

EXNOA プラットフォームマイグレーション部所属の寺谷です。

プラットフォームマイグレーション部はDMM GAMESプラットフォームのリプレイスを推進・実践する部署で、私が所属しているチームでは主にデータベース(以下DB)などのデータを中心とした、バックエンドサービスのリプレイスを実践しています。

リプレイス内容はオンプレミスにあるPHPで構築されているサービスを、AWSを利用したクラウドネイティブへの移行およびGoで再構築するといったものです。

下記記事でも弊部署所属の岡崎がリプレイス事例に関して発表を行っていますので、参考にしていただけますと幸いです。

DMM.go #4をオンライン開催しました!

 

バックエンドサービスのAPIを用いてのDBへのデータ追加・更新は性能のボトルネックになりやすく、高負荷・低レイテンシが求められる場面では非同期でのデータ追加・更新を検討する場合があると思います。

本記事では、非同期でのデータ追加・更新にあたり、イベントドリブンアーキテクチャとしてRedis Streamsを採用した構築事例を紹介します。

イベントドリブンアーキテクチャを検討中の方や既に運用している方、RedisのStream型を知らなかった方などの参考になれば幸いです。

背景

バックエンドサービスにあるAPIのリプレイスにあたり、DBへのデータ書き込みがあるAPI(POSTやPUTメソッド)の一部ユースケースにて、低レイテンシかつ高スループットの実現が必要になりました。

具体的には下記の要件です。

  • データの追加・更新が、DBおよびキャッシュ(redis)に同時に発生する
  • データが即時利用されないため、データの追加・更新は結果整合で問題ない
  • p95で50ms未満の応答性能
  • 100,000rps以上のスループット性能

この要件を満たす場合、既存のアーキテクチャ構成では下記のような課題がありました。

  • RDS(Aurora)+ElastiCache for Redisのwrite-through方式では負荷試験の結果約50,000rpsが性能限界であった
    • RDSのCPUが性能限界を迎えたことが原因
    • 加えてレイテンシはRDSの負荷に引きずられて安定せず、50msを大幅に超えていた
  • RDSのprimaryの負荷分散のためのシャーディングは運用を簡単にするために極力避けたかった
    • データの増え方が流動的なためシャード数の予測が困難な状態でもあった
  • 課題解決へのアプローチ手法に関しての検証期間が、リリーススケジュール的に大きく取ることができなかった
    • 既存知識+αで補える範囲での解決が求められた

上記課題より、非同期でのデータ書き込みを行うためにイベントドリブンアーキテクチャを検討することになり、結果的に私のチームではRedis Streamsの採用に至りました。

Redis Streamsとは

Redis StreamsとはRedis 5.0で導入されたデータ型で、下記のような特徴を持っています。

  • データを時系列に格納する追記型のデータ構造
  • データ取得後も揮発せずに参照が可能
  • 取得開始位置および範囲指定しての複数データ取得可能
  • コンシューマグループの概念によりコンシューマをグループ化し、負荷分散やグループ単位でのデータ配信などが可能

詳しくは公式を参照いただければと思います。

Redisで非同期処理といえばPub/SubやBlocked listがありますが、これらは一度queueからデータを取り出した段階でデータが揮発してしまい、取り出したデータを処理する段階で何かしらの不具合が起きると、データが復元できなくなってしまいます。

また、Pub/Subはpublisher(データをqueueにpushする側)がデータを送信した際subscriber(データをqueueからpullする側)が購読を行わないと、データが揮発してしまいます。

つまり、subscriberが何かしらの理由で購読ができない期間のデータは永遠に読み取ることができません。

Redis StreamsではPub/SubやBlocked listのデータの揮発という欠点を克服し、障害時の復旧も容易になっています。

採用理由

Redis Streamsに関して紹介を行いましたが、私のチームが最終的にイベントドリブンアーキテクチャとしてRedis Streamsを採用した理由は下記の通りです。

  • 既にElastiCache for Redisを導入していた
    • 新規でミドルウェアを追加するコストが掛からず低コストで済むため
  • 他のミドルウェアの知識をチームメンバーが持ち合わせていなかった
    • イベントドリブンアーキテクチャとしてはApache KafkaやAmazon Kinesisなどが考えられるが、所属部署として知見をあまり持ち合わせていなかった
  • 検証の時間が多く取れなかった
    • サービスのリリース時期の関係もあり、完全新規のアーキテクチャを検証するほどの時間を取ることが困難であった

主に費用対効果を考慮した際に、初めてのイベントドリブンアーキテクチャということもあり、低コストで実現できそうな目処が立っているRedis Streamsを採用する方針に至りました。

構成図

最終的にRedis Streamsを採用したアーキテクチャは下図のようになりました。

Redis Streamsを採用したアーキテクチャ
Redis Streamsを採用したアーキテクチャ

EKS上にデプロイされたproducerがElastiCache for Redisに対しpublishを行い、同じくEKS上にデプロイされているconsumerがElastiCache for Redisからpublishされたデータをイベントドリブンで購読しRDS(Aurora)にデータの書き込みを行う構成です。

 

Redis Streamsの処理フロー

次に、私のチームで運用しているRedis Streamsを採用したプロダクトの大まかなシーケンスをコードとともに紹介します。

開発は先述の通りGoで行っており、Redisクライアントにはgo-redisを利用しています。

 

構成図の通りproducerとconsumerの2サービスで構成されているので、それぞれ順に説明します。

producer

producerはStreamへのデータ追加が主な責務です。

producerのシーケンスは下図のようになっています。

procude
producerのシーケンス図

RedisへはXADDコマンドを利用してStreamへデータ(以下entry)をpushします。

具体的には下記のようなコードで実現します。

※以降掲載しているコードは本記事のために簡略化したものです。

// main.go
package main
 
import (
     "context"
 
     "github.com/go-redis/redis/v8"
)
 
func main() {
     // create redis client
     redisClient := redis.NewClient(&redis.Options{
            Addr: "localhost:6379",
     })
 
     ctx := context.Background()
     // handshake
     if err := redisClient.Ping(ctx).Err(); err != nil {
            panic(err) // error handling
     }
     // push to streams
     err := redisClient.XAdd(ctx, &redis.XAddArgs{
            Stream: "sample_streams",
            Values: map[string]interface{}{"sample_key": "sample_value"},
     }).Err()
     if err != nil {
            panic(err) // error handling
     }
} 

producerはStreamへentryをpushするだけの役割なので非常にシンプルです。

redisクライアントの生成後、ハンドシェイクを行い疎通に問題なければXAdd()を実行します。

XAdd()では最低限Stream名および、pushするvalueを指定するだけでいいです。

Streamへのpush時にIDやStreamのサイズなどを自分で指定する場合はオプションとして別途指定が可能です。

consumer

consumerはStreamに存在するentryを購読し、実際にDBなどにデータを追加・更新することが責務となります。

また、consumerでは何かしらの不具合などで処理しきれなかったentryを再度処理するプロセスや、ハードウェアリソースの効率化のために一定期間アイドルになっているconsumerのプロセスをkillする責務もあります。

そのため、consumerでは下記3種類のプロセスが存在します。

  • entry処理用のconsume events
  • pending状態のentryを処理するconsume pending events
  • アイドルconsumerを削除するdelete idle consumer

各processはgoroutineで制御します。

具体的には下記のようなイメージです。

// main.go
package main
 
import (
     "context"
 
     "github.com/go-redis/redis/v8"
)
 
func main() {
     // create redis client
     redisClient := redis.NewClient(&redis.Options{
            Addr: "localhost:6379",
     })
 
     ctx := context.Background()
     ctx1, cancel1 := context.WithCancel(ctx)
     ctx2, cancel2 := context.WithCancel(ctx)
     ctx3, cancel3 := context.WithCancel(ctx)
     defer func() {
            cancel1()
            cancel2()
            cancel3()
            ctx.Done()
     }()
     go ConsumeEvents(ctx1, redisClient)
     go ConsumePendingEvents(ctx2, redisClient)
     go DeleteIdleConsumer(ctx3, redisClient)
} 

子のcontextをプロセス分生成し、各goroutineを制御します。

実際はエラー用のchannelも組み合わせエラーハンドリングを行っていますが今回は割愛いたします。

それでは、各consumerプロセスのシーケンス図とコードを順に説明します。

consume events

consume eventsのプロセスではproducerによって追加されたentryを購読し、実際にDBなどへ追加・更新を行うことが責務です。

consume eventsのシーケンスは下図のようになっています。

consume eventsのシーケンス図
consume eventsのシーケンス図

まず、負荷分散のために複数のconsumerでentryを購読するために、consumer groupの生成を行います。

consumer groupの生成はXGROUP CREATEコマンドで行います。

負荷分散は具体的にはEKS上のconsumer用podを複数台横に展開して行っています。

次に、XREADGROUPコマンドでStreamにpushされたentryを購読します。

XREADGROUPコマンドはStreamにentryがpushされるまで待ち続ける点に注意してください。

正常に購読がされると、entryは保留エントリーリスト(Pending Entry List。以下PEL)に追加され、処理中の状態となります。

購読が問題なくできたら実際にDBなどにデータの書き込みを行いますが、今回は詳細を割愛させていただきます。

データの書き込みなどが正常に完了したらentryは削除して問題ないため、XACKおよびXDELコマンドでStream内からentryの削除を行います。

XACKPELからentryの削除を行い、XDELでStreamからも削除を行います。

entryの削除はStreamのサイズ指定を行っても達成は可能です。producerの項目で紹介をしたXADDコマンドのオプションにあるMAXLENの指定、またはXTRIMコマンドを利用します。

今回はユースケース的にStreamのサイズが指定できない状態だったため、コマンドでの削除を行っています。

XDELを行わない場合、entryは完全にStreamから削除はされずRedisのメモリを圧迫していくため、Streamのサイズ指定をしない場合は必須のコマンドかと思います。

具体的には下記のようなコードで実現します。

// consumer.go
package consumer
 
import (
     "context"
 
     "github.com/go-redis/redis/v8"
     "github.com/google/uuid"
)
 
const(
     streamName = "sample_streams"
     consumerGroupName = "sample_consumer_group"
)
 
func ConsumeEvents(ctx context.Context, redisClient *redis.Client) {
     // handshake
     if err := redisClient.Ping(ctx).Err(); err != nil {
            panic(err) // error handling
     }
     err := redisClient.XGroupCreateMkStream(ctx, streamName, consumerGroupName, "0").Err()
     if err != nil {
            panic(err) // error handling
     }
     id, _ := uuid.NewRandom() // create consumer id
     // event loop
     for {
            streams, err := redisClient.XReadGroup(ctx, &redis.XReadGroupArgs{
                    Streams:  []string{streamName, ">"},
                    Group: consumerGroupName,
                    Consumer: id.String(),
                    Count: 1,
                    Block: 0,
            }).Result()
            if err != nil {
                    panic(err) // error handling
            }
            entries := streams[0].Messages
            for _, entry := range entries {
                    val, ok := entry.Values["sample_key"]
                    if !ok {
                         panic(err) // error handling
                    }
                    // write to DB etc...
                    // delete from pending entry list.
                    err = redisClient.XAck(ctx, streamName, consumerGroupName, entry.ID).Err()
                    if err != nil {
                         panic(err) // error handling
                    }
                    // delete from redis. its option.
                    err = redisClient.XDel(ctx, streamName, entry.ID).Err()
                    if err != nil {
                         panic(err) // error handling
                    }
            }
     }
} 

producer同様、Redisへのハンドシェイクを行い疎通に問題なければXGroupCreateMkStream()にて、consumer groupおよびStreamの生成を行います。

Streamの生成はオプションですが、consumeするStreamが存在しないと後続の購読処理でエラーが出るため、念のためStreamも同時に生成するようにオプションを指定します。

XReadGroup()をcallするためにはconsumerの一意なIDが必要なため、UUID(v4)にてconsumerのID生成を行っています。

consumerのID生成後、イベントループ内にてXReadGroup()をcallしentryを購読します。

購読に際しconsumerがどこから何件読み取るかの指定が可能で、コード中のXReadGroupArgs.Streamsフィールドにて開始位置として>を指定しています。

これは未読のentryを読み取る開始位置で、まだどのconsumerも購読していない、つまりPELにないentryを読み取ることになります。

またXReadGroupArgs.Countフィールドにて1回の購読で何件読み取るかの指定が可能です。

entryを正常に購読できたらDBなどへの書き込み処理を行い、問題なく完了したらentryの削除処理へと移ります。

削除にはXAck()およびXDel()を利用します。単一entryの削除になるのでentry.IDを指定します。

一連の処理が完了すると、イベントループにて次のentryを購読しにいきます。

consume pending events

consume pending eventsのプロセスでは、前項のconsume eventsが何かしらの不具合などでentryを正常に処理できなった場合に、PELに一定時間アイドル状態になっているentryの一覧を取得し、代わりに処理を行うことが責務です。

consume pending eventsのシーケンスは下図のようになっています。

cosume pending eventsのシーケンス図
cosume pending eventsのシーケンス図

consumer groupの生成まではconsume eventsと同じです。

次に発行される、XPENDINGコマンドにて、現在PELに存在するentry一覧の詳細を取得します。

一覧取得後、一定期間アイドル状態になっているentryを抽出し、XCLAIMにて処理対象のconsumerをconsume pending eventsで生成したconsumerに移譲します。

Redis Streamsは非同期的な処理ではありますが、entryがpushされてから実際に購読およびデータ処理が行われるまで基本的に大幅なラグはないため、ある程度の期間(1分など)処理がされていなければconsumer eventsに不具合があったと判断していいと思います。

XCLAIM後はconsumer eventsと同様にDBなどへのデータの書き込みおよびentryの削除を行います。

具体的には下記のようなコードで実現します。

import (
     "context"
     "fmt"
     "time"
 
     "github.com/go-redis/redis/v8"
     "github.com/google/uuid"
)
 
const(
     streamName = "sample_streams"
     consumerGroupName = "sample_consumer_group"
)
 
func ConsumePendingEvents(ctx context.Context, redisClient *redis.Client) {
     // handshake
     if err := redisClient.Ping(ctx).Err(); err != nil {
            panic(err) // error handling
     }
     err := redisClient.XGroupCreateMkStream(ctx, streamName, consumerGroupName, "0").Err()
     if err != nil {
            panic(err) // error handling
     }
     id, _ := uuid.NewRandom() // create consumer id
     // create ticker.
     ticker := time.NewTicker(time.Minute * 1)
     defer ticker.Stop()
     // event loop
     for range ticker.C {
            pendingEntries, err := redisClient.XPendingExt(ctx, &redis.XPendingExtArgs{
                    Stream: streamName,
                    Group:  consumerGroupName,
                    Start:  "0",
                    End: "+",
                    Count:  1,
            }).Result()
            if err != nil {
                    panic(err) // error handling
            }
            pendingEntryIDs := make([]string, 0, len(pendingEntries))
            for _, entry := range pendingEntries {
                    if entry.Idle <= time.Second*60 {
                            continue
                    }
                    pendingEntryIDs = append(pendingEntryIDs, entry.ID)
            }
            // read pending stream messages.
            entries, err := redisClient.XClaim(ctx, &redis.XClaimArgs{
                    Stream:   streamName,
                    Group: consumerGroupName,
                    Consumer: id.String(),
                    MinIdle:  time.Minute * 1,
                    Messages: pendingEntryIDs,
            }).Result()
            if err != nil {
                    panic(err) // error handling
            }
            for _, entry := range entries {
                    val, ok := entry.Values["sample_key"]
                    if !ok {
                         panic(err) // error handling
                    }
                    // write to DB etc...
                    // delete from pending entry list.
                    err = redisClient.XAck(ctx, streamName, consumerGroupName, entry.ID).Err()
                    if err != nil {
                         panic(err) // error handling
                    }
                    // delete from redis. its option.
                    err = redisClient.XDel(ctx, streamName, entry.ID).Err()
                    if err != nil {
                         panic(err) // error handling
                    }
            }
     }
} 

consume pending eventsの場合、PELに存在するentryのアイドル期間を参照するため、イベントループはtime.Tickerを利用した一定間隔のループにするのがいいと思います。

XPendingExt()にてPELに存在するentryの一覧を取得します。

XPendingExt()でも取得範囲の指定が可能です。今回はPELに存在する全entryを対象に取得したかったため、XPendingExtArgs.Endフィールドの値を末尾を示す+としています。

XReadGroupと同様にXPendingExtArgs.Countにてentry一覧から何件読み取るかの指定が可能です。

XPendingExt()でentryの一覧が取得できたらentry.Idleを参照し、一定期間アイドル状態になっているかの確認を行います。

対象のentryのidをsliceに格納し、XClaim()をcallし対象のentryを全てconsume pending eventsで生成したconsumerに移譲します。

XClaim()後はconsume eventsと同様にentryの処理を行います。

delete idle consumer

delete idle consumerのプロセスでは一定期間アイドル状態のconsumerを削除することが責務です。

delete idle consumerのシーケンスは下図のようになっています。

delete idle consumerのシーケンス図
delete idle consumerのシーケンス図

consumer groupの生成まではこれまで同様です。

XINFO CONSUMERSコマンドは指定したconsumer groupの全consumer一覧を取得します。

これにより削除すべきconsumerがあるかどうかの判断が可能です。

削除対象のconsumerが存在する場合、XGROUP DELCONSUMERコマンドにて対象のconsumerを削除します。

具体的には下記のようなコードで実現します。

// consumer.go
package consumer
 
import (
     "context"
     "time"
 
     "github.com/go-redis/redis/v8"
     "github.com/google/uuid"
)
 
const(
     streamName = "sample_streams"
     consumerGroupName = "sample_consumer_group"
)
 
func DeleteIdleConsumer(ctx context.Context, redisClient *redis.Client) {
     // handshake
     if err := redisClient.Ping(ctx).Err(); err != nil {
            panic(err) // error handling
     }
     err := redisClient.XGroupCreateMkStream(ctx, streamName, consumerGroupName, "0").Err()
     if err != nil {
            panic(err) // error handling
     }
     // create ticker.
     ticker := time.NewTicker(time.Minute * 1)
     defer ticker.Stop()
     // event loop.
     for range ticker.C {
            // delete idle consumer.
            infos, err := redisClient.XInfoConsumers(ctx, streamName, consumerGroupName).Result()
            if err != nil || len(infos) == 0 {
                    panic(err) // error handling
            }
            for _, info := range infos {
                    if 0 < info.Pending { // consumer has pending entries.
                            continue
                    }
                    // delete idle consumer that passed period of time.
                    if time.Minute*1 <= time.Duration(info.Idle)*time.Millisecond {
                            if err = redisClient.XGroupDelConsumer(ctx, streamName, consumerGroupName, info.Name).Err(); err != nil {
                                 panic(err) // error handling
                            }
                    }
            }
     }
} 

consumerの削除も一定間隔で問題ないはずなので、先ほどと同様にtime.Tickerを用いたイベントループで処理を行います。

XInfoConsumers()にStream名とconsumer group名を指定し、対象のconsumerの一覧を取得します。

取得後、consumerのステータスを参照し、info.Idleフィールドの値が一定時間経過したconsumerをXGroupDelConsumer()にて削除します。

consumerの指定はinfo.Nameフィールドに格納されているconsumerのidを指定します。

構築・運用をしてみてのprosとcons

実際にRedis Streamsの構築・運用をしてみて良かった点と悪かった点は下記の通りです。

pros

  • 比較的低コストで導入が可能
    • キャッシュ機構としてRedisを採用したことがあればアーキテクチャは既存の踏襲となるため、導入までのコストは低い
    • 上記に付随するが、Redis自体を扱ったことがあれば覚えるべきコマンドも前項で紹介したコマンドが必要十分なので学習コストも低い
  • 運用コストが低い
    • すでにRedisを導入しているアプリケーションであれば、新規で監視などの導入をしなくて良い
  • 処理中にエラーが発生しても揮発せずに再度処理が可能
    • 処理が未完了のentryに関してはPELに保留中のentryとして残り続けるため、エラーハンドリングおよび再実行が容易である
  • アプリケーションおよびredisのスケールアウト・スケールアップで20,000rps程度までは耐える
    • アプリケーションサーバのスケールアウトとElastiCache自体のスケールアップである程度の負荷までは柔軟にスループットを伸ばすことが可能

cons

  • Redis Clusterによる負荷分散ができない
  • 負荷分散のためのシャーディングはコストに見合わない
    • そのままではRedis Clusterによる負荷分散が行えないため、負荷分散をするためにはアプリケーション側でStreamのシャーディングを行う必要がある。
      しかしながら一度シャーディングを行うと動的なスケールイン・アウトが非常にしづらくなる。
      またアプリケーション自体も非常に複雑なものとなるため、シャーディングが必要になるくらいのスループットが求められる場合は他のミドルウェアを検討した方が良い
  • Redis自体の性能限界
    • 負荷上限を見るためシャーディング機構をアプリケーションに実装した状態で負荷試験を行った結果、 スループットを伸ばすためにアプリケーションサーバを横に並べた際にRedis自体の接続数上限に達してしまい約70,000rpsが最大であった
  • Redisがフェイルオーバした場合にはデータが揮発する
    • Redis自体がインメモリキャッシュであるため揮発することを前提に利用をする必要はある。
      対策としては定期的なスナップショットの取得やAWSの場合はAmazon MemoryDB for RedisというRedis互換の永続データストアで回避が可能

Redisを扱ったことがあれば比較的容易にイベントドリブンアーキテクチャの導入と運用が可能だと感じました。

また、再実行によるアプリケーションとしての耐障害性が強いのは運用していく上で非常に心強いです。

しかしながら場合によっては致命的な欠点もあり、特に負荷分散に関しては大きな課題があると感じています。

当初の要件にあった100,000rpsというスループットはリプレイス予定APIのrpsを過去からの一定期間計測した結果、シャーディングを行っていない状態のRedis Streamsの最大スループット以内で余裕を持って収まることが判明したため、要件自体を下方修正しRedis Streamsでも問題ない状態としています。

今後について

今回初めてイベントドリブンアーキテクチャを構築・運用してみましたが、Redisという多くのサービスで採用されているミドルウェアに備わっている機能でも、要件次第では十分にプロダクトに採用可能であることがわかりました。

また、現在チームではユーザ単位の大規模キャッシュ生成のユースケースでもRedis Streamsを利用したイベントドリブンアーキテクチャを採用し運用を行っています。

しかしながらRedis Streamsでは負荷分散や揮発性の問題で用途が限定されるため、今後は負荷分散・データの不揮発性を考慮し、Apache Kafka(Amazon Managed Streaming for Apache Kafka)の採用を目下検討中です。

Kafkaを利用したマイクロサービス間におけるデータ伝播を取り入れ、リプレイスに活用したいと思います。

まとめ

最後にRedis Streamsの導入検討要件を以下にまとめます。

  • データが強整合性を必要としない
  • すでにRedisを利用している、もしくはRedisの採用経験があり低コストでイベントドリブンアーキテクチャを採用したい
  • ~20,000rpsのスループットを低レイテンシで処理する必要がある
  • データの処理順を担保する必要がある
  • Redisで障害が発生した場合にデータが揮発しても問題ない

上記から外れる場合は別途ミドルウェアの検討が必要となり下記が候補になると思います。

 

EXNOAではGAMESプラットフォームを一緒に発展させていくメンバーを募集しております。ご興味のある方はぜひ下記御覧の上ご応募ください!

リクルート - 合同会社EXNOA

シェア

関連する記事

関連する求人

関連するサービス