はじめに
dynamoDBの結果を定期的に確認したい。という要望があり、その実装を行いました。基本的な方針はAWS Lambdaで定期実行を行い、dynamoDB内のデータをcsv形式に保存して、google driveにアップロードするという流れです。
dynamoDBのデータをcsv等に出力することはAWS管理画面からの操作だけではできません。(表示している分しか出力できないという制限が存在する)また、定期実行も使いたかったので今回はpythonとAWS Lambdaを使います。
実装
dynamoDBのテーブルからデータを取得していきます。GUI操作では制限があると書きましたが、コードからの操作でも実は制限があり、5000件までしか取得できません。そこで、ループ処理を加えて全てのデータを取得していきます。
def get_all_data(dynamodb, table_name):
response = dynamodb.scan(TableName=table_name)
data = response['Items']
# レスポンスに LastEvaluatedKey が含まれなくなるまでループ処理を実行する
while 'LastEvaluatedKey' in response:
response = dynamodb.scan(TableName=table_name, ExclusiveStartKey=response['LastEvaluatedKey'])
if 'LastEvaluatedKey' in response:
print("LastEvaluatedKey: {}".format(response['LastEvaluatedKey']))
data.extend(response['Items'])
return data
次にgoogle driveへのアップロードは以下のコードで行います。<<your_parents>>には保存したいdriveのid名を設定します。
また、同一階層にサービスアカウントキーを配置してください。
google driveにアップロードするためには APIを使えるように設定する必要があります。google drive APIキーの取得方法はこちらが参考になります。今回はgoogle driveをアップロード先に選んでいますが、任意の環境に変更してください。
def uploadFileToGoogleDrive(fileName, localFilePath):
ext = os.path.splitext(localFilePath.lower())[1][1:]
if ext == "jpg":
ext = "jpeg"
mimeType = "image/" + ext
service = getGoogleService()
file_metadata = {"name": fileName, "mimeType": mimeType, "parents": [<<your_parents>>] }
media = MediaFileUpload(localFilePath, mimetype=mimeType, resumable=True)
file = service.files().create(body=file_metadata, media_body=media, fields='id').execute()
def getGoogleService():
scope = ['https://www.googleapis.com/auth/drive.file']
keyFile = 'service-account-key.json'
credentials = ServiceAccountCredentials.from_json_keyfile_name(keyFile, scopes=scope)
return build("drive", "v3", credentials=credentials, cache_discovery=False)
<<table_name>>と<<column_names>>には保存したいDynamoDBのテーブル情報を入力します。
import boto3
from boto3.session import Session
from googleapiclient.discovery import build
from googleapiclient.http import MediaFileUpload
from oauth2client.service_account import ServiceAccountCredentials
import csv
import json
import os
import ast
import datetime
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def lambda_handler(event, context):
session = Session()
dynamo = session.client("dynamodb")
table_names = [<<table_name>>]
labels_list = [[<<column_names>>]]
dt_now = datetime.datetime.now()
for table_name, labels in zip (table_names, labels_list):
get_data = get_all_data(dynamo, table_name)
with open('/tmp/upload.csv', 'w', encoding='utf_8_sig') as f:
writer = csv.DictWriter(f, fieldnames=labels)
writer.writeheader()
for elem in get_data:
keys = elem.keys()
values = elem.values()
values_list = []
for v in values:
dic = ast.literal_eval(str(v))
if type(list(dic.values())[0]) is list:
lis = [list(dic.values())[0][0]['N'], list(dic.values())[0][1]['N'], list(dic.values())[0][2]['N']]
values_list.append(lis)
else:
print(list(dic.values())[0])
values_list.append(list(dic.values())[0])
d = dict(zip(keys,values_list))
writer.writerow(d)
uploadFileToGoogleDrive(f'{table_name}_{dt_now.year}_{dt_now.month}_{dt_now.day}.csv', '/tmp/upload.csv')
return
結果
無事、google driveにアップロードすることができました。
おわりに
以上で任意のテーブルデータをcsv形式でgoogle driveに出力することができるようになりました。データの共有は必要な工程だと思うので、今後も出番があるかもしれません。