こんにちは。開発・運用推進部の小出です。 猫にイヤホンを噛み切られること数回、最近のBGMはもっぱら環境音です。 「耳からうどんが出ているようにしか見えない」という噂のBluetoothイヤホンが気になっています。
今回は、Amazon Kinesis AnalyticsとElasticsearch/Kibana4を利用したリアルタイムダッシュボード構築についてです。
Amazon Kinesis Analytics とは
Amazon Kinesis Analytics(ストリーミングデータに対して標準 SQL クエリを最も簡単に実行)| AWS Amazon Kinesis Analyticsは、ストリームデータをSQLで分析できるサービスです。 SourceとしてKinesis Streams/Firehose、クエリ結果のDestinationとしてKinesis Streams/Firehoseを選択できます。
さらにKinesis Firehoseは、Amazon Elasticsearch Serviceに出力でき(便利!)、Amazon Elasticsearch ServiceはKibana4も付いてくる(お得!)ので、今回はこれを利用してダッシュボードを構築してみます。
ダッシュボードを構築してみる
Gunosyでは、ログコレクタとしてfluentdを採用しているので、そこからKinesis Firehoseへログを送ります。
GitHub - awslabs/aws-fluent-plugin-kinesis: Fluent Plugin for Amazon Kinesis
Source StreamとMapping
下記のようなJSON形式のログが送られているとします。
{"article_id": 12345, "user_id": 12345, "action": "click"} {"article_id": 12345, "user_id": 12345, "action": "impression"}
Kinesis Analyticsのマネジメントコンソールにて、SourceとなるKinesis Firehoseを選択すると、 マッピングを指定することができます。
JSONの場合、下記のようにRaw Pathを指定し、カラム名とJSONのキーを対応させます。
Query
Kinesis Analyticsでは、Window関数としてTumbling WindowとSliding Windowが用意されています。今回はTumbling Windowを利用し、記事IDをベースに1分毎に集計を行います。
/* Destination Stream */ CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TIME TIMESTAMP, ARTICLE_ID INTEGER, IMPRESSION INTEGER, CLICK INTEGER ); /* Tumbling WindowでのGROUP BY (1分毎) */ CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ROWTIME AS TIME, ARTICLE_ID, SUM(CASE ACTION WHEN 'impression' THEN 1 ELSE 0 END) AS IMPRESSION, SUM(CASE ACTION WHEN 'click' THEN 1 ELSE 0 END) AS CLICK FROM "SOURCE_SQL_STREAM_001" GROUP BY ARTICLE_ID, FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);
クエリの結果は、下記のイメージのようになります。
TIME | ARTICLE_ID | IMPRESSION | CLICK |
---|---|---|---|
2016-11-09 12:01:00.0 | 12345 | 5 | 0 |
2016-11-09 12:01:00.0 | 12346 | 10 | 2 |
2016-11-09 12:01:00.0 | 12347 | 8 | 1 |
2016-11-09 12:02:00.0 | 12345 | 7 | 2 |
2016-11-09 12:02:00.0 | 12346 | 10 | 3 |
2016-11-09 12:02:00.0 | 12347 | 5 | 0 |
… |
Destination
クエリの結果を、Kinesis Firehose(&Elasticsearch Service)に向けると、あっという間にKibana4でダッシュボードができます。
例えば、Visualize -> Data Tableで、記事ID別のimpression/clickの表が作れます。
ログデータを拡充する
Amazon Kinesis Analyticsでは、Reference Data Sourceとして、 S3に配置した静的ファイル(JSONまたはCSV)を参照することができます。
以前のエントリにもありますが、Gunosyではユーザの性別・年齢といったデモグラフィック情報や、 興味・関心のある領域などを推定することで、記事・広告配信のアルゴリズムに活用しています。 今回は、これらのデータをCSVとしてS3に配置し、ログデータの拡充を行ってみます。
Reference DataとMapping
下記のようなCSV(user_id, gender, segment_id)を用意し、S3に配置します。
101,female,1 102,male,2 103,female,3 ...
先程のSource Streamの場合と異なり、今日現在のところ、Reference DataのMappingはマネジメントコンソールで行えず、CLIによる操作のみとなっています。 (もちろん、Source StreamのMappingをCLIで操作することは可能です)
$ aws kinesisanalytics add-application-reference-data-source \ --application-name <my-application-name> \ --current-application-version-id <version-id> \ --reference-data-source '{ "TableName": "REFERENCE_DATA_SOURCE", "S3ReferenceDataSource": { "BucketARN": "arn:aws:s3:::<my-bucket-name>", "FileKey": "mydata.csv", "ReferenceRoleARN": "arn:aws:iam::<account-id>:role/..." }, "ReferenceSchema": { "RecordFormat": { "RecordFormatType": "CSV", "MappingParameters": { "CSVMappingParameters": {"RecordRowDelimiter": ",", "RecordColumnDelimiter": "\n"} } }, "RecordEncoding": "UTF-8", "RecordColumns": [ {"Name": "USER_ID", "Mapping": "0", "SqlType": "INTEGER"}, {"Name": "GENDER", "Mapping": "1", "SqlType": "VARCHAR(32)"}, {"Name": "SEGMENT_ID", "Mapping": "2", "SqlType": "INTEGER"} ] } }'
Query
上で設定したReference Dataは、"TableName"で指定した名前で呼び出すことができます。(今回の場合は"REFERENCE_DATA_SOURCE")
Souce StreamにユーザデータをJOINしたTemporary Streamを作成し、このTemporary Streamに対しTumbling WindowでのGROUP BYを行います
/* Destination Stream */ CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TIME TIMESTAMP, GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER, IMPRESSION INTEGER, CLICK INTEGER ); /* Temporary Stream */ CREATE OR REPLACE STREAM "TMP_SQL_STREAM" ( GENDER VARCHAR(32), SEGMENT_ID INTEGER, ARTICLE_ID INTEGER ); /* Tumbling WindowでのGROUP BY (1分毎) */ CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ROW_TIME AS TIME, GENDER, SEGMENT_ID, ARTICLE_ID, SUM(CASE ACTION WHEN 'impression' THEN 1 ELSE 0 END) AS IMPRESSION, SUM(CASE ACTION WHEN 'click' THEN 1 ELSE 0 END) AS CLICK FROM "TMP_SQL_STREAM" GROUP BY GENDER, SEGMENT_ID, ARTICLE_ID, FLOOR("TMP_SQL_STREAM".ROWTIME TO MINUTE); /* REFERENCE DATAをJOINし、Temporary Streamへ */ CREATE OR REPLACE PUMP "TMP_PUMP" AS INSERT INTO "TMP_SQL_STREAM" SELECT STREAM R.GENDER, R.SEGMENT_ID, S.ARTICLE_ID, S.ACTION FROM "SOURCE_SQL_STREAM_001" S LEFT JOIN "REFERENCE_DATA_SOURCE" R ON S.USER_ID = R.USER_ID;
クエリの結果は、下記のイメージのようになります。
TIME | GENDER | SEGMENT_ID | ARTICLE_ID | IMPRESSION | CLICK |
---|---|---|---|---|---|
2016-11-09 12:01:00.0 | male | 1 | 12345 | 2 | 0 |
2016-11-09 12:01:00.0 | female | 3 | 12345 | 3 | 0 |
2016-11-09 12:01:00.0 | male | 1 | 12346 | 4 | 0 |
2016-11-09 12:01:00.0 | male | 2 | 12346 | 4 | 0 |
2016-11-09 12:01:00.0 | female | 4 | 12346 | 2 | 2 |
… |
Destination
先程同様にクエリの結果を、Kinesis Firehose(&Elasticsearch Service)に向けると、あっという間にKibana4でダッシュボードができます。
Pie Chartにしたり、FilterをかけてStacked Area Chartにすると良いかもしれません。
まとめ
このように、S3上の静的データを参照しログデータを拡充することで、生ログだけではできなかったセグメント別等のリアルタイムなモニタリングが可能になります。
おまけ:AmazonES&Kibana4のダッシュボード共有
Amazon ES&Kibana4のアクセス制限に独自認証を使いたい…!という場合に、認証を求めるプロキシとリクエストに署名をするプロキシを組み合わせる、等の方法があります