新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

MySQLのデータベースを、スキーマレスでパーティショニングしながらBigQueryにロードしてみる

BigQueryはデータ量が膨大でも、インフラの事は全く(本当に全く)気にしなくてよく、しかも早くて安いので、 データは全てBigQueryに入れてしまって、全部BigQueryで処理したくなってしまいます。

そんな訳で、MySQLのデータベースをまるまるBigQueryにロードしたいことがあって、その時の作業ポエムです。

構成・要件

MySQLの容量は数百ギガで、テーブル数は数百でした。

幸い、全てのテーブルで、レコード作成日時が同じカラム名で記録されていたので、追々の利便性も考えて、 レコード作成日でパーティショニングしたいなぁと考えました。

BigQueryにロードする時に面倒くさいこと

パーティショニングが面倒くさい

BigQueryは、テーブルを日付で分割して、日付に分けてデータをロードすることができます。

例えば、2019-01-01の部分にデータをロードするには下記のようになります。

bq load <data_set>.<table>$20190101 <data_source>

ただ、これが面倒くさい。一度にまとめてロードできず、1日づつロード処理する必要があります。

しかし、ロード先の日付が、データの日付カラムと同じ場合は、カラムを指定してロードすることができます。

下記のように日付カラムを指定すると、全期間のデータをまとめてロードしても、日付カラムの日付に基づいてパーティションに分けてくれます。

bq load --time_partitioning_field '<time_cloumn_name>' <data_set>.<table> <data_source>

今回はこれを使ったのですがとても便利です!

テーブル作成とスキーマ定義が面倒くさい

MySQLのテーブルと同じテーブルを、BigQuery側に作る必要があるのですが、これが超面倒くさいです…。

MySQLに既に情報があるのだから、それを参照して自動で作って欲しいです。

MySQLからBigQueryへの転送サービスはいくつかあったのですが、処理はテーブル単位でかつ、何らかのテーブル定義・スキーマ定義は書く必要がありました。

WebのGUIでテーブルを1つづつ登録していくとなると、気が遠くなりそうですし、途中で必ずミスる自信があります…。

ParquetでBigQueryにロードする

スクリプト等の力技で、MySQLのスキーマからBigQueryスキーマを作成して、BigQueryでのロード時に、作成したスキーマを指定するという方法もあるのですが、もっと楽できるなら楽したいです。

BigQueryのloadのヘルプ見ていると、「Parquet」という、カラムのデータ型も情報に持つデータフォーマットがあるのですが、それでのロードが可能とありました。

試してみたところよさげだったので、今回そちらを使うことにしました。

「Parquet」でロードすると、下記のようなメリットがあります。

  • テーブル作成が不要
    • ロードする時にテーブルが無いと、自動で作成してくれます
  • スキーマ定義が不要
    • カラム名・型は、Parquetから取得して、自動で作成してくれます
  • スキーマ更新が不要
    • 新しくカラムが追加されると、Parquet読み込み時に自動で追加してくれます

つまり、テーブル名だけ指定して、Parquetファイルをロードすると、後はよしなにテーブルを作ってロードしてくれます。

カラムの事はノータッチ!もちろん、日付カラム名を指定しての自動パーティショニング機能も使えます!

MySQLのテーブルを、一旦ParquetでGoogle Cloud Storageに出力して、それをBigQueryにロードすることにしました。

BigQuery側の操作はロードコマンドを呼ぶだけなので、一番面倒くさかった、テーブル作成とスキーマ定義の煩わしさから開放されました!

MySQLからParquetで出力する

MySQLからParquet形式でのデータ出力は、Sparkで行うことにしました。Sparkはプログラミングが必要なので多少とっつきにくいのですが、 今回のように右から左にデータを流すくらいなら、簡単に書けます。

例えば、MySQLのテーブルを読み込んで、StoregeにParquetで出力して、StorageからBigQueryにロードは下記のようになります。

import subprocess

optons = {
    "url":"jdbc:mysql://<sv_name>:3306/<db_name>",
    "driver":"com.mysql.jdbc.Driver",
    "dbtable":"<table_name>"
    "user":"xxxx",
    "password":"xxxx"
}

df = spark.read.format("jdbc").options(**options).load()
df.write.mode("overwrite").format("parquet").\
    save("gs://xxxx/yyyy/<table_name>")

args = [
    "bq", "load",
    "--source_format", "PARQUET",
    "--time_partitionting_field", "<time_column_name>",
    <dataset>.<table_name>, 
    "gs://xxxx/yyyy/<table_name>/"
]

subprocess.run(args=args)

SparkのPython用BiqQueryコネクターはないので、直接bqコマンドでロードしています…。(Googleのサンプルもそうでした)

テーブルの数だけ実行する

後はテーブル一覧を「information_schema」から取得して、テーブルの数だけ先程の処理を実行するだけです。

import subprocess

OPTIONS = {
    "url":"jdbc:mysql://<sv_name>:3306/{}",
    "driver":"com.mysql.jdbc.Driver",
    "user":"xxxx",
    "password":"xxxx"
}

def getTableList():
    options = OPTIONS.copy()
    options["url"] = options["url"].format("information_schema")
    options["query"] = "SELECT table_name FROM tables WHERE table_schema='<db_name>'"

    df = spark.read.format("jdbc").options(**options).load()
    return list(map(lambda x: x["table_name"], df.collect()))

def main():
    for table in getTableList():
        options = OPTIONS.copy()
        options["url"] = options["url"].format("<db_name>")
        options["dbtable"] = table

        df = spark.read.format("jdbc").options(**options).load()

        df.write.mode("overwrite").format("parquet").\
            save("gs://xxxx/yyyy/{}".format(table))

        args = [
            "bq", "load",
            "--source_format", "PARQUET",
            "--time_partitionting_field", "<time_column_name>",
            <dataset>.<table_name>, 
            "gs://xxxx/yyyy/{}/*".format(table)
        ]

        subprocess.run(args=args)

main()

これでテーブルがいつくあろうと関係なく、スキーマ無しでMySQLのデータベースがまるっとBigQueryに入ります!

感想など

データ集計はBigQueryで十分満足しているので、あえてSparkを使うつもりはないのですが、Sparkは、あるサービスのデータを別のサービスに変換したい時などにも使えて便利ですね。

例えば、SparkにはAWS S3、Google Cloud Storageのコネクターがあるので、load("s3a://~")してsave("gs://~")するだけで、 簡単にAWS S3からGoogle Cloud Storageにデータ移動ができてしまいます。

これでBigQueryのコネクターがあれば最高なんですけどね。

MySQLなどのデータベースからデータを取ってくる場合、読み込み時にタイムアウトになることがあるので、「spark.executor.heartbeatInterval」と「spark.network.timeout」の値は大き目にしておいた方がいいです。

関連カテゴリー記事

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com