【新機能】Amazon SQSにFIFOが追加されました!(重複削除/単一実行/順序取得に対応)





https://dev.classmethod.jp/cloud/sqs-new-fifo/

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

SQSの大型アップデートです!

オンプレでエンタープライズな開発を行ったことがある方であれば、分散キューシステムの設計が大変だったと思います。実際のところは高額ライセンス商品を買うしか選択肢はなかったのではと。Amazon SQSの登場によって、今まで実装が大変だったノンコア機能のキューが、超安価に簡単に使えるようになったのは衝撃でした。これだけでクラウドを使う理由になりました。

そして、年月は流れ、この度SQSが進化しました!まずは、今までのSQSの課題についておさらいしたいと思います。

標準キュー

今までのSQSは、メディアエンコーディングや大量タスクの分散処理などに適していましたが、いくつかの用途においてフィットしなかったり、独自実装をする必要がありました。

順番が保証されない

SQSは高可用性を持った分散キューシステムですので、1つのエンドポイントに投げられたメッセージは複製され蓄積されます。そのため、メッセージを取り出した際に、一番古いキューデータを取るとは限りませんでした。メディアエンコーディングであれば実行順番は関係ありませんが、商品をカートに入れて購入して配送といった順番を持つことが必須のワークフローではキューの活用は難しかったです。

SS 2016-11-19 15.19.49

※できるだけFIFOになるように努力をしていますが保証していません。

メッセージが重複する可能性がある

SQSは高可用性を持った分散キューシステムですので、メッセージを取得するプログラムが複数同時に実行されると、タイミングの問題で、同じメッセージを取得してしまう可能性があります。もちろん、そうならないようにメッセージを取得すると取得マークが付いて、処理が終わったら削除、失敗したら戻すといった機能は以前から入っていますが、重複しないことを保証して居ませんでした。ですから、メディアエンコーディングであれば、2回実行しても結果は変わらないですが、商品購入プロセスを考えた場合、1回の購入処理を2回実行してしまう可能性があっては困るわけです。

SS 2016-11-19 15.21.50

呼び出し側が2回呼んだら2回実行

これはSQSの問題では無いのですが、例えばモバイルアプリなど通信が安定しない環境で何かしらSQSにメッセージを入れようとした場合、たまたま通信が切ればアプリ側で再実行してしまうことがあります。そうすると、2回実行されてしまうことがあります。これはソシャゲの設計などで良く考慮されていることで、Re-runnable設計といって、2回トランザクションが呼ばれても状態として異常にならないようにする設計方法です。

SS 2016-11-19 15.22.39

標準キューは、"必ず最低1回は実行されることを保証し、高速にタスクを実行したい時に使うサービス"と憶えておいてください。

FIFOキュー

今回の大型アップデートによってFIFOがサポートされました。つまり、メッセージを入れた順番で処理することが保証されます。

順番が保証されます

もう狂喜乱舞です。1,2,3の順番でメッセージを入れたら、必ず1,2,3の順番が保証されます。

SS 2016-11-19 15.27.45

複数回実行されません

もう狂喜乱舞です。例えば、商品の購入という手続きを2回実行しないことを保証します。

SS 2016-11-19 15.33.24

間違って同じ呼び出しをしたら削除します

もう狂喜乱舞です。呼び出す側が意図せずに2回呼んでしまうことって結構あるんですよね。これを排除するためにRe-runnable設計が必要でしたが、全てのシステムにおいてこれを考慮することは大変です。メッセージの重複をチェックする期間はデフォルトで5分ですので、あくまでも意図せず連続して2回送ってしまった場合の削除と考えておいてください。

SS 2016-11-19 15.36.23

FIFOキュー使ってみる

それでは、FIFOキューを使ってみましょう。とりあえずオレゴンあたりを選んでSQSを作成します。

SS 2016-11-19 16.04.31

FIFOキューの名前は接尾語として.fifoを書く必要があります(.lifoの登場の予感w)。

ここで注目するのは、"メッセージグループID"と、"メッセージ重複削除ID"です。メッセージグループID毎にFIFOでメッセージを受け取ります。メッセージ重複削除IDは、指定したIDが短時間(デフォルト5分)内に再送された場合には、後から来たメッセージを重複として削除する機能です。このIDを自分で管理して付与することもできます。自分で管理するのであればUUID等を付与すれば良いですし、コンテンツがユニークであることが約束されているのであればSHA1でも良いかもしれません。なお、キュー作成において、コンテンツベースの重複削除(ContentBasedDeduplication)をTrueにすることで、メッセージ重複削除IDの自前作成は不要になります。勝手にSHA256でハッシュ化して付与してくれます。

