読者です 読者をやめる 読者になる 読者になる

Gunosyデータ分析ブログ

Gunosyで働くデータエンジニアが知見を共有するブログです。

Amazon Kinesis AnalyticsとES/Kibana4でリアルタイムダッシュボード構築

こんにちは。開発・運用推進部の小出です。 猫にイヤホンを噛み切られること数回、最近のBGMはもっぱら環境音です。 「耳からうどんが出ているようにしか見えない」という噂のBluetoothイヤホンが気になっています。

今回は、Amazon Kinesis AnalyticsとElasticsearch/Kibana4を利用したリアルタイムダッシュボード構築についてです。

Amazon Kinesis Analytics とは

Amazon Kinesis Analytics(ストリーミングデータに対して標準 SQL クエリを最も簡単に実行)| AWS https://d0.awsstatic.com/amazonkinesis/KinesisAnalytics_LP_Digital%20Marketing.png Amazon Kinesis Analyticsは、ストリームデータをSQLで分析できるサービスです。 SourceとしてKinesis Streams/Firehose、クエリ結果のDestinationとしてKinesis Streams/Firehoseを選択できます。

さらにKinesis Firehoseは、Amazon Elasticsearch Serviceに出力でき(便利!)、Amazon Elasticsearch ServiceはKibana4も付いてくる(お得!)ので、今回はこれを利用してダッシュボードを構築してみます。

ダッシュボードを構築してみる

Gunosyでは、ログコレクタとしてfluentdを採用しているので、そこからKinesis Firehoseへログを送ります。

GitHub - awslabs/aws-fluent-plugin-kinesis: Fluent Plugin for Amazon Kinesis

f:id:y-koid:20161108104241p:plain

Source StreamとMapping

下記のようなJSON形式のログが送られているとします。

{"article_id": 12345, "user_id": 12345, "action": "click"}
{"article_id": 12345, "user_id": 12345, "action": "impression"}

Kinesis Analyticsのマネジメントコンソールにて、SourceとなるKinesis Firehoseを選択すると、 マッピングを指定することができます。

JSONの場合、下記のようにRaw Pathを指定し、カラム名とJSONのキーを対応させます。

f:id:y-koid:20161108110639p:plain

Query

Kinesis Analyticsでは、Window関数としてTumbling WindowSliding Windowが用意されています。今回はTumbling Windowを利用し、記事IDをベースに1分毎に集計を行います。

/* Destination Stream */
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  TIME TIMESTAMP, ARTICLE_ID INTEGER, IMPRESSION INTEGER, CLICK INTEGER
);

/* Tumbling WindowでのGROUP BY (1分毎) */
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
  ROWTIME AS TIME,
  ARTICLE_ID,
  SUM(CASE ACTION WHEN 'impression' THEN 1 ELSE 0 END) AS IMPRESSION,
  SUM(CASE ACTION WHEN 'click' THEN 1 ELSE 0 END) AS CLICK
FROM "SOURCE_SQL_STREAM_001"
GROUP BY
  ARTICLE_ID,
  FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);

クエリの結果は、下記のイメージのようになります。

TIME ARTICLE_ID IMPRESSION CLICK
2016-11-09 12:01:00.0 12345 5 0
2016-11-09 12:01:00.0 12346 10 2
2016-11-09 12:01:00.0 12347 8 1
2016-11-09 12:02:00.0 12345 7 2
2016-11-09 12:02:00.0 12346 10 3
2016-11-09 12:02:00.0 12347 5 0

Destination

クエリの結果を、Kinesis Firehose(&Elasticsearch Service)に向けると、あっという間にKibana4でダッシュボードができます。

例えば、Visualize -> Data Tableで、記事ID別のimpression/clickの表が作れます。

f:id:y-koid:20161110195505p:plain

ログデータを拡充する

Amazon Kinesis Analyticsでは、Reference Data Sourceとして、 S3に配置した静的ファイル(JSONまたはCSV)を参照することができます。

f:id:y-koid:20161108104308p:plain

以前のエントリにもありますが、Gunosyではユーザの性別・年齢といったデモグラフィック情報や、 興味・関心のある領域などを推定することで、記事・広告配信のアルゴリズムに活用しています。 今回は、これらのデータをCSVとしてS3に配置し、ログデータの拡充を行ってみます。

data.gunosy.io

Reference DataとMapping

