BigQueryはデータ量が膨大でも、インフラの事は全く(本当に全く)気にしなくてよく、しかも早くて安いので、 データは全てBigQueryに入れてしまって、全部BigQueryで処理したくなってしまいます。
そんな訳で、MySQLのデータベースをまるまるBigQueryにロードしたいことがあって、その時の作業ポエムです。
構成・要件
MySQLの容量は数百ギガで、テーブル数は数百でした。
幸い、全てのテーブルで、レコード作成日時が同じカラム名で記録されていたので、追々の利便性も考えて、 レコード作成日でパーティショニングしたいなぁと考えました。
BigQueryにロードする時に面倒くさいこと
パーティショニングが面倒くさい
BigQueryは、テーブルを日付で分割して、日付に分けてデータをロードすることができます。
例えば、2019-01-01の部分にデータをロードするには下記のようになります。
bq load <data_set>.<table>$20190101 <data_source>
ただ、これが面倒くさい。一度にまとめてロードできず、1日づつロード処理する必要があります。
しかし、ロード先の日付が、データの日付カラムと同じ場合は、カラムを指定してロードすることができます。
下記のように日付カラムを指定すると、全期間のデータをまとめてロードしても、日付カラムの日付に基づいてパーティションに分けてくれます。
bq load --time_partitioning_field '<time_cloumn_name>' <data_set>.<table> <data_source>
今回はこれを使ったのですがとても便利です!
テーブル作成とスキーマ定義が面倒くさい
MySQLのテーブルと同じテーブルを、BigQuery側に作る必要があるのですが、これが超面倒くさいです…。
MySQLに既に情報があるのだから、それを参照して自動で作って欲しいです。
MySQLからBigQueryへの転送サービスはいくつかあったのですが、処理はテーブル単位でかつ、何らかのテーブル定義・スキーマ定義は書く必要がありました。
WebのGUIでテーブルを1つづつ登録していくとなると、気が遠くなりそうですし、途中で必ずミスる自信があります…。
ParquetでBigQueryにロードする
スクリプト等の力技で、MySQLのスキーマからBigQueryスキーマを作成して、BigQueryでのロード時に、作成したスキーマを指定するという方法もあるのですが、もっと楽できるなら楽したいです。
BigQueryのloadのヘルプ見ていると、「Parquet」という、カラムのデータ型も情報に持つデータフォーマットがあるのですが、それでのロードが可能とありました。
試してみたところよさげだったので、今回そちらを使うことにしました。
「Parquet」でロードすると、下記のようなメリットがあります。
- テーブル作成が不要
- ロードする時にテーブルが無いと、自動で作成してくれます
- スキーマ定義が不要
- カラム名・型は、Parquetから取得して、自動で作成してくれます
- スキーマ更新が不要
- 新しくカラムが追加されると、Parquet読み込み時に自動で追加してくれます
つまり、テーブル名だけ指定して、Parquetファイルをロードすると、後はよしなにテーブルを作ってロードしてくれます。
カラムの事はノータッチ!もちろん、日付カラム名を指定しての自動パーティショニング機能も使えます!
MySQLのテーブルを、一旦ParquetでGoogle Cloud Storageに出力して、それをBigQueryにロードすることにしました。
BigQuery側の操作はロードコマンドを呼ぶだけなので、一番面倒くさかった、テーブル作成とスキーマ定義の煩わしさから開放されました!
MySQLからParquetで出力する
MySQLからParquet形式でのデータ出力は、Sparkで行うことにしました。Sparkはプログラミングが必要なので多少とっつきにくいのですが、 今回のように右から左にデータを流すくらいなら、簡単に書けます。
例えば、MySQLのテーブルを読み込んで、StoregeにParquetで出力して、StorageからBigQueryにロードは下記のようになります。
import subprocess optons = { "url":"jdbc:mysql://<sv_name>:3306/<db_name>", "driver":"com.mysql.jdbc.Driver", "dbtable":"<table_name>" "user":"xxxx", "password":"xxxx" } df = spark.read.format("jdbc").options(**options).load() df.write.mode("overwrite").format("parquet").\ save("gs://xxxx/yyyy/<table_name>") args = [ "bq", "load", "--source_format", "PARQUET", "--time_partitionting_field", "<time_column_name>", <dataset>.<table_name>, "gs://xxxx/yyyy/<table_name>/" ] subprocess.run(args=args)
SparkのPython用BiqQueryコネクターはないので、直接bqコマンドでロードしています…。(Googleのサンプルもそうでした)
テーブルの数だけ実行する
後はテーブル一覧を「information_schema」から取得して、テーブルの数だけ先程の処理を実行するだけです。
import subprocess OPTIONS = { "url":"jdbc:mysql://<sv_name>:3306/{}", "driver":"com.mysql.jdbc.Driver", "user":"xxxx", "password":"xxxx" } def getTableList(): options = OPTIONS.copy() options["url"] = options["url"].format("information_schema") options["query"] = "SELECT table_name FROM tables WHERE table_schema='<db_name>'" df = spark.read.format("jdbc").options(**options).load() return list(map(lambda x: x["table_name"], df.collect())) def main(): for table in getTableList(): options = OPTIONS.copy() options["url"] = options["url"].format("<db_name>") options["dbtable"] = table df = spark.read.format("jdbc").options(**options).load() df.write.mode("overwrite").format("parquet").\ save("gs://xxxx/yyyy/{}".format(table)) args = [ "bq", "load", "--source_format", "PARQUET", "--time_partitionting_field", "<time_column_name>", <dataset>.<table_name>, "gs://xxxx/yyyy/{}/*".format(table) ] subprocess.run(args=args) main()
これでテーブルがいつくあろうと関係なく、スキーマ無しでMySQLのデータベースがまるっとBigQueryに入ります!
感想など
データ集計はBigQueryで十分満足しているので、あえてSparkを使うつもりはないのですが、Sparkは、あるサービスのデータを別のサービスに変換したい時などにも使えて便利ですね。
例えば、SparkにはAWS S3、Google Cloud Storageのコネクターがあるので、load("s3a://~")
してsave("gs://~")
するだけで、
簡単にAWS S3からGoogle Cloud Storageにデータ移動ができてしまいます。
これでBigQueryのコネクターがあれば最高なんですけどね。
MySQLなどのデータベースからデータを取ってくる場合、読み込み時にタイムアウトになることがあるので、「spark.executor.heartbeatInterval」と「spark.network.timeout」の値は大き目にしておいた方がいいです。