SS 2016-11-19 16.03.16

メッセージを取り出す時の工夫もあります。ワーカーが受信要求試行ID(Receive Request Attempt ID)を指定して受信することで、同じメッセージを受信することができます。ネットワークの問題などが発生した際に、もう1回取れるようにするためにIDを指定します。これは実際のところはUUIDで良いかと思います。受信に失敗したら同じUUIDで取得します。

コマンドラインでFIFOキュー使ってみる

概念が分かりましたので実際にCLIを使ってやってみましょう。FIFOキューを作成する際に属性を指定しますので、属性情報を書いたファイルを用意します。

キューの作成

1
2
3
4
5
$ cat create-queue.json
{
  "FifoQueue": "true",
  "ContentBasedDeduplication": "true"
}

次に、FIFOキューを作成して、URLを変数に入れておきます。

1
2
3
4
5
$ REGION=us-west-2
$ QURL=$(aws sqs create-queue --region $REGION --queue-name myq.fifo
  --attributes file://create-queue.json | jq -r .QueueUrl)
$ echo $QURL

新規作成したキューの設定情報です

SS 2016-11-20 20.15.13

可視性タイムアウトが30秒であることを押さえておいてください。

メッセージの追加

FIFOキューを作成できました。次に、1件メッセージを送ります。

1
2
3
4
5
6
7
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body helloworld --message-group-id group1
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017cZZZ",
    "SequenceNumber": "18825507834392815111",
    "MessageId": "8f190a7c-d85f-4246-bb19-42cc2fdc8zzz"
}

ここで全く同じBODY内容で時間を置かずに送ってみます。

1
2
3
4
5
6
7
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body helloworld --message-group-id group1
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017czzz",
    "SequenceNumber": "18825508188058863111",
    "MessageId": "b78be0b9-d026-44de-bd28-31d918b96bbb"
}

そしてキュー内メッセージの数をカウントしてみます。

1
2
3
4
5
6
7
$ aws sqs get-queue-attributes --queue-url $QURL --region $REGION
  --attribute-names ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "1"
    }
}

メッセージを2件入れたのですが、message-deduplication-idが同じだったので後から入ったものが削除されています。なお、同じmessage-deduplication-idだったとしても、5分経過していれば別物として扱いますので注意してください。

続いて、同じmessage-group-idで、異なるメッセージBODYを入れます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body helloworld2 --message-group-id group1
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017c592",
    "SequenceNumber": "18825508280833007616",
    "MessageId": "0df51fae-17c7-44b9-b60f-f667416241d7"
}
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body helloworld3 --message-group-id group1
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017c592",
    "SequenceNumber": "18825508282470639616",
    "MessageId": "2ec94b13-3aeb-48b3-beca-5aa87b3b1247"
}
$ aws sqs get-queue-attributes --queue-url $QURL --region $REGION
  --attribute-names ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "3"
    }
}

次に、異なるmessage-group-idで、異なるBODYを入れます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body morining --message-group-id group2
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017c592",
    "SequenceNumber": "18825508313601775616",
    "MessageId": "020a5d1b-66a7-4735-b14a-10d6e9bdc667"
}
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body morining2 --message-group-id group2
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017c592",
    "SequenceNumber": "18825508315499503616",
    "MessageId": "da1cecb5-12ab-4603-880c-df10162c5b66"
}
$ aws sqs send-message --queue-url  $QURL --region $REGION
  --message-body morining3 --message-group-id group2
{
    "MD5OfMessageBody": "5d41402abc4b2a76b9719d911017c592",
    "SequenceNumber": "18825508317191407616",
    "MessageId": "b2e7a84c-f666-49c1-b54a-baf285d58f1c"
}
$ aws sqs get-queue-attributes --queue-url $QURL --region $REGION
  --attribute-names ApproximateNumberOfMessages
{
    "Attributes": {
        "ApproximateNumberOfMessages": "6"
    }
}

これで、2つのグループに3件ずつのメッセージが入りました。

メッセージの取得

次にワーカーとしてメッセージを取得してみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
$ UUID=$(uuidgen)
$ echo $UUID
1CFC2457-0767-4D4C-9819-C4C22E24071E
 