下記のようなCSV(user_id, gender, segment_id)を用意し、S3に配置します。

101,female,1
102,male,2
103,female,3
...

先程のSource Streamの場合と異なり、今日現在のところ、Reference DataのMappingはマネジメントコンソールで行えず、CLIによる操作のみとなっています。 (もちろん、Source StreamのMappingをCLIで操作することは可能です)

$ aws kinesisanalytics add-application-reference-data-source \
  --application-name <my-application-name> \
  --current-application-version-id <version-id> \
  --reference-data-source '{
  "TableName": "REFERENCE_DATA_SOURCE",
  "S3ReferenceDataSource": {
    "BucketARN": "arn:aws:s3:::<my-bucket-name>",
    "FileKey": "mydata.csv",
    "ReferenceRoleARN": "arn:aws:iam::<account-id>:role/..."
  },
  "ReferenceSchema": {
    "RecordFormat": {
      "RecordFormatType": "CSV",
      "MappingParameters": {
        "CSVMappingParameters": {"RecordRowDelimiter": ",", "RecordColumnDelimiter": "\n"}
      }
    },
    "RecordEncoding": "UTF-8",
    "RecordColumns": [
      {"Name": "USER_ID", "Mapping": "0", "SqlType": "INTEGER"},
      {"Name": "GENDER",  "Mapping": "1", "SqlType": "VARCHAR(32)"},
      {"Name": "SEGMENT_ID", "Mapping": "2", "SqlType": "INTEGER"}
    ]
  }
}'

Query

上で設定したReference Dataは、"TableName"で指定した名前で呼び出すことができます。(今回の場合は"REFERENCE_DATA_SOURCE")

Souce StreamにユーザデータをJOINしたTemporary Streamを作成し、このTemporary Streamに対しTumbling WindowでのGROUP BYを行います

/* Destination Stream */
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
  TIME TIMESTAMP, GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER, 
  IMPRESSION INTEGER, CLICK INTEGER
);

/* Temporary Stream */
CREATE OR REPLACE STREAM "TMP_SQL_STREAM" (
  GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER
);

/* Tumbling WindowでのGROUP BY (1分毎) */
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
  ROW_TIME AS TIME,
  GENDER, SEGMENT_ID, ARTICLE_ID,
  SUM(CASE ACTION WHEN 'impression' THEN 1 ELSE 0 END) AS IMPRESSION,
  SUM(CASE ACTION WHEN 'click' THEN 1 ELSE 0 END) AS CLICK
FROM "TMP_SQL_STREAM"
GROUP BY
  GENDER, SEGMENT_ID, ARTICLE_ID,
  FLOOR("TMP_SQL_STREAM".ROWTIME TO MINUTE);

/* REFERENCE DATAをJOINし、Temporary Streamへ */
CREATE OR REPLACE PUMP "TMP_PUMP" AS
INSERT INTO "TMP_SQL_STREAM"
SELECT STREAM
  R.GENDER, R.SEGMENT_ID, S.ARTICLE_ID, S.ACTION
FROM      "SOURCE_SQL_STREAM_001" S
LEFT JOIN "REFERENCE_DATA_SOURCE" R
  ON S.USER_ID = R.USER_ID;

クエリの結果は、下記のイメージのようになります。

TIME GENDER SEGMENT_ID ARTICLE_ID IMPRESSION CLICK
2016-11-09 12:01:00.0 male 1 12345 2 0
2016-11-09 12:01:00.0 female 3 12345 3 0
2016-11-09 12:01:00.0 male 1 12346 4 0
2016-11-09 12:01:00.0 male 2 12346 4 0
2016-11-09 12:01:00.0 female 4 12346 2 2

Destination

先程同様にクエリの結果を、Kinesis Firehose(&Elasticsearch Service)に向けると、あっという間にKibana4でダッシュボードができます。

Pie Chartにしたり、FilterをかけてStacked Area Chartにすると良いかもしれません。

まとめ

このように、S3上の静的データを参照しログデータを拡充することで、生ログだけではできなかったセグメント別等のリアルタイムなモニタリングが可能になります。

おまけ:AmazonES&Kibana4のダッシュボード共有

Amazon ES&Kibana4のアクセス制限に独自認証を使いたい…!という場合に、認証を求めるプロキシとリクエストに署名をするプロキシを組み合わせる、等の方法があります

github.com github.com