상황이해

aws에서 시큐리티그룹을 다루다보면, 시큐리티 그룹 자체를 inbound source에 넣는 일이 자주있다. 이것이 무엇을 의미하는지 매번 헷갈렸으므로 정리

inbound source에 security group 자체를 넣는게 무슨 의미냐고?

해당 security 그룹을 사용하는 모든 리소스에 관해서, inbound를 허용한다는 뜻이다.


가령 다음과 같은 인바운드 설정이 있다고 한다.

Type Protocol Port range Source etc
HTTPS TCP 443 sg-0abdeb09f385a2d  

 

이때 위 인바운드 규칙을 사용하는 리소스는 sg-0abdeb09f385a2d을 사용하는 모든 리소스로부터의 인바운드를 허용하게 된다. 이것이 갖는 장점은 다음과 같다. 

 

ec2 가  sg-0abdeb09f385a2d를 공통적으로 사용하는 리소스라고 하자. 이때, sg-0abdeb09f385a2d를 사용하는 ec2가 오토스케일링으로 늘어난다고 해도, 위의 인바운드 설정에 의해 새로 생긴 ec2로부터의 인바운드도 모두 허용할 수 있게 된다.

 

 

 

 

 

 




Amazon API GatewayからAWS Lambdaにパラメータを渡す方法

この記事は最終更新日から1年以上が経過しています。

概要

Amazon API GatewayからAWS Lambdaにパラメータを渡す方法のまとめ。

URLに含まれるPathパラメータを渡す

/users/{userId} - GETのような形でURLパラメータに含まれている場合は画面右上の「統合リクエスト」より以下のようなマッピングテンプレートを追加する。
Content-Typeにはapplication/jsonを指定する事。

マッピングテンプレート
{
  "userId": "$input.params('userId')"
}

対応するLambdaのコードは下記の通り。
今回のサンプルコードではDynamoDBからURLパラメータで指定したuserIdに紐づく値を取得して返却する。

DynamoDBには下記のようなデータが入っている。

API Gatewayのテスト呼出で以下のようにPathパラメータを指定する。

ExpressionAttributeValuesの中のevent['userId'] で指定したuserIdが展開される。

aws-lambda-sample/find-dynamodb.js
'use strict';

const AWS = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();

exports.handler = (event, context, callback) => {

  const params = {
    TableName: 'users',
    FilterExpression : 'id = :val',
    ExpressionAttributeValues : {':val': event['userId']}
  };

  dynamo.scan(params, (error, data) => {
    if (error) {
      callback(
        Error('Fail. err:' + error)
      );
    } else {
      callback(null, data);
    }
  });
};
response
{
  "Items": [
    {
      "id": "d684f349-fb73-4b3d-8a8f-61feb9fb0714",
      "email": "hoge-huga9999@gmail.com"
    }
  ],
  "Count": 1,
  "ScannedCount": 11
}

POSTでJSONを送信する

次は登録用のエンドポイントに対して、JSON形式でリクエストする例を見てみる。
今回利用するのは /users - POSTでリクエストパラメータとして"email"を受け取り、受け取った値をDynamoDBに登録する。

「統合リクエスト」のマッピングテンプレートに以下を追加する。

マッピングテンプレート
{
  "email": $input.json("$.email")
}

対応するLambdaのコードは下記の通り。
IDには適当にUUIDバージョン4形式の値を入れておく。

aws-lambda-sample/save-dynamodb.js
'use strict';

const AWS = require('aws-sdk');
const dynamo = new AWS.DynamoDB.DocumentClient();
const uuid = require('node-uuid');

exports.handler = (event, context, callback) => {

  const params = {
    TableName: 'users',
    Item: {
      id: uuid.v4(),
      email: event['email']
    }
  };

  dynamo.put(params, (error) => {
    if (error) {
      callback(
        new Error('Fail. err:' + error)
      );
    } else {
      callback(null, params);
    }
  });
};

リクエストヘッダーにContent-Type: application/json;を設定し任意のメールアドレスを指定する。

以下のレスポンスが返却され、DynamoDBにもデータが登録されている。

レスポンス
{
  "TableName": "users",
  "Item": {
    "id": "f26b7ce0-efaf-4e64-b891-7c23cae3a321",
    "email": "hoge-hoge-1234@gmail.com"
  }
}

今回利用したサンプルコード

githubに上げてあります。

その他のパラメータの送信方法

見つかり次第こちらに記事に追記していきます。


 

 

https://www.slideshare.net/keisuke69/tune-up-aws-lambda/9

 

 

 

 

 

 

S3バケットの内容をLambda関数で読み込んでみる(Python)

Lambda関数からS3を操作する練習です。
S3にファイルをアップロードしたタイミングでLambda関数が実行されるように設定します。
アップロードされたタイミングで、バケット名やファイルの一覧を取得する関数を書いてみます。

S3バケットを作成する

「バケットを作成する」をクリックします。
S3 Management Console 2018-01-06 09-29-55.png

バケット名とリージョンを入力します。
S3 Management Console 2018-01-06 09-31-18.png

プロパティの設定は特に何もせずに「次へ」をクリックします。
次の「アクセス許可の設定」はユーザーに対する設定のみです。とりあえずそのまま「次へ」をクリックします。

S3 Management Console 2018-01-06 09-35-11.png

Lambdaにファイルをアップロードする

適当なファイルをアップロードしてみます。
S3 Management Console 2018-01-06 09-38-15.png

ドラッグアンドドロップでファイルのアップロードができます。
S3 Management Console 2018-01-06 09-39-58.png

「アップロード」をクリックします。
S3 Management Console 2018-01-06 09-41-26.png

以下のようにアップロードしたファイルが表示されます。
S3 Management Console 2018-01-06 09-42-26.png

IAMに権限を設定する

Lambdaを操作するIAMにS3へのアクセス権限を与えます。
IAM Management Console 2018-01-06 09-57-31.png

AmazonS3FullAccessポリシーはフルアクセス権を与えてしまうので、どのS3バケットにもアクセスできてしまうことには注意が必要です。今回はサンプルなのでFullAccessを与えますが、実運用に入る際は、インラインポリシーを設定してポリシーを絞るのがいいと思います。
IAM Management Console 2018-01-06 09-58-50.png

Lambda関数を作成する

設計図s3-get-object-pythonから作成します。
Lambda Management Console 2018-01-06 10-04-43.png

Lambda Management Console 2018-01-06 10-07-20.png Lambda Management Console 2018-01-06 10-07-37.png

boto3で何ができるかは本家のドキュメントをじっくり読むのがいいと思います。
ネットの情報は古くなってしまったものも多く、単純に真似するだけでは動かないものもありました。
「Boto 3 Docs documentation」
http://boto3.readthedocs.io/en/latest/reference/services/s3.html#client

こちらのサンプルはちゃんと動いてわかりやすかったです。
https://github.com/bloomberg/chef-bcs/blob/master/cookbooks/chef-bcs/files/default/s3-example-boto3.py

以下のような関数を作ってみました。

  • S3バケット名を取得する
  • S3バケット内のファイルの一覧を取得する
  • S3バケット内のファイルの詳細情報を表示する

などを行っています。

from __future__ import print_function

import json
import urllib
import boto3

print('Loading function')

#http://boto3.readthedocs.io/en/latest/reference/services/s3.html#client
s3 = boto3.client('s3')