$ aws sqs receive-message --queue-url $QURL --region $REGION
  --attribute-names All --receive-request-attempt-id $UUID
{
    "Messages": [
        {
            "Body": "morining",
            "Attributes": {
                "ApproximateFirstReceiveTimestamp": "1119638944600",
                "SequenceNumber": "15511131597872111616",
                "SenderId": "ZZZAJXUQ7RG7FZPNOUMRM",
                "MessageDeduplicationId": "56367252a69f6afa4877157f27c3e464e0d13deee67578b154042986cf1f6c15",
                "SentTimestamp": "1479638766260",
                "ApproximateReceiveCount": "2",
                "MessageGroupId": "group2"
            },
            "ReceiptHandle": "AQEBNw8oUaQB0ANhOjNAqWyHS2ZMZUu0rkYrYJMeSl111kCo0v7JPILPLJXA04mWxAXpb9MGnh31pFxzsbOCadtN/sIZYV8thAzZKa5OkF59I5BNMZ+Kqg8HUGorSTGOG9QIan9OcpEemzrcbheGUFk23kkUyzYKLrACCjqcwZaw4yeoPlxyJOglnB3Vya2fJT61uIAdPH2OfV9o85z5txsOCBjFi91nPE1Lu9gzZTYxzPmkzEOp2MGext7plF23p1hKZiFRSaaLi/IvqBRtzMeXGg==",
            "MD5OfBody": "380ef125db2b45da5c643d59165b8706",
            "MessageId": "8a22f17d-6a4e-4887-85a5-84976393cef0"
        }
    ]
}

受信要求試行ID(Receive Request Attempt ID)を指定していますので、間を置かずに連続して何度やっても同じメッセージが取得できます。ただし少し待ってから実行するとエラーになります。以下は、エラーになったときの表示です。おそらく可視性タイムアウトを過ぎたため、他ワーカーも取れる状況になってしまったためのエラーかと思います。あくまでもネットワークエラーなども例外時に用いましょう。

1
An error occurred (InvalidParameterValue) when calling the ReceiveMessage operation: Value YYYYYYYY for parameter receiveRequestDeduplicationId is invalid. Reason: receiveRequestDeduplicationId is invalid.

受信要求試行ID(Receive Request Attempt ID)を別にしてメッセージを取得してみます。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
$ UUID=$(uuidgen)
 
$ aws sqs receive-message --queue-url $QURL --region $REGION
  --attribute-names All --receive-request-attempt-id $UUID
{
    "Messages": [
        {
            "Body": "helloworld",
            "Attributes": {
                "ApproximateFirstReceiveTimestamp": "1479638590679",
                "SequenceNumber": "18825531333333379616",
                "SenderId": "AIDAJXUQZZZ7FZPNOUMRM",
                "MessageDeduplicationId": "936a185caaa266bb9cbe981e9e05cb78cd732b0b3280eb944412bb6f8f8f07af",
                "SentTimestamp": "1479638589763",
                "ApproximateReceiveCount": "5",
                "MessageGroupId": "group1"
            },
            "ReceiptHandle": "AQEB+8sEIDFkOpnHJLbbi/nZgUY4jfwP+y/m4hrf8Qrg0F90Cc3NXROYi1ghcXFVhDjvPNR1TsHmRAeqU+rq0knEyPnX8OOwUtiMty9FEXNiYIAfDq+GCl9509wQUIVwk8rvH02smCRo9RgWXGtqs17XtUvRDnYEyuX8sCMGGbqYTZfgsbtxrSCL2elg5SOV3JNVn3h/iW8UOINgv0xNXMUmYTNYyuoDoJlKRt/9En1I1y9gy+Fdo6+WLOl6rpAFVbBvtFnoOYUj3ecIuXOrktyLdg==",
            "MD5OfBody": "fc5e038d38a57032085441e7fe7010b0",
            "MessageId": "8f932728-5cf1-43a8-be6d-ac3ba8830f7c"
        }
    ]
}

今度は他のメッセージグループIDが取れました。

なお、連続してメッセージを取得しようとすると、グループ数を超えた回数目から何も取得できなくなります。これは可視性タイムアウトが過ぎるまで続きます。

1
2
3
$ aws sqs receive-message --queue-url $QURL --region $REGION --attribute-names All
$ aws sqs receive-message --queue-url $QURL --region $REGION --attribute-names All
$ aws sqs receive-message --queue-url $QURL --region $REGION --attribute-names All

メッセージの削除

取得したメッセージのReceiptHandleを指定してメッセージを削除します。

1
2
3
4
5
6
$ UUID=$(uuidgen)
$ RH=$(aws sqs receive-message --queue-url $QURL --region $REGION
  --attribute-names All --receive-request-attempt-id $UUID
  | jq -r .Messages[].ReceiptHandle)
 
$ aws sqs delete-message --queue-url $QURL --region $REGION --receipt-handle $RH

