新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

MySQLのデータベースを、スキーマレスでパーティショニングしながらBigQueryにロードしてみる

BigQueryはデータ量が膨大でも、インフラの事は全く(本当に全く)気にしなくてよく、しかも早くて安いので、 データは全てBigQueryに入れてしまって、全部BigQueryで処理したくなってしまいます。

そんな訳で、MySQLのデータベースをまるまるBigQueryにロードしたいことがあって、その時の作業ポエムです。

構成・要件

MySQLの容量は数百ギガで、テーブル数は数百でした。

幸い、全てのテーブルで、レコード作成日時が同じカラム名で記録されていたので、追々の利便性も考えて、 レコード作成日でパーティショニングしたいなぁと考えました。

BigQueryにロードする時に面倒くさいこと

パーティショニングが面倒くさい

BigQueryは、テーブルを日付で分割して、日付に分けてデータをロードすることができます。

例えば、2019-01-01の部分にデータをロードするには下記のようになります。

bq load <data_set>.<table>$20190101 <data_source>

ただ、これが面倒くさい。一度にまとめてロードできず、1日づつロード処理する必要があります。

しかし、ロード先の日付が、データの日付カラムと同じ場合は、カラムを指定してロードすることができます。

下記のように日付カラムを指定すると、全期間のデータをまとめてロードしても、日付カラムの日付に基づいてパーティションに分けてくれます。

bq load --time_partitioning_field '<time_cloumn_name>' <data_set>.<table> <data_source>

今回はこれを使ったのですがとても便利です!

テーブル作成とスキーマ定義が面倒くさい

MySQLのテーブルと同じテーブルを、BigQuery側に作る必要があるのですが、これが超面倒くさいです…。

MySQLに既に情報があるのだから、それを参照して自動で作って欲しいです。

MySQLからBigQueryへの転送サービスはいくつかあったのですが、処理はテーブル単位でかつ、何らかのテーブル定義・スキーマ定義は書く必要がありました。

WebのGUIでテーブルを1つづつ登録していくとなると、気が遠くなりそうですし、途中で必ずミスる自信があります…。

ParquetでBigQueryにロードする

スクリプト等の力技で、MySQLのスキーマからBigQueryスキーマを作成して、BigQueryでのロード時に、作成したスキーマを指定するという方法もあるのですが、もっと楽できるなら楽したいです。

BigQueryのloadのヘルプ見ていると、「Parquet」という、カラムのデータ型も情報に持つデータフォーマットがあるのですが、それでのロードが可能とありました。

試してみたところよさげだったので、今回そちらを使うことにしました。

「Parquet」でロードすると、下記のようなメリットがあります。

  • テーブル作成が不要
    • ロードする時にテーブルが無いと、自動で作成してくれます
  • スキーマ定義が不要
    • カラム名・型は、Parquetから取得して、自動で作成してくれます
  • スキーマ更新が不要
    • 新しくカラムが追加されると、Parquet読み込み時に自動で追加してくれます

つまり、テーブル名だけ指定して、Parquetファイルをロードすると、後はよしなにテーブルを作ってロードしてくれます。

カラムの事はノータッチ!もちろん、日付カラム名を指定しての自動パーティショニング機能も使えます!

MySQLのテーブルを、一旦ParquetでGoogle Cloud Storageに出力して、それをBigQueryにロードすることにしました。

BigQuery側の操作はロードコマンドを呼ぶだけなので、一番面倒くさかった、テーブル作成とスキーマ定義の煩わしさから開放されました!

MySQLからParquetで出力する

MySQLからParquet形式でのデータ出力は、Sparkで行うことにしました。Sparkはプログラミングが必要なので多少とっつきにくいのですが、 今回のように右から左にデータを流すくらいなら、簡単に書けます。

例えば、MySQLのテーブルを読み込んで、StoregeにParquetで出力して、StorageからBigQueryにロードは下記のようになります。

import subprocess

optons = {
    "url":"jdbc:mysql://<sv_name>:3306/<db_name>",
    "driver":"com.mysql.jdbc.Driver",
    "dbtable":"<table_name>"
    "user":"xxxx",
    "password":"xxxx"
}

df = spark.read.format("jdbc").options(**options).load()
df.write.mode("overwrite").format("parquet").\
    save("gs://xxxx/yyyy/<table_name>")

args = [
    "bq", "load",
    "--source_format", "PARQUET",
    "--time_partitionting_field", "<time_column_name>",
    <dataset>.<table_name>, 
    "gs://xxxx/yyyy/<table_name>/"
]

subprocess.run(args=args)

SparkのPython用BiqQueryコネクターはないので、直接bqコマンドでロードしています…。(Googleのサンプルもそうでした)

テーブルの数だけ実行する

後はテーブル一覧を「information_schema」から取得して、テーブルの数だけ先程の処理を実行するだけです。

import subprocess

OPTIONS = {
    "url":"jdbc:mysql://<sv_name>:3306/{}",
    "driver":"com.mysql.jdbc.Driver",
    "user":"xxxx",
    "password":"xxxx"
}

def getTableList():
    options = OPTIONS.copy()
    options["url"] = options["url"].format("information_schema")
    options["query"] = "SELECT table_name FROM tables WHERE table_schema='<db_name>'"

    df = spark.read.format("jdbc").options(**options).load()
    return list(map(lambda x: x["table_name"], df.collect()))

def main():
    for table in getTableList():
        options = OPTIONS.copy()
        options["url"] = options["url"].format("<db_name>")
        options["dbtable"] = table

        df = spark.read.format("jdbc").options(**options).load()

        df.write.mode("overwrite").format("parquet").\
            save("gs://xxxx/yyyy/{}".format(table))

        args = [
            "bq", "load",
            "--source_format", "PARQUET",
            "--time_partitionting_field", "<time_column_name>",
            <dataset>.<table_name>, 
            "gs://xxxx/yyyy/{}/*".format(table)
        ]

        subprocess.run(args=args)

main()

これでテーブルがいつくあろうと関係なく、スキーマ無しでMySQLのデータベースがまるっとBigQueryに入ります!

感想など

データ集計はBigQueryで十分満足しているので、あえてSparkを使うつもりはないのですが、Sparkは、あるサービスのデータを別のサービスに変換したい時などにも使えて便利ですね。

例えば、SparkにはAWS S3、Google Cloud Storageのコネクターがあるので、load("s3a://~")してsave("gs://~")するだけで、 簡単にAWS S3からGoogle Cloud Storageにデータ移動ができてしまいます。

これでBigQueryのコネクターがあれば最高なんですけどね。

MySQLなどのデータベースからデータを取ってくる場合、読み込み時にタイムアウトになることがあるので、「spark.executor.heartbeatInterval」と「spark.network.timeout」の値は大き目にしておいた方がいいです。