今回は優秀なBIツールである「Tableau」のインフラ環境を、EMRとRedShiftで作ってみましょう。
なお、「BI」という単語に馴染みがないかもしれませんが、「BI」とは『Business Inteligence』の略となります。
つまりは仕事上の意思決定に必要な情報、それらをわかりやすいように分析・表示してくれるツールが「BIツール」です。
もしBIツールを使いこなすことができれば、重要な決定を正しく下したり、提案の有効性を強力にアピールすることが出来ます。
それでは早速構築方法を見ていきましょう。
概要
以下のような流れで構築を進めていきます。
- Tableau分析をするためのインフラ基盤
- EMRでデータ変換を行う
- データ変換はEMR Python Libraryを使用する
- Redshift上にデータをロードする
構成図
全体の構成図は以下の通りです。
開発する環境
まずはAWSで簡易的な分析基盤を開発していきます。
- EMR(serverless)
- RedShift
- Lambda
- アップロードされたCSVをロードできる形式に変換する
- 変換された形式のファイルをRedshiftにロードする
※ なお今回の環境構築はTerraformを使用して構築しております。
作成するスクリプト
後述のリポジトリを利用するために、以下の開発が必要となります。
- アップロードされたCSVをロードできる形式にCSV形式にするPythonスクリプト
- 変換されたCSVをロードするLambda関数
どちらもリンクがサンプルになりますのでご利用ください。
解説
EMR
AWS EMR(Amazon Elastic MapReduce)は、Amazonが提供するビッグデータ解析サービスです。このサービスを使うことで、企業は大量のデータを迅速かつ効率的に分析することができます。データの解析を通じて、顧客の動向を把握したり、市場のトレンドを分析したりすることが可能になります。
利用料金は使った分だけ支払う形式なので、コストを抑えながら必要な時に必要なリソースを利用できる点が魅力です。また、他のAmazonのサービスと組み合わせることで、データの収集から解析までを一元管理することも可能です。このように、EMRはデータを活用してビジネスの意思決定を支援する強力なツールです。
RedShift
Amazon Redshiftは、AWSが提供するクラウドベースのデータウェアハウスサービスです。このサービスは、大量のデータを迅速かつ効率的に分析するためのもので、企業がより良い意思決定を行うための洞察を得ることを支援します。
またAmazon S3やDynamoDBなど、他のAWSサービスとスムーズに連携できます。
これらの特性により、Redshiftはマーケティングデータの分析、財務報告、顧客行動の分析など、多岐にわたるビジネスニーズに対応する強力なツールとなっています。
EMR Python Library
Amazon EMR ServerlessアプリケーションでPySparkジョブを実行するとき、依存関係として様々なPythonライブラリをパッケージ化することができます。しかし、標準のimageにはinstallされていませんので、Pythonのネイティブ機能を使用するか、仮想環境を構築する必要があります。
今回はDockerでImageを作成する手順を解説します。
構成ファイルは以下になります。
emr_python_library
├── Dockerfile
└── requirements.txt
Dockerfileの中身は以下になります。
FROM --platform=linux/x86_64 amazonlinux:2 AS base
RUN yum install -y python3
ENV VIRTUAL_ENV=/opt/venv
RUN python3 -m venv $VIRTUAL_ENV
ENV PATH="$VIRTUAL_ENV/bin:$PATH"
COPY requirements.txt .
RUN python3 -m pip install --upgrade pip && \
python3 -m pip install \
great_expectations==0.15.6 \
venv-pack==0.2.0
RUN python3 -m pip install -r requirements.txt
RUN mkdir /output && venv-pack -o /output/pyspark_ge.tar.gz
FROM scratch AS export
COPY --from=base /output/pyspark_ge.tar.gz /
requirements.txtではpysparkに必要なライブラリを用意します。
numpy==1.21.6
pandas==1.3.5
pyarrow==12.0.1
scipy==1.7.3
matplotlib==3.5.3
pyspark==3.4.3
以下の手順でpyspark_ge.tar.gzを作成します。
cd emr_python_library
docker build --output . .
これによりファイル“pyspark_ge.tar.gz“が作成されます。
emr_python_library
├── Dockerfile
├── pyspark_ge.tar.gz
└── requirements.txt
作成した“pyspark_ge.tar.gz“はS3にアップロードされるようにします。
resource "aws_s3_object" "script_venv_object" {
bucket = aws_s3_bucket.script_bucket.bucket
key = local.pyspark_env_archive # アップロードするファイルのS3上のパス
source = "./emr_python_library/${local.pyspark_env_archive}" # ローカルシステム上のファイルのパス
# オプション: ファイルの内容が変わったときだけアップロードをトリガーする
etag = filemd5("./emr_python_library/${local.pyspark_env_archive}")
}
EMRでのデータ変換
今回はS3にPUTされたCSVファイルEventTriggerが検知して、EMR上で変換スクリプトを実行します。変換後別のS3にPUTします。
以下はサンプルのスクリプトです。
import sys
import urllib
from pyspark.sql import SparkSession
from pyspark.sql.functions import \
col, \
split, \
concat_ws, \
concat, \
date_format, \
lit
def main():
input_bucket = sys.argv[1]
input_key = urllib.parse.unquote_plus(sys.argv[2])
input_path = f"s3://{input_bucket}/{input_key}"
output_bucket = sys.argv[3]
output_key = urllib.parse.unquote_plus(sys.argv[4])
output_path = f"s3://{output_bucket}/{output_key}"
print(f"Input path: {input_path}")
print(f"Output path: {output_path}")
spark = SparkSession.builder.appName("S3 to EMR Serverless Processing").getOrCreate()
# S3からCSVファイルを読み込む
df = spark.read.option("encoding", "Shift_JIS").csv(input_path, header=True, inferSchema=True)
# split_consultday = split(col("consultday"), " ")
# データ処理(例:列の追加)
# df_modified = df....
# 列名の変更
# df_modified = df_modified...
# データ処理(例:特定の列を選択)
processed_df = df_modified...
# 処理結果をS3に保存
processed_df.write.option("encoding", "UTF-8").csv(output_path, header=True, mode="overwrite")
spark.stop()
if __name__ == "__main__":
main()
Load Redshiftについて
変換されたCSVファイルがPUTされたら、別のEventTriggerでロード用のLambdaを実行します。
import boto3
import os
import urllib
def lambda_handler(event, context):
print(event)
print(context)
redshift = boto3.client('redshift-data')
cluster_identifier = os.environ.get('REDSHIFT_CLUSTER_IDENTIFIER')
database = os.environ.get('DATABASE_NAME')
db_user = os.environ.get('DATABASE_USER')
# イベントからファイル名とバケット名を取得
bucket = event['Records'][0]['s3']['bucket']['name']
key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'])
s3_path = f's3://{bucket}/{key}'
table_name = os.environ.get('TABLE_NAME')
redshift_iam_role_arn = os.environ.get('REDSHIFT_IAM_ROLE_ARN')
copy_query = f"""
COPY {database}.public.\"{table_name}\"
FROM '{s3_path}'
IAM_ROLE '{redshift_iam_role_arn}'
FORMAT AS CSV DELIMITER ',' QUOTE '"'
IGNOREHEADER 1
BLANKSASNULL
DATEFORMAT 'auto'
REGION AS 'ap-northeast-1'"""
print(f'copy_query: {copy_query}')
# Redshiftにクエリを送信
redshift.execute_statement(
ClusterIdentifier=cluster_identifier,
Database=database,
DbUser=db_user,
Sql=copy_query
)
これでTableauインフラ環境が構築できました!
その他参考資料など
本記事で紹介されたコードは、下記を参考に記述されています。
Using Python libraries with EMR Serverless
:https://docs.aws.amazon.com/ja_jp/emr/latest/EMR-Serverless-UserGuide/using-python-libraries.html
以下のgithubからもソースコードを見ることが出来ます。
ソースコード:https://github.com/TodoONada/redshift-emr-infra
また、弊社では『マッチングワン』という『低コスト・短期にマッチングサービスを構築できる』サービスを展開しており、今回ご紹介するコードは、その『マッチングワン』でも使われているコードとなります。
本記事で紹介したようなプロダクトを開発されたい場合は、是非お問い合わせください。