キューの先頭から1つメッセージを削除しましたので、改めてメッセージを取得すると、次のメッセージが取得されました。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
$ UUID=$(uuidgen)
$ aws sqs receive-message --queue-url $QURL --region $REGION
   --attribute-names All --receive-request-attempt-id $UUID
{
    "Messages": [
        {
            "Body": "morining2",
            "Attributes": {
                "ApproximateFirstReceiveTimestamp": "1479639406758",
                "SequenceNumber": "18825531598680815616",
                "SenderId": "AIDAJXUQ7RG7FZPNOUMRM",
                "MessageDeduplicationId": "c37bc60771b7c18187d9c455edd64b7b35115f351400e432d4eb02318c6918e1",
                "SentTimestamp": "1479638769419",
                "ApproximateReceiveCount": "1",
                "MessageGroupId": "group2"
            },
            "ReceiptHandle": "AQEBsgBd4i7HFqT1Wu59/46FiMnDUYjBuM8TTtLyqBOGDuT3s4fQ04H2W7UGrS5VSVl9fBM/cqs8vpLGLVygPXpdqMiQ38O5EaHdcaEDUKmGCZOj1qIlJojj09RJxwSqKgN6c5HPmzTJhAZdJSBlgaGQSlZFG6xf/0F92m2udWI+6WgONx1vXUxVThF96Fv04vt4fpODtl6Zx+rv/yeb0zOFed2VUQlhC3nJZV5Ezibu7QqxAKEH1l8f0SrcyIihdc8Z8sgq5wiCq9W3Xj+gtzflLg==",
            "MD5OfBody": "d6b5eaab8446ed73a2aef0232643d46e",
            "MessageId": "7be742e7-adb9-4c7b-b265-e2eb4c413dd3"
        }
    ]
}

高速にFIFOするために

ここまで実際に動かしてみて気づいたと思うのですが、なんとなく動きが遅い気がします。おそらく、様々な整合性を取るために裏でチェックが走っているからだと思います。また、グループの先頭しかメッセージを取れませんので、100万件のメッセージが入っていても、グループが1つであれば、ワーカは2つ以上あっても意味がありません。例えば、メッセージの重複だけを排除したいだけで、順序は必要なく、高いスループットでワーカーによる処理を行いたい場合は、メッセージ毎にユニークなUUID等でグループIDを付ければOKです。

こんなときどうなる?

削除したメッセージと同じメッセージ重複削除IDのときの動作

同じメッセージを連続して送ったら重複したメッセージが削除されます。そして、1つ目のメッセージのみ残ります。この状態で、1つ目のメッセージを削除した後に、もう1回同じメッセージを送ったらどうなるでしょうか。答えは、削除された1つ目のみが残っている状態となり、後から入ってくるメッセージは新規に登録されません。短い時間内にキュー内のメッセージが処理されて削除されてしまったとしても、5分以内であれば後から入ってきた重複したメッセージを削除済み扱いとなります。

異なるメッセージグループIDと同じメッセージ重複削除IDの動作

メッセージ重複削除ID(例えば、ContentBasedDeduplicationがTrueにおけるコンテンツハッシュ値)が全く同じ場合において、メッセージグループIDが異なる場合、別物としてキューに蓄積されるか気になります。答えは、メッセージ重複削除IDが同じであれば同じものとして扱われ、連続してキューに送った場合には、1つ目のメッセージのみキューに残ります。もちろん、5分経過すれば、メッセージグループIDが同じか異なるか関係なく、別メッセージとしてキューに残ります。

まとめ

要点をまとめます。

  • 画像変換のような、最低でも必ず1回実行されればOKで、複数回実行されても問題ない処理の場合は今まで通り標準キューを使いましょう。最も安くて早いです。
  • メッセージの重複を避けたいだけの場合は、FIFOキューを使い、メッセージグループID(Message Group ID)をユニークにしましょう。
  • メッセージの順序を守り、重複を避けたい場合は、FIFOキューを使い、適切なメッセージグループIDを使いましょう。
  • メッセージ重複削除ID(Message Deduplication ID)を指定することで、メッセージの重複条件を指定することができます。キュー作成時にコンテンツハッシュ値(SHA256)を指定することもできます。
  • メーッセージ受け取り時は、受信要求試行ID(Receive Request Attempt ID)を用いて万が一時に備えた同じメッセージを取得することもできます。

Amazon SQSのFIFOキューは、今まで独自実装で実現していた業務的な流れの制御を、SQSの仕様でカバーできるようになり、システムをとてもシンプルに開発することができるようになりました。"メッセージグループID"と、"メッセージ重複削除ID"の活用によって、どのような単位で処理の順番を保証するか制御できるようになりました。世の中の多くの業務システムはSQS必須になると予想しています。ぜひご活用ください。















+ Recent posts