こんにちは、プラットフォーム開発部のmafuyukです。
普段はDMMで保持している会員情報やビッグデータなどから、ユーザーに対して効果的な訴求を行えるプッシュ通知基盤の開発をしています。
プッシュ通知の一部基盤ではAWSを利用しているのですがその中でログ収集、可視化のためにAmazon Elasticsearch Service(以後 ES)を利用しています。
今回はESが2017/06/01にElasticsearch version 5.3 対応したことによって便利な機能が使えるようになったので、その中からCuratorを利用したインデックスのローテートをLambdaから行う方法をご紹介します。
他に使えるようになった機能について気になった方はAWS公式サイトを参照ください。
Elasticsearchについて
Elasticsearchは全文検索エンジンで、データベースとしても機能します。
利用用途としてはKibanaでのログの可視化を行う際に、Elasticsearchにデータを溜め込みKibanaから検索を走らせるという用途が多いです。
以下のRDBMSとの対比表をみるとなんとなく構造が見えてくるかと思います。
RDBMSとの対比
Elasticsearch | RDBMS |
---|---|
ドキュメント | レコード |
フィールド | カラム |
インデックス | データベース |
タイプ | テーブル |
今回やること
Lambdaの関数内でCuratorのインデックス削除APIを呼び出してESの古いインデックスを削除します。
(インデックスを消す理由としてはデータが増えすぎた際の容量圧迫を回避するためです。ES自体にはローテートの仕組みがないため、別にローテートする仕組みを用意しないと容量圧迫が起きてしまいます。)
LambdaとESはIAMロールでの認証を通します。
実際にやってみる
LambdaとES間疎通
Lambdaに付与するロールの作成
ロールには以下のポリシーを付与してください
{
"Version": "2012-10-17",
"Statement": [
{
"Action": "sts:AssumeRole",
"Resource": "*",
"Effect": "Allow"
}
]
}
Lambdaに付与するロールを許可するようにESのポリシーを更新
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Principal": {
"AWS": "arn:aws:iam::************:role/your-lambda-role"
},
"Action": "es:*",
"Resource": "arn:aws:es:us-west-2:************:domain/test-es53/*"
}
]
}
lambdaデプロイ
zipファイル作成
以下のコマンドを実行してzipファイルを用意します。 index.pyの内容に関しては適宜、置き換えを行ってください。
$ mkdir RotateIndexForES
$ cd RotateIndexForES
$ pip install elasticsearch-curator -t .
$ pip install requests-aws4auth -t .
$ vim index.py
$ zip -r ../RotateIndexForES.zip *
index.py
from __future__ import print_function
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import boto3
import curator
endpoint = 'search-***************.us-west-2.es.amazonaws.com' # ESのエンドポイント
region = 'us-west-2'
rotation_period = 1 # ローテートを行う範囲
prefix = 'your-prefix-' # ローテートしたいインデックスのprefix
def lambda_handler(event, context):
credentials = get_credential()
awsauth = AWS4Auth(
credentials['access_key'],
credentials['secret_key'],
region,
'es',
session_token=credentials['token']
)
es = Elasticsearch(
hosts=[{'host': endpoint, 'port': 443}],
http_auth=awsauth,
use_ssl=True,
verify_certs=True,
connection_class=RequestsHttpConnection
)
runCurator(es)
def get_credential():
sts_client = boto3.client('sts')
assumedRoleObject = sts_client.assume_role(
RoleArn="arn:aws:iam::************:role/your-lambda-role",
RoleSessionName="Access_to_ES_from_lambda"
)
credentials = assumedRoleObject['Credentials']
return {
'access_key': credentials['AccessKeyId'],
'secret_key': credentials['SecretAccessKey'],
'token': credentials['SessionToken']
}
def runCurator(es):
ilo = curator.IndexList(es)
ilo.filter_by_regex(kind='prefix', value=prefix)
ilo.filter_by_age(source='creation_date', direction='older', unit='days', unit_count=rotation_period)
delete_indices = curator.DeleteIndices(ilo)
delete_indices.do_action()
※ Pythonのversionは3.6を利用するので、ローカルもPython versionを3.6にしてください
Lambda関数の作成
GUIからlambda関数を作成しましょう。各設定値は以下にしてください。
- ランタイム: Python3.6
- ハンドラ : index.lambda_handler
- ロール : your-lambda-role
zipファイルのアップロードとテスト実行
作成したzipファイルをLambdaにアップロードしテストの実行を行うとインデックスが削除されていると思います。
以下に例を載せてみました。今回はrotation_periodの値を1にしたので前日のインデックスだけ残るようにしました。
Before
After
まとめ
以上でCuratorを利用したESのインデックスの削除が行えるようになりました。
今回作成したLambdaにCloudwatch Eventsをトリガにすることでserverlessなインデックスローテートが完成します。
ESのインデックスローテートを行っていない方は、是非Curatorでのインデックスローテートをお試しください。
参考
- http://curator.readthedocs.io/en/latest/objectclasses.html#indexlist
- https://elasticsearch-py.readthedocs.io/en/master/
DMMでは一緒に開発部門を盛り上げてくれるエンジニアを募集しています!
詳しくはコチラからご覧ください!