def lambda_handler(event, context):


    bucket_name = event['Records'][0]['s3']['bucket']['name']
    print('==== bucket information =====')
    print(bucket_name)
    print('=============================')


    #https://github.com/bloomberg/chef-bcs/blob/master/cookbooks/chef-bcs/files/default/s3-example-boto3.py
    print('==== your bucket list ====')
    buckets = s3.list_buckets()
    for bucket in buckets['Buckets']:
        print(bucket.get('Name'))

    print('==== file list in bucket ====')
    # https://github.com/boto/boto3/issues/134
    AWS_S3_BUCKET_NAME = 'example.read.000'
    s3_resource = boto3.resource('s3')
    bucket = s3_resource.Bucket(AWS_S3_BUCKET_NAME)
    result = bucket.meta.client.list_objects(Bucket=bucket.name, Delimiter='/')
    for o in result.get('Contents'):
        print(o.get('Key'))


    print('==== file details ====')
    # https://docs.aws.amazon.com/ja_jp/AmazonS3/latest/dev/UsingObjects.html
    GET_OBJECT_KEY_NAME = 'sample.png'
    object = s3.get_object(Bucket=AWS_S3_BUCKET_NAME, Key=GET_OBJECT_KEY_NAME)
    print('ContentType ->' + str(object.get('ContentType')))
    print('ContentLength ->' + str(object.get('ContentLength')))


    print('==== uploaded file ====')
    for rec in event['Records']:
        print(rec['s3']['object']['key'])
    print('=============================')

    print("Received event: " + json.dumps(event, indent=2))

テストを実行すると以下のようなログが表示されます。

START RequestId: bdbee72e-f28b-11e7-a401-0d0509334621 Version: $LATEST
==== bucket information =====
sourcebucket
=============================
==== your bucket list ====
example.read.000
learnjs.benrady.com
==== file list in bucket ====
sample.png
sample2.png
==== file details ====
ContentType ->image/png
ContentLength ->82361
==== uploaded file ====
HappyFace.jpg
=============================

S3にオブジェクトをプットしても同じように関数が実行されます。


title: [AWS] Lambda + API Gatewayでサーバレスを始める 1
tags: AWS lambda APIGateway サーバレスアーキテクチャ
author: naoki_koreeda

slide: false

Lambda + API Gatewayでサーバレスを始める 1(https://qiita.com/naoki_koreeda/items/c2a32198c86e8d9a5bb6)

とりあえず、AWS Lambda と API Gateway でサクッとサーバレスをやってみます。
マネジメントコンソールから全部の実装・設定をやってしまいます。
AWS SAM とかは使ってないので、そのあたり(バージョン管理やCLIからのデプロイなど)は別の記事などで補完をお願いします。

ロールの作成

Lambda ファンクション作成時にロールを設定する必要があるので、あらかじめ IAM でロールを作成しておきます。

IAM > ロール > 新しいロールの作成

ロールタイプの選択

AWS Lambda を選択。

スクリーンショット 2017-07-03 9.31.23.png

ポリシーのアタッチ

ここで実行時に必要な権限をアタッチします(DynamoDBとかS3とかのアクセス権限)。
今回は特に何も必要ないので、AWSLambdaBasicExecutionRole をアタッチし、CloudWatch Logs へのアクセス権限のみ付与します。
チェックを入れて「次のステップへ」クリック。

スクリーンショット 2017-07-03 9.45.00.png

確認

ロール名を入力し(必要ならDescriptionを入力し)、ロールの作成をクリック。

スクリーンショット 2017-07-03 9.49.13.png

Lambda ファンクションの作成

先に API Gateway の設定をすることもできますが、ここでは先に Lambda を書きます。

Lambda > Lambda関数の作成 > Blank Function > (トリガーは設定せずに)次へ

関数の設定

関数名を入力し、ランタイムを選択します。
今回は Node.js 6.10 にします。

スクリーンショット 2017-07-03 9.51.41.png

先ほど作成したロールを選択して次へ。

スクリーンショット 2017-07-03 10.07.41.png

確認

内容確認して、「関数の作成」クリック。

スクリーンショット 2017-07-03 10.09.54.png

コードの変更

ひとまずJSONを返却するようにします。

exports.handler = (event, context, callback) => {
    callback(null, {"name": "hoge"});
};

API Gateway の設定

API Gateway > APIの作成

API名を入力して、「APIの作成」クリック。

スクリーンショット 2017-07-03 14.04.15.png

リソース作成

リソースのアクションから、メソッドの作成を選択。

スクリーンショット 2017-07-03 14.07.43.png

GETを選択肢、チェックマークをクリックして確定。
セットアップ画面になるので、Lambdaを作成したリージョン※1とLambdaファンクション名※2を入力して保存。
※1 : コンソールの上部で表示されているリージョンで作成されています。東京なら「ap-northeast-1」を選択。
※2 : 最初の文字を入力すれば、候補が表示されます。

スクリーンショット 2017-07-03 14.10.22.png

リソースが作成されたので、ひとまずテストを実行してみます。

スクリーンショット 2017-07-03 14.16.49.png

「テスト」ボタンをクリックすると、成功していることがわかります。
ステータスが200で、JSONが返ってきていることを確認してください。

スクリーンショット 2017-07-03 14.18.15.png

APIのデプロイ

リソースのアクションから「APIにデプロイ」を選択。

スクリーンショット 2017-07-03 14.44.41.png

「新しいステージ」を選択し、ステージ名を入力。
ここでは「prod」と入力。

スクリーンショット 2017-07-03 14.45.51.png

デプロイすると、APIエンドポイントが作成されます。

スクリーンショット 2017-07-03 14.48.14.png

CORSの設定

CORSの設定は、 API Gateway リソースのアクションから設定できます。

スクリーンショット 2017-07-03 14.54.42.png

設定を変更したあとは、再度APIをデプロイする必要があります。

パスパラメータ

「/prod/:id」ってしたい時の設定方法。

リソース > アクション > リソースの作成

スクリーンショット 2017-07-03 15.15.36.png

新たに子リソースを追加します。
その際、リソースパスの文字列を{}で括ります。

スクリーンショット 2017-07-03 15.17.37.png

リソース作成後、{id}にGETメソッドを作成します。
Lambdaの設定をします。

スクリーンショット 2017-07-03 15.23.44.png

APIのテスト実行画面で、パスが入力可能になっていることを確認します。

スクリーンショット 2017-07-03 15.25.03.png

Lambdaからパラメータを取得できるように設定します。
統合リクエストをクリック。

スクリーンショット 2017-07-03 15.31.25.png

本文マッピングテンプレート > マッピングテンプレートの追加
「application/json」と入力してチェックマーククリックで確定。

スクリーンショット 2017-07-03 15.32.54.png

以下のように、マッピングを記述します。

スクリーンショット 2017-07-03 15.36.18.png

{
  "id": "$input.params('id')"
}

Lambdaでパラメータを取得できるようになったので、以下のように変更します。
パラメータはeventから取得できます。

exports.handler = (event, context, callback) => {
    callback(null, {"id": event.id, "name": "hoge"});
};

テストを実行し、入力値が返ってきていることを確認してください。

スクリーンショット 2017-07-03 15.42.11.png

クエリパラメータ

「/prod?name=:name」ってしたい時の設定方法。

メソッドリクエストから設定します。

スクリーンショット 2017-07-03 15.45.10.png

URL クエリ文字列パラメータに「name」を入力し、チェックマークで確定します。

スクリーンショット 2017-07-03 15.46.17.png

パスパラメータの時と同様、統合リクエストからマッピングテンプレートを設定します。

スクリーンショット 2017-07-03 15.48.54.png

Lambdaを変更します。

exports.handler = (event, context, callback) => {
    callback(null, {"id": event.id, "name": event.name});
};

テストで、クエリ文字列が入力可能になっています。
入力値が返ってきていることを確認してください。
※idはクエリ文字列で入力できないのでundefinedになり、nameだけがJSONで返却されます。

スクリーンショット 2017-07-03 15.57.57.png

文字化けするときは

日本語が文字化けする場合、メソッドレスポンスで文字コードを設定します。

スクリーンショット 2017-07-03 16.02.49.png

200のレスポンス本文の「application/json」を編集(ペンマークをクリック)します。

スクリーンショット 2017-07-03 16.05.20.png

「;charset=UTF-8」を追記し、保存します。

スクリーンショット 2017-07-03 16.07.20.png

その2に続く→

'Cloud > AWS' 카테고리의 다른 글

lambda lifecycle  (0) 2019.05.27
Lambda에서 파이썬으로 S3연결하기  (0) 2019.05.23
서버리스 아키텍쳐(Serverless)란?  (0) 2019.05.14
[AWS]kinesisまとめ  (0) 2019.03.13
事例からAmazon Kinesisとは何なのかを学ぶ  (0) 2019.03.13

서버리스 아키텍쳐(Serverless)란?(https://velopert.com/3543)

2018년 2월 11일 velopertdev.log / serverless / tech.log4 Comments

 

서버리스(Serverless)를 직역하자면, “서버가 없다” 라는 의미가 있습니다. 하지만, 사실상 서버가 없는건 아닙니다. 그저, 특정 작업을 수행하기 위해서 컴퓨터를 혹은 가상머신에 서버를 설정하고, 이를 통하여 처리 하는 것이 아님을 의미합니다.

그 대신에, BaaS (Backend as a Service) 혹은 FaaS (Function as a Service) 에 의존하여 작업을 처리하게 됩니다. BaaS 를 제공하는 서비스 중에선, Firebase, Parse (지금은 서비스종료 됨), Kinvey 등이 있고, FaaS 를 제공하는 서비스 중에선, AWS Lambda, Azure Functions, Google Cloud Functions 등이 있습니다.

앞으로 작성 될 튜토리얼은 주로 FaaS 에 대하여 다뤄보게 될 것이며, 이번 포스트에서는 FaaS 는 기존의 기술들과 어떠한 차별성이 있는지 다뤄보겠습니다.

기존의 기술들

자체적 시스템 설계

시스템에서 필요한 모든 인프라를 직접 관리하는 것을 의미합니다. 전산실 이라는 키워드를 생각하시면 이해하기 쉬울 것입니다. 공간, 하드웨어, 네트워크, 운영체제, 모두 직접 관리를 해주는 거죠. 이 방식에서의 가장 큰 문제점은 시스템이 엄청 커지면 전산실을 유지 할 관리자가 필요 할 것이고, 이 인력에 대한 비용이 나간다는 점입니다.

IaaS (Infrastructure as a Service)

그리고, AWS, Azure 등의 서비스가 만들어지기 시작했죠. 더 이상 서버자원, 네트워크, 전력 등의 인프라를 모두 직접 구축 할 필요 없어졌습니다. 이러한 인프라를 가상화하여 관리하기 쉽게 해주는 서비스 서비스를 통하여, 관리자패널에서 인프라를 구성하고, 사용하면 되죠. 사용자는 가상머신을 만들고, 네트워크를 설정하고, 하드웨어도 설정하고, 거기에 운영체제를 설치해서 애플리케이션을 구동 할 수 있습니다. 사용량을 쉽게 모니터링 할 수 있습니다.

PaaS (Platform as a Service)

IaaS 에서 한번 더 추상화된 모델이라고 생각하시면 됩니다. 네트워크, 그리고 런타임까지 제공이됩니다. 사용자는 이제 애플리케이션만 배포하면 바로 구동시킬 수 있습니다. 대표적으로 AWS Elastic Beanstalk, Azure App Services 등이 있습니다. 이를 사용하면 Auto Scaling 및 Load Balancing 도 손쉽게 적용 할 수 있습니다.

Serverless! – BaaS 와 FaaS

Serverless, 즉 서버가 없다는 의미입니다. 이는 BaaS 와 FaaS, 이렇게 두가지로 나뉘어질 수있는데요, 서버가 없다는건, 그냥 표현일 뿐, 사실 작업을 처리하는 서버는 존재하긴 합니다. 다만, “서버의 존재”에 대해서 신경쓰지 않아도 됩니다. 서버가 어떤 사양으로 돌아가고있는지, 서버의 갯수를 늘려야 할지, 네트워크는 어떤걸 사용할지, 이런걸 설정할 필요가 없습니다.

BaaS (Backend as a Service)

보통, 우리가 모바일 혹은 웹 애플리케이션을 만들게 될 때, 백엔드 서버개발을 진행하게 됩니다. 엄청 단순하게 생각하자면, 계산기, 혹은 그림판 수준으로 프론트엔드 쪽 코드로만 충분히 이뤄질 수 있다면, 백엔드 서버를 만들 필요가 없겠죠. 하지만, 데이터를 저장하고, 다른 기기에서도 접근하고, 공유하기 위해서는, 백엔드 개발은 필수적입니다.

서버 개발을 하다보면, 고려할 사항이 꽤 많죠. 유저가 늘어나게 된다면 서버의 확장도 고려해야 하고, 보안성 또한 고려해야 하죠. 그래서 탄생한 서비스가, Firebase 같은 BaaS 입니다. 이 시스템에서는, 앱 개발에 있어서 필요한 다양한 기능들 (데이터베이스, 소셜서비스 연동, 파일시스템 등)을 API로 제공해 줌으로서, 개발자들이 서버 개발을 하지 않고서도 필요한 기능을 쉽고 빠르게 구현 할 수 있게 해주고, 비용은 사용 한 만큼 나가게 되죠. 서버의 이용자가 순식간에 늘어나게 되어도, 따로 대비를 안해주어도 알아서 확장이 됩니다.

현존하는 BaaS 중 가장 대중화 되어있는것은 Firebase 이기 때문에, 이 포스트에선 Firebase 를 사용한다는 가정하에 장점과 단점들을 나열해보도록 하겠습니다.

BaaS를 사용함에 있어서, 가장 큰 장점은 개발 시간의 단축 (회사 입장으로서 생각한다면, 인건비), 서버 확장 작업의 불필요함입니다. 백엔드에 대해서 지식이 별로 없더라도, 아주 빠른 속도로 개발이 가능합니다. 특히, Firebase 에서는 실시간 데이터베이스를 사용하여 데이터가 새로 생성되거나, 수정되었을 때 소켓을 사용하여 클라이언트에게 바로 반영시켜주는 기능이 있는데요, 이러한 기능은 직접 개발하게 된다면 구조 설정에 꽤 많은 시간이 필요 할 수도 있는데 이를 단지 코드 몇줄만으로 구현 할 수 있게 해주는 멋진 기능들을 지니고 있습니다. 추가적으로, 일정 사용량 만큼 무료로 사용 할 수 있기 때문에 토이 프로젝트, 소규모 프로젝트의 경우 백엔드로서 매우 유용하게 사용 할 수 있습니다.

하지만, 무조건 장점만 있는것은 아닙니다. BaaS를 사용함으로서, 발생하는 대표적인 단점으로는 다음과 같은것들이 있습니다.

1. 클라이언트 위주의 코드

이 부분은 어떻게 관리하냐에 따라 다르긴 하겠지만, BaaS 를 사용함으로서, 백엔드 로직들이 클라이언트쪽에 구현이 됩니다. 예를들어 이메일 발송, 결제 처리 등의 작업들을 클라이언트에서 수행하고 싶지는 않겠죠?

Firebase 의 경우에는 서버쪽에서도 사용이 가능하긴 합니다. Firebase SDK 를 불러와서 사용하는거죠. 일부 로직을 직접 서버측에서 구현할 바에, 그냥 모든 로직을 직접 구현하는건 어떨까요?

추가적으로, 데이터단의 로직이 변경되면… 클라이언트 코드의 수정이 이뤄지게됩니다. 그러면, 재배포를 해야되겠죠? 웹어플리케이션이라면 JS 를 새로 받아야하는데, 이건 별로 큰 일은 아니지만, 모바일 앱이라면, 앱 업데이트를 해야합니다. 그리고 상황에 따라서 구버전 사용자를 강제 업데이트해야 하는일이 발생 할 수도 있겠죠.

2.가격

Firebase 의 경우엔 초반엔 무료입니다. 이는 소규모 프로젝트에는 정말 매력적으로 다가 올 수 있는 장점인데요, 하지만 앱의 규모가 커지면, 가격이 꽤 비쌉니다. 실시간 데이터베이스에 10G 가 쌓이고, 한달 전송되는 데이터의 양이 20G 정도면 데이터베이스 비용으로만 $70 가 발생합니다.

참고로 클라우드 컴퓨팅 호스팅을 해주는 서비스 Vultr 의 가격대를 보면, $10 면 40GB SSD, 2000GB 월 대역폭을 사용 할 수 있습니다.

따라서, 사용자가 별로 없을 것 같은 서비스면 Firebase 는 정말 좋은 선택입니다. 하지만, 서비스의 규모가 커질수록 직접 구현을 했을 때 대비 지출되는 비용이 늘어날 것입니다.

3. 복잡한 쿼리가 불가능함

Firebase 는 데이터베이스가 하나의 큰 JSON 객체로 구조화 되어있습니다. RDBMS 의 테이블, 관계 같은 개념이 존재하지 않기 때문에 최대한 데이터베이스 모델을 비정규화 하여 사용하는 것이 좋습니다. 예를들어서 게시글을 조회한다고 가정해봅시다.

Firebase 를 통하여 다음과 같은 작업은 할 수 있습니다

  • 2018년 1월 1일 ~1월 31일 사이에 작성된 글 조회
  • velopert 가 작성한 글 조회

그런데 이런건 못합니다.

  • velopert 가 1월 1일 이후에 작성한 글 조회

그래서, 이러한 작업을 하려면 모델링을 할 때 다음과 같이 username_date 라는 필드를 만들어야 합니다.

{ username: 'velopert', date: '20180101', username_date: 'velopert_20180101' }

그렇게 하고 나서, username_date 를 가지고 쿼리를 해야합니다.
불가능한건 아닌데… 이렇게 까지 해야하나 싶죠. 저는 맘에 안듭니다.

저의 경우엔 Firebase 를 사용하여 특정 유저를 팔로우하고, 팔로우한 유저의 글을 모아보는 기능을 구현하다가 Firebase 만으로는 해결 할 수 없는 한계를 느껴 때려치고 백엔드 개발을 처음부터 다시 한 경험이 있습니다.

정리

정리를 하자면, Firebase 는 어떠한 개발자들은 매우 강력한 도구로서 사용 하는 반면에, 저를 비롯한 어떠한 개발자들은 Firebase 의 구조를 불편해하고, 한계를 느끼는 사람들도 있습니다. 그리고 분명히, 그러한 한계는 이런저런 꼼수를 사용하여 극복할 수 있다고 생각하지만, 굳이 그렇게해야하나 싶습니다.

실시간 데이터베이스가 정말로 필요한 서비스라면, 일부 기능에서 Firebase 를 사용하는것은 정말 좋은 선택일 수도 있습니다. 비용이 비싸질 수도 있지만 그러한 기능을 구현하기 위해서 들어가는 많은 개발 공수가 절약 될 수도 있습니다. 참고로, 실시간 기능을 최소 공수로 구현하고 싶다면 Feather.js 라는 것도 있으니 참고해보시길 바랍니다! (아, 물론 이건 서버가 필요합니다.)

FaaS (Function as a Service)

FaaS 는 프로젝트를 여러개의 함수로 쪼개서 (혹은 한개의 함수로 만들어서), 매우 거대하고 분산된 컴퓨팅 자원에 여러분이 준비해둔 함수를 등록하고, 이 함수들이 실행되는 횟수 (그리고 실행된 시간) 만큼 비용을 내는 방식을 말합니다.

우리가 등록한 함수는 특정 이벤트가 발생했을때 실행됩니다.

  • 주기적으로 실행되게끔 설정 할 수 있습니다. 5분마다, 1시간마다, 하루마다 이런식으로 말이죠. 크롤링 작업, 주기적 처리를 할 때 좋습니다.
  • 웹 요청을 처리 할 수도 있습니다. 예를 들어서 특정 URL 로 들어오면 어떠한 작업을 하게끔 할 수 있죠. 이를 통하여 백엔드 API 를 구성 할 수도 있습니다.
  • 콘솔을 통하여 직접 호출 할 수도 있습니다.

PaaS 와의 주요 차이점

서버 시스템에 대해서 신경쓰지 않아도 된다는 점이 PaaS 와 유사하기도 한데요, 가장 중요한 차이점은, PaaS 의 경우엔, 전체 애플리케이션을 배포하며, 일단 어떠한 서버에서 당신의 애플리케이션이 24시간동안 계속 돌아가고 있다는 점 입니다.

반면 FaaS 는, 애플리케이션이 아닌 함수를 배포하며, 계속 실행되고 있는 것이 아닌, 특정 이벤트가 발생 했을 때 실행되며, 실행이 되었다가 작업을 마치면 (혹은 최대 타임아웃 시간을 지나면) 종료됩니다.

장점

  1. 비용: 특정 작업을 하기 위하여 서버를 준비하고 하루종일 켜놓는것이 아니라, 필요할때만 함수가 호출되어 처리되며 함수가 호출된 만큼만 비용이 드므로, 비용이 많이 절약됩니다.
  2. 인프라 관리: 네트워크, 장비 이런것들에 대한 구성 작업을 신경 쓸 필요 없습니다.
  3. 인프라 보안: 리눅스 업데이트, 최근 발생한 Intel Meltdown 취약점 보안패치, 이런것들 또한 신경 쓸 필요 없습니다.
  4. 확장성: FaaS 는 확장성 면에서 매우 뛰어납니다. 일반적으로, FaaS 를 사용하지 않는다면, 다양한 트래픽에 유연한 대응을 하기 위하여 우리는 AWS 의 Auto Scaling 같은 기술을 사용합니다. 이를 통하여 CPU 사용량, 네트워크 처리량에 따라 서버의 갯수를 늘리는 방식으로 처리를 분산시키는데요, FaaS 를 사용하게 되면 이렇게 특정 조건에 따라 자동으로 확장되는 것이 아닙니다. 그냥, 확장됩니다. 함수가 1초에 1개가 호출되면 1개가 호출되는것이고, 100,000,00 개가 호출되면 100,000,00 개가 호출되는것입니다. 그리고 호출된 횟수 만큼 돈을 내는거죠.

단점

  1. 제한: 모든 코드를 함수로 쪼개서 작업하다보니, 함수에서 사용 할 수 있는 자원에 제한이 있습니다. 하나의 함수가 한번 호출 될 때, AWS 에서는 최대 1500MB 의 메모리까지 사용 가능하며, 처리시간은 최대 300 초 까지 사용 가능합니다. 때문에, 웹소켓 같이 계속 켜놔야 하는것은 사용하지 못합니다. 그 대신에, AWS IoT, Pusher 등의 서비스를 사용하면 됩니다.
  2. FaaS 제공사에 강한 의존: AWS, Azure, Google 등의 FaaS 제공사에 강한 의존을 하게 됩니다. 즉, 갑자기 이 회사들이 망해버리면…? 정말 골치 아프겠죠. 물론 가능성은 매우 희박합니다.
  3. 로컬 데이터 사용 불가능: 함수들은 무상태적(stateless)입니다. 때문에, 데이터를 로컬 스토리지에서 읽고 쓸 수 없습니다. 그 대신에, AWS 라면 S3, Azure 라면 Storage 를 이용 할 수 있습니다.
  4. 비교적 신기술: FaaS 는 비교적 새로운 기술 입니다. 물론 AWS 에서 Lambda 는 2014년에 등장하긴 했지만요, 주관적으로 보기엔, 2016년쯤 사용률이 올라가기 시작했으며, 이제 기업에서 사용한 사례들도 여럿 등장하며 자리를 잡아가고 있습니다. 2018년, 아직까지는, 해외에서는 관련 자료들을 볼 수 있는 반면, 국내에서는 관련 자료를 찾아보기가 힘듭니다. 아마 2020년 쯤에는 조금 더 국내에서도 관련 자료를 많이 찾아 볼 수 있을 것이라 예상합니다.

Use case

그렇다면, FaaS 는 어떤 용도로 사용 될 수 있을까요? 다음 예제들을 살펴보면 감이 잡힐 것입니다.

  • Backend: 서비스의 백엔드를 FaaS 로 구현 할 수 있습니다.
  • Crawler: 주기적으로 페이지를 긁어서 수집 할 수 있습니다.
  • 파일 처리: 파일을 업로드하고, 화질/사이즈를 조정하고, S3 같은 스토리지에 저장하는 기능을 구현 할 수 있습니다.
  • 로그 분석 / 실시간 모니터링: 예를 들어, 특정 컴퓨팅 자원이 CPU 사용량이 70% 에 도달 했을 때, Slack 등을 통하여 알림을 받고 싶다면 AWS 의 Cloudwatch/CloudTrail 과 연동하여 알림을 받을 수 있습니다.
  • 자동화 작업들: Netflix 의 경우엔, 동영상이 됐을 때, 인코딩하고, 검증하고, 태깅하고, 공개하는 작업들을 Lambda 를 통하여 자동화 시켰습니다. 그리고 또, 백업 관련 작업도 Lambda 로 처리했다고 합니다.
  • 수많은 것들을 할 수 있습니다!

FaaS 를 어떻게 배워야 할까?

아직까지는, 국내에 자료가 많지 않아서 공부하기가 힘들 수 도 있습니다. 하지만 걱정하지 마세요! 저를 따라오시면 됩니다.:D 우리는 앞으로, Hello World 부터 시작해서, 프로덕션에서도 사용 할 수 있을정도로 학습을 해보도록 하겠습니다.

다음 튜토리얼: AWS Lambda 로 하는 Hello World!

[AWS]kinesisまとめhttps://qiita.com/yutachaos/items/2b0b8d9bfe76a597953c

AWSのkinesisについて学んだ事をまとめます。
kinesisの概要やベストプラクティス、豆知識取得のためにどうぞ。

1 kinesisとは

AWS kinesisには大きく分けて3つの機能があります。

  • Amazon Kinesis Data Streams
  • Amazon Kinesis Data Firehose
    どちらもコンピュータから送られてくるログやイベントデータ等の大量のデータを高速に別のサービスに転送するためのサービス。 違いや使い分けについてはコチラのサイトで詳しく解説されていました。 https://dev.classmethod.jp/cloud/aws/difference-between-kinesis-streams-and-kinesis-firehose/
    一言で言うなら Amazon Kinesis Data Streamsのほうが速い。 Amazon Kinesis Data Firehoseのほうが設定が少なくて実装が楽。 って感じです。
  • Amazon Kinesis Data Analytics
    コンピュータやAmazon Kinesis Data Streams、Amazon Kinesis Data Firehoseから送信されてくるデータをSQLを使って処理できるサービス

今回はこのうちのAmazon Kinesis Data Streamsについて詳しく説明します。

2 Amazon Kinesis Data Streamsとは

最近、IOTが発達してセンサを通じて大量のデータを収集することができるようになってきました。この収集されたデータは一度保存され、解析してその後のマーケティング等に活かされますが、この収集したデータを保存する部分は一見簡単そうに見えますが、送られてくるデータが膨大なのでそれらを制御するためにはハードウェアの性能面で考えることが多く、実は非常に難しいらしいです。
これを解決してくれるのがAmazon Kinesis Data Streamsです。
Amazon Kinesis Data Streamsはセンサ等のコンピュータかから送られてくるデータを別のサービスまで届けるためのサービスです。

下の図はAmazon Kinesis Data Streamsの処理の流れを記載したものです。
image.png

  • プロデューサ
    Amazon Kinesis Data Streamsにデータを送信するもの。 センサやPC,スマホ等が該当すします。
  • Amazon Kinesis Data Streams
    プロデューサからデータを受け取って管理し、コンシューマへ受け渡すもの。 シャードと呼ばれるもので構成されます。
  • コンシューマ
    Amazon Kinesis Data Streamsへデータを受け取るリクエストを送信してデータを取得し、処理を行う。

Amazon Kinesis Data Streamsに送信されたデータはすべてパーティションキーによってシャードに割り当てられ、シーケンス番号が振られます。そしてコンシューマからのリクエストに応じてシャードに入っているデータをコンシューマに送信します。(パーティションキーとかシャードとかイメージがつかない場合、下記にあるCLIコードを見ること!)

この役割を見るとキューと同じような働きをしていることが分かります。
確かにこの役割ならキューっぽいなと思いましたが、キューとは少し違う点があります。(AWSのキューサービスであるSQSとの比較になります)

  • データの削除
    キューからデータを取り出した際、処理が完了すれば取り出したデータは削除し、キューの先頭は別のデータになりますが、Amazon Kinesis Data Streamsでは一度取り出したデータも削除せずに保持し続けます。保持する期間は設定で変えられますがデフォルト24時間、最大7日です。 コンシューマがデータを取得する際に取得する範囲を指定して最新のデータを取得します。データを一時的に保管するキャッシュのような働きをしていることになります。 つまり、複数のアプリケーションで同じデータを参照したり時間がたってから再度同じデータを参照するということが可能になります。
  • 拡張性 Amazon Kinesis Data Streamsでは送信されてくるデータの量に応じてシャードをauto Scallingすることが可能です。 ただし、シャードの数は一度に2倍にまでしか増やせません。 現在1つで4つに増やしたいなら1⇒2⇒4と段階を踏む必要があります。

他にも色々差はありますが、命令などの単純なメッセージを大量に、簡単に処理したいならSQS、センサなどのコンピュータから取得したデータを変換して高速に処理したいならAmazon Kinesis Data Streamsという使い分けになるかと思います。




主要な概念(https://aws.amazon.com/jp/kinesis/data-streams/faqs/)

Q: シャードとは何ですか?

シャードとは、Amazon Kinesis データストリームの基本的なスループットの単位です。1 シャードは、1 MB/秒のデータ入力と 2 MB/秒のデータ出力の能力を提供します。1 つのシャードは 1 秒当たり最大 1,000 件の PUT レコードをサポートできます。データストリームを作成するときに、必要なシャードの数を指定します。例えば、2 シャードのデータストリームを作成できます。このデータストリームは、データ入力のスループットが 2 MB/秒、データ出力のスループットが 4 MB/秒で、1 秒間に最大 2,000 件の PUT レコードに対応できます。Amazon Kinesis Data Streams でのシャードレベルのメトリクスをモニタリングし、データストリームのリシャーディングによって、データストリームに対してデータスループットの変化に応じたシャードの追加や削除を動的に実行できます。

Q: レコードとは何ですか?

レコードとは、Amazon Kinesis データストリームに保存されるデータの単位です。レコードは、シーケンス番号、パーティションキー、データ BLOB で構成されています。データ BLOB はデータプロデューサーがデータストリームに追加する、処理対象のデータです。データ BLOB (Base64 エンコーディング前のデータペイロード) の最大サイズは、1 メガバイト (MB) です。

Q: パーティションキーとは何ですか?

パーティションキーは、レコードを分離してデータストリームの異なるシャードにルーティングするために使用されます。パーティションキーは、データを Amazon Kinesis データストリームに追加するときにデータプロデューサーによって指定されます。例えば、2 つのシャード (シャード 1 とシャード 2) で構成されるデータストリームがあるものとします。2 つのパーティションキー (キー A とキー B) を使用し、キー A のレコードはすべてシャード 1 に追加され、キー B のレコードはすべてシャード 2 に追加されるように、データプロデューサーを設定できます。

Q: シーケンス番号とは何ですか?

シーケンス番号とは、各レコードの一意の識別子です。データプロデューサーが PutRecord または PutRecords オペレーションを呼び出してデータを Amazon Kinesis データストリームに追加すると、Amazon Kinesis によってシーケンス番号が割り当てられます。同じパーティションキーのシーケンス番号は一般的に、時間の経過とともに大きくなります。PutRecord または PutRecords リクエスト間の期間が長くなるほど、シーケンス番号は大きくなります。

3 CLIから動かしてみる

3.1 Amazon Kinesis Data Streams作成

今回はシャードを1つに指定して作成します。

aws kinesis create-stream --stream-name myKinesis --shard-count 1

作成が完了するまでに少し時間がかかります。
作成の進捗状況は下記のコマンドで確認できます。

aws kinesis describe-stream --stream-name myKinesis
{
    "StreamDescription": {
        "KeyId": null,
        "EncryptionType": "NONE",
        "StreamStatus": "CREATING",
        "StreamName": "myKinesis",
        "Shards": [],
        "StreamARN": "arn:aws:kinesis:ap-northeast-1:[AWSアカウント名]:stream/myKinesis",
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "StreamCreationTimestamp": 1535294220.0,
        "RetentionPeriodHours": 24
    }
}
StreamStatus": "CREATING

となっているところから作成中であることが分かります。これがACTIVEになれば使用可能です。

3.2 シャードにデータを送信

プロデューサの役割の部分をCLIから実行します。

aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test1
aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test2
aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test3
aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test4
aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test5
aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test6

1つでは面白くないので6つデータを送信しました。
今回はシャードは1つしかないのでパーティションIDは全て固定にしています。

3.3 シャードからデータを取得

Amazon Kinesis Data Streamsからデータを取り出すにはまずシャードからイテレータを取得し、そのイテレータを使用してデータを取得します。
まず、イテレータを取得するには下記のコマンドをたたきます。

コマンド.
aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name myKinesis
結果.
{
    "ShardIterator": "AAAAAAAAAAEzWKGdMB5IsMccYVqjI8Gc3Uscb8QEr16kvDkQLwP0ie9tXvxzFap+/RHYReSWMirhhkF5uAlGZB7y11zGYeXvfDM5J6xtUCO1KN//6byM9358Swjc7GgajNLTUeogmYvfc2Kv5tr8nMkH7jYSNX/72YeqPzIbg/paHrgJ4s0pq384c4Sm5pZ1CbqN1L+/cNR2DvNwPKoGfMUKXWvq5uTN"
}
  • shard-id
    3.1でシャードを作成した際に表示されるものです。
  • shard-iterator-type
    イテレータの取得方法(=取得するデータの選択)です。 今回はTRIM_HORIZONを指定しているので最も古いデータ、つまり最初に送信したデータを取得できるハズです。
shard-iterator-typeデータの選択方法
AT_SEQUENCE_NUMBERあるシーケンス番号
AFTER_SEQUENCE_NUMBERあるシーケンス番号の後
TRIM_HORIZON最も古いレコード
LATEST最も新しいレコード
AT_TIMESTAMP指定したタイムスタンプ

次に、取得したイテレータを使用してデータを取得します。
取得には先ほど取得したイテレータを指定します。

コマンド.
aws kinesis get-records --shard-iterator AAAAAAAAAAEzWKGdMB5IsMccYVqjI8Gc3Uscb8QEr16kvDkQLwP0ie9tXvxzFap+/RHYReSWMirhhkF5uAlGZB7y11zGYeXvfDM5J6xtUCO1KN//6byM9358Swjc7GgajNLTUeogmYvfc2Kv5tr8nMkH7jYSNX/72YeqPzIbg/paHrgJ4s0pq384c4Sm5pZ1CbqN1L+/cNR2DvNwPKoGfMUKXWvq5uTN
結果.
{
    "Records": [
        {
            "Data": "dGVzdDE=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294570.285,
            "SequenceNumber": "49587668589917956789172803442841175182217281049410404354"
        },
        {
            "Data": "dGVzdDI=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294570.609,
            "SequenceNumber": "49587668589917956789172803442842384108036895747304587266"
        },
        {
            "Data": "dGVzdDM=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294570.946,
            "SequenceNumber": "49587668589917956789172803442843593033856510376479293442"
        },
        {
            "Data": "dGVzdDQ=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294571.296,
            "SequenceNumber": "49587668589917956789172803442844801959676125005653999618"
        },
        {
            "Data": "dGVzdDU=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294571.634,
            "SequenceNumber": "49587668589917956789172803442846010885495739703548182530"
        },
        {
            "Data": "dGVzdDY=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294571.974,
            "SequenceNumber": "49587668589917956789172803442847219811315354332722888706"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAEnpgqTc/Nrj4KalazIdRKsGWSm2TXpU5VF3oDdWYhGOUruuUAciH7Tp8blZaDo5IR2DcqnJJLg8ZK2lRvjpQYUs3w6NHp1JLUXYW1r23NmA302g9bs24uB/NVZiG1cn4gby4F4VAZF/R8S2TzpBQ1b8yM48MyQ1ciB0wv+cTr9WW/E1KIyYCnLEoCjVG5xvrqIapK+rtRceeaIBaohci6b",
    "MillisBehindLatest": 0
}

Dataの部分が3.2で登録したtest1~test6です。
別の文字列が表示されているのはこの文字データがBase64でエンコードされているのにCLIではBase64でのデコードに対応していなくて文字化けしているからです。
https://www.base64decode.org/
でデコードしてみると上からtest1~test6が取得できていることが書くんできました。
イテレータの取得部分の他のshard-iterator-typeも試してみます。

3.3.1 AT_SEQUENCE_NUMBER

shard-iterator-typeでAT_SEQUENCE_NUMBERを選択した場合、
--starting-sequence-numberでシーケンス番号を指定する必要があります。今回は上記のコマンドで確認したtest6のシーケンス番号を使用しています。

aws kinesis get-shard-iterator \
> --shard-id shardId-000000000000 \
> --shard-iterator-type AT_SEQUENCE_NUMBER \
> --starting-sequence-number 49587668589917956789172803442847219811315354332722888706 \
> --stream-name myKinesis
aws kinesis get-records --shard-iterator AAAAAAAAAAFDecRqc9xwLrMgT3p5w6BYtQbgnZ9DT+6pbDPS5h+PC/rYsDnh18IgYsp1P1B5/7wSth2aNAYdlxzFTk/dOksSLqzDguhwQRReXSbx84HzenWEEtSVMbxhZhyQTBkWHiVDHI0cImB68VpfvD/t8XWB19eGTX/rhMEA95WPo9lYvfgDbtwEqvVo7C/BUjPrKOwdSvN7413zek3berarZGB/
{
    "Records": [
        {
            "Data": "dGVzdDY=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535294571.974,
            "SequenceNumber": "49587668589917956789172803442847219811315354332722888706"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFdLvQy3u8K8lNN48+Tm98iigVQw+zryw3qiWsXNgYxYNc2pCkiUUL82FikKaVPXJu7VCfWRhIE9LSGq9IWgez/Y1fuNsE3n1PklGKLPviU2TVDQKNX0yd719mcxn1C3/HdHcwltKRVU5Vq+e/UhG/Jc/JKbgAGJX7PXOdQaVs/0vOtAFs7nonF66wGtzj0WSecKAIj04Gs94VCC+KZlOso",
    "MillisBehindLatest": 0
}

取得できた「dGVzdDY=」をデコードするとtest6でしたのでOKです。

3.3.2 AFTER_SEQUENCE_NUMBER

指定したシーケンス番号より後ろのデータを取得します。
今回はtest5のシーケンスを指定しました。

aws kinesis get-shard-iterator \
> --shard-id shardId-000000000000 \
> --shard-iterator-type AFTER_SEQUENCE_NUMBER \
> --starting-sequence-number 49587668589917956789172803442846010885495739703548182530 \
> --stream-name myKinesis
{
    "ShardIterator": "AAAAAAAAAAFT6p+3tJmPenVZGq7zSiMyDgLxKaaaVlKIirbJACeWyZEFMsGCgp0rZ4Bd0JJPg0Tq3vaFEgHKuco4hwD8ceEnIzmav+zD7j2apORI4SnrP/hMk4rcdJqBGeUI5NTvArWj4RZZdBq7Rim/InT1JFoHuJEj3hz/Q648yf0V9U/IhGFY0JeE7/q8/NbhWa/9VoawMzpJL1oifdqxs3u/9HFJ"
}

test6の1件のみ取得できました。どうやら「<=」ではなくて「<」のようですね。
試しにtest4のシーケンス番号で再取得したところ、test5とtest6が取得できました。

3.3.3 LATEST

aws kinesis get-shard-iterator \
> --shard-id shardId-000000000000 \
> --shard-iterator-type LATEST  \
> --stream-name myKinesis
{
    "ShardIterator": "AAAAAAAAAAGZ0WyWxGIfp5nt7S/u8/QMtMGUB572kQKQ0Z05DgmntfXxDBOzkEtx21Hwzon8hkeUskKzQc+VXB20sr6HptV7PoL42b24uPYvO7VLtNFN7PxQvkC4yDZ8MyIrtqZPvIb2J+c7x7kNQfDoOoZ+zWzqdF0RKqMFUv6XqKNsIu9dkgw8gVQkZds8i1TmnnSVogSsSHOKFF71RL+/mHG79jzW"
}
ws kinesis get-records --shard-iterator AAAAAAAAAAGZ0WyWxGIfp5nt7S/u8/QMtMGUB572kQKQ0Z05DgmntfXxDBOzkEtx21Hwzon8hkeUskKzQc+VXB20sr6HptV7PoL42b24uPYvO7VLtNFN7PxQvkC4yDZ8MyIrtqZPvIb2J+c7x7kNQfDoOoZ+zWzqdF0RKqMFUv6XqKNsIu9dkgw8gVQkZds8i1TmnnSVogSsSHOKFF71RL+/mHG79jzW
{
    "Records": [],
    "NextShardIterator": "AAAAAAAAAAFpTCYWQp1BHiAn31vL7VRa961mOzKcxrOWwRohkk1qWwxRwS31rJMOsrecMbNSDt+jUl4x/7rwknia0puoQN+1OlN/TP2zRLy9MxVYKwdEVvfa3AefvacSwJRIdUw55ThPjDbYHXcv5eoFrLYXuF8Tx7P/UXeCoxPgHnrjNKFm85yBqVRzEtQ/0oyXpozBtRle5ZRt0vugI4FaHEKmql8F",
    "MillisBehindLatest": 0
}

・・・最も新しいデータを取得するのでtest6が返ってくると思っていましたが、空のレコードが返されました。
どうやらこれはイテレータ作成後に登録されたデータを取得できるようです。

再度test7のデータを登録してすぐに取得してみます。

aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test
{
"ShardId": "shardId-000000000000",
"SequenceNumber": "49587668589917956789172803446554995300073657925510365186"
}
[ec2-user@ip-172-31-46-179 ~]$ aws kinesis get-records --shard-iterator AAAAAAAAAAHCuFdM4uajLL1mQfq3Q6rzzthfjeRIqZPCS8uGUGtQmG7XHPBonkLd84y/NH4qNkxNw4CmNr++atrAtNuizv/R0Og0vzNb6vFMFY4BZxVMYJMe+f/uDucPVgoc+Fe7949Zrdoz5UhPFcgivmhGTYHOAMyVQv53ZW6C1veudzrOmwZLWOiZJeMexLjLY3ZmG+LhBlM5jeN8cFTTUb
{
"Records": [
{
"Data": "dGVzdDc=",
"PartitionKey": "1",
"ApproximateArrivalTimestamp": 1535298005.259,
"SequenceNumber": "49587668589917956789172803446554995300073657925510365186"
}
],
"NextShardIterator": "AAAAAAAAAAFrcL8UAgxv0zr7GU3HTlmPJN7ct9yjiALTZUbTn9AwJbskyLw2AVODFjw+BhMcsapClDEx05JFTiAmpdyjwF6kwrtSGyUoEsTg7bpqBz8FVPF6qsfjAp9p9rrim8qSFbsg3M0wg14PPdZUw6+dIcZwS+sWnIiJeD73qc66o0X+DIJcXZWQFp4g+PLrHsGJYhXFHezTk9FnpAXwAT",
"MillisBehindLatest": 0
}

次に一番新しいputデータのみ取得なのかイテレータ作成後のputしたすべてのデータなのかを確かめるために再度test8,test9を登録して取得します。

aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test8
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49587668589917956789172803446556204225893276128097861634"
}
[ec2-user@ip-172-31-46-179 ~]$ aws kinesis put-record --stream-name myKinesis --partition-key 1 --data test9
{
    "ShardId": "shardId-000000000000",
    "SequenceNumber": "49587668589917956789172803446557413151712890757272567810"
}
[ec2-user@ip-172-31-46-179 ~]$ aws kinesis get-records --shard-iterator AAAAAAAAAAHCuFdM4uajLL1mQfq3Q6rzzthfjeRIqZPCS8uGUGtQmG7XHPBonkLd84y/NH4qNkxNw4CmNr++atrAtNuizv/R0Og0vzNb6vFMFY4BZxVMYJMe+f/uDucPVgoc+Fe7949Zrdoz5UhPFcgivmhGTYHOAMyVQv53ZW6C1veudzrOmwZLWOiZJeMexLjLY3ZmG+LhBlM5jeN8cFTTUb
{
    "Records": [
        {
            "Data": "dGVzdDc=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535298005.259,
            "SequenceNumber": "49587668589917956789172803446554995300073657925510365186"
        },
        {
            "Data": "dGVzdDg=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535298057.158,
            "SequenceNumber": "49587668589917956789172803446556204225893276128097861634"
        },
        {
            "Data": "dGVzdDk=",
            "PartitionKey": "1",
            "ApproximateArrivalTimestamp": 1535298057.511,
            "SequenceNumber": "49587668589917956789172803446557413151712890757272567810"
        }
    ],
    "NextShardIterator": "AAAAAAAAAAFUdQEYAJlATCR29tR8KwFoR+g2E1Ob76fC9BYpvDTG36jTznu43K+Ih2gUGkCPqgXewV6l5pVusIqx66FmLdnodiD7Z9ApPWLIoKbR60tYDLAY89NIFS6wGR5+uBbXnWDSjPiPujW/2BSkHin6ejJ3349/jSIS8vIJnVec+rOlDom2Ju3t8oEbNq/kCmCNg8uJs720GZjDA1emHs",
    "MillisBehindLatest": 0
}

test7,test8,test9の3つが取得できたことからイテレータ作成後登録されたすべてのデータを取得するようです。

3.3.4 AT_TIMESTAMP

最後にタイムスタンプを指定して取得する方法も検証します。
```
aws kinesis get-shard-iterator \

--shard-id shardId-000000000000 \
--shard-iterator-type AT_TIMESTAMP \
--timestamp 1535298057.000 \
--stream-name myKinesis
{
"ShardIterator": "AAAAAAAAAAFyDCXfppx0RgwhyjDQmpqDfac1AkAxTzp/6hO6MrUic+qVY40YqkPfB22UFxWzI1TkdIrHSDlTEEuvHtMsrFtncVMp38yC6o9DsjA8PSPzFvOtdtet1aU+qBLigMDTghhs3pV5n1gYk+mGcRz0I+h07rDGu6fniS9X6azY8jiU1f/XXihlMYXcTUn4wTT3Jny8onJVXRDcufCpfQK9czmV4bFvhfXBrdn9/qv6iWv6Eg=="
}

aws kinesis get-records --shard-iterator AAAAAAAAAAFyDCXfppx0RgwhyjDQmpqDfac1AkAxTzp/6hO6MrUic+qVY40YqkPfB22UFxWzI1TkdIrHSDlTEEuvHtMsrFtncVMp38yC6o9DsjA8PSPzFvOtdtet1aU+qBLigMDTghhs3pV5n1gYk+mGcRz0I+h07rDGu6fniS9X6azY8jiU1f/XXihlMYXcTUn4wTT3Jny8onJVXRDcufCpfQK9czmV4bFvhfXBrdn9/qv6iWv6Eg==
{
"Records": [
{
"Data": "dGVzdDg=",
"PartitionKey": "1",
"ApproximateArrivalTimestamp": 1535298057.158,
"SequenceNumber": "49587668589917956789172803446556204225893276128097861634"
},
{
"Data": "dGVzdDk=",
"PartitionKey": "1",
"ApproximateArrivalTimestamp": 1535298057.511,
"SequenceNumber": "49587668589917956789172803446557413151712890757272567810"
},
{
"Data": "dGVzdDEw",
"PartitionKey": "1",
"ApproximateArrivalTimestamp": 1535298159.77,
"SequenceNumber": "49587668589917956789172803446558622077532512395833901058"
}
],
"NextShardIterator": "AAAAAAAAAAHA3J8ih9fTSCchEetKnzqKeTXCK9b4RwE9zKMujqZMlGTvK7hps5FftFYQU+X07UEZkFkyGECq4h57BTGkr1gyuH38aCKJ/DnWkjtc3lkUQ65HD6Yt/bKhDmZ14J741Tw/Zz+p7bba+5oT3QS2REJy1chlKaJD4p/sHw0SfqOzc/rg3oj7sKjR121WNSAt0LR778S1h+mE+5vnV/igqZ7a",
"MillisBehindLatest": 0
}
```

タイムスタンプをtest8とtest9の間に指定しました。
結果としてtest8,9,10が取得できているので想定通りです。
ちなみに、test8のタイムスタンプ1535298057.158に対して1535298057.159でイテレータを取得したところ、test8も取得できてしまいました。
1535298057.168でも1535298057.268でも1535298057.500でもtest8は取得できてしまいました。
閾値がどこにあるのかはわかりませんが、少しくらいなら指定したタイムスタンプより前でも取得できてしまうようです。

4 ベストプラクティス

最後に、Amazon Kinesis Data Streamsを使用する上でのベストプラクティスを紹介します。

4.1 パーティションキー戦略

Amazon Kinesis Data Streamsでは複数のシャードを用意して処理を分散することができますが、
どのシャードを使用するのか決定するのはプロデューサが発行するパーティションキーです。
上記のCLIの例ではシャードに送信するデータのパーティションキーはすべて固定で1にしていました。
もしシャードを複数用意したとしてもこのような設定をしてしまうと結局すべてのデータが1つのシャードに集中してしまうことになります。
実際は固定値で設定せずにきちんとばらけるように設定する必要がありますが、分け方には2つの考え方があります。

ホットシャード回避のために広範囲にパーティションキーを確保する

パーティションキーに偏りがあり、データが集中してしまったシャードをホットシャードといいます。
このホットシャードの発生を防ぐためにパーティションキーを広範囲の値を持つようにする、タイムスタンプなどを使ってランダムなキーを生成するという工夫をする考え方

データに対応したシャードを設定

例えばこのセンサから取得したデータは全てこのシャードに送信するようにするなど、データをより有意義に使用できるようにパーティションキーを設定する方法。そのシャードに入っているデータがどこから取得したのかわかるのでその後のデータ分析がやりやすくなったりする。

4.2 適切なシャード数の設定

ストリームが処理できるデータ容量はシャードの数に比例します。なのでプロディーサが送信してくるデータの量やコンシューマが読み取るデータの量を考慮してシャード数を設定する必要があります。
シャードの1つ当たりのスペックは以下のように設定されています。

  • 書き込み(プロデューサからデータを受け取る) 1MB/秒
  • 読み込み(コンシューマがデータを読み取る) 2MB/秒

つまり、プロデューサが1秒間に10MBのデータを送ってくる場合はシャードは最低でも10個は用意しておく必要があります。
また、コンシューマが1秒に10MBのデータを読み込むならシャードは最低5個は用意する必要があるわけです。
予期せぬデータ量の増加に備えてシャードは余分に用意しておくことが推奨されています。

4.3 重複データの削除

プロデューサからストリームへのデータ送信は成功したけどストリームからプロデューサへの成功通知が送信できなかったりするとプロデューサは同じデータを再送しようとするのでストリームに同じデータが登録される可能性があります。
これを防ぐためにデータにプライマリキーを持たせてコンシューマで重複を削除するという手法が推奨されています。

4.4 プロデューサ・コンシューマの起動順序

ベストプラクティスからは少し離れてしまいますが、コンシューマがイテレータを取得する際の設定項目であるデータの取得範囲(shard-iterator-type)がLATESTである場合はコンシューマアプリケーションを起動してからプロデューサアプリケーションを起動する必要があります。
逆に。shard-iterator-typeがTRIM_HORIZONならば先にプロデューサを起動します。


事例からAmazon Kinesisとは何なのかを学ぶ



この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

はじめに

以前、Amazon Kinesisを使う案件があり技術調査の手伝いをしたことがありました。AWSのサイトには「大規模なストリーミングデータをリアルタイムで処理する完全マネージド型サービス」とありますが、その時は何のためのサービスなのか触ってもいまいちピンときませんでした。その後、分からなかった点を担当者にいろいろ聞いてみたのですが、案件が事例化されましたので教えてもらったことをブログに書いてみようと思います。株式会社あきんどスシロー様の案件です。以下が構成図になります。

「うまいすしを、腹一杯。うまいすしで、心も一杯。」

kinesushi

処理の流れ

事例の構成図からどのような処理が行われているかを担当者に確認しました。

  1. 回転寿司のすし皿に付いているICタグの情報を管理端末が読み取ります。
  2. 管理端末が読み取った情報はインターネットを経由してAmazon KinesisへPutされます。Putされるデータは一日当たり平均で数百万件だそうです。
  3. 次はKinesis-appという箇所です。ここはEC2になります。管理端末から送信されたデータをAWS CLIを使ってGetします。GetしたデータをCSVに出力しS3にアップロードします。
  4. S3からRedshiftにCOPYコマンドでデータをロードしています。3と4の処理はcronで定期的に行われます。
  5. Redshiftにロードされた最新のデータをBIツールのTableauで可視化して分析します。

疑問点を聞いてみる

処理の流れを確認したところでいくつか疑問が湧いたので確認してみました。

一度、Getされたデータはどうなるのか?

KinesisにPutされたデータはその後24時間はGetすることができます。24時間以内であれば何度でも取得できます。データ取得時に何も条件を指定しない場合は24時間以内のデータすべてを取得しますが、取得したデータにはiteratorというシーケンス番号が振られているので最後に取得したデータのiteratorを条件にそれ以降のデータを取得することができます。AWS CLIの場合は以下のようなコマンドになります。

1
aws kinesis get-records --shard-iterator <value>

Amazon Kinesisを導入しない場合どうなってしまうのか?

EC2でアプリケーションサーバ立てて管理端末からのリクエストを直接処理するのと何が違うのか聞いてみたところ、大量のリクエストの受け入れ口としてあるとないとではコストが全然違うからだそうです。Kinesis並みの処理をEC2でするためには相当な台数が必要になりコストが上がってしまうということですね。

Amazon SQSではダメなんでしょうか?

話を聞いていたらAmazon SQSみたいなものかな、と思ったのでSQSではなくKinesisを採用した理由を確認したところ以下のような理由でKinesisを採用しているそうです。

  • SQSにはKinesisほどのスケーラビリティが備わっていない。
  • SQSはデータにシーケンス番号は振られておらず、またデータが重複する可能性がある。
  • Kinesisの方が低コスト。

調べてみると他にも以下のような点が違うようです。

  • SQSはリクエストで送れるデータが256バイトのテキストデータのみ。Kinesisは50キロバイトのBLOB。

以下のページにも詳しく書かれていました。
Amazon Kinesis のよくある質問

まとめ

最初は「ストリーミングデータをリアルタイムで処理」という言葉から難しそうなイメージがあったのですが、色々聞いてみて キューのような使い方をするサービス、という印象をうけました。担当者と話していて「コストが安い。」という言葉を何回も口にしていたのでとてもリーズナブルなサービスのようです。私もぜひ案件で使ってみたいと思いました。今回は触れませんでしたがKinesisにはStreamShard(シャード)Partition Keyなどの用語があります。こちらに関しては以下の記事が分かりやすいのでお勧めです。興味のある方はぜひご覧ください。

+ Recent posts