SparkはPythonプログラムなので、かなり自由に書くことができます。
しかし、いつも大体やることは決まっているし、色んな書き方を知っても、かえって記憶に残りづらくなってしまうので、Sparkの個人的によく使うコードを、1目的1コードの形にまとめておきます。
基本的なSpark作業
- ストレージやDBからデータを読み込んで、DataFrameを作成する
- DataFrame加工
- クエリによる加工
- 関数による加工
- ストレージやDBに、加工したDataFrameを書き出す
データ読み込み
spark.read
を使ってストレージのデータを読み込んでDataFrameを作成- ファイルフォーマットは主にCSV・JSON
基本
パス
- listで複数パスを渡すことができる
- blob形式でワイルドカードが使える
- blob形式でサブディレクトリのワイルドカードが使える
パラメータ
- パラメータは
option()
、options()
で設定する - path・format・schemaは
option()
、options()
で設定できない - フォーマットはformatで指定
- CSV・JSON
- schema:stringでデータ型を指定
- 型名は「class pyspark.sql.types.xxxxTypes」の「xxxx」を小文字にしたもの
- https://spark.apache.org/docs/2.4.0/api/python/pyspark.sql.html#pyspark.sql.types.DataType
- schema:stringでデータ型を指定
- CSV
- header:True・False。ヘッダーあり・なし指定。デフォルトFalse
- dateFormat:stringで日付の読み込みフォーマットを指定できる
- 「YYYY/mm/dd」などのフォーマットが読み込めるようになる
- https://docs.oracle.com/javase/jp/6/api/java/text/SimpleDateFormat.html
- timestampFormat:stringでtimestampの読み込みフォーマットを指定できる
df = spark.read.format("csv").\ schema("col1 int, col2 string, col3 date").\ option("timestampFormat", "yyyy/MM/dd HH:mm:ss").\ option("header", true).\ load("gs://xxxx/xxxx/*/*") df = spark.read.format("json").\ load(["gs://xxxx/xxxx", "gs://yyyy/yyy"])
AWS S3・Google Cloud Storage
- S3は「s3://」ではなく「s3a://」
df = spark.read.format("csv").load("gs://xxxx/xxxx") df = spark.read.format("json").load("s3a://xxxx/xxxx")
JDBCドライバー
- RDBからDataFrameを作成できる
- テーブルを読み込む方法と、クエリ結果を読み込む方法がある
_options = { "url": "jdbc:mysql://<server_name>:3306/<db_name>", "driver": "com.mysql.jdbc.Driver", "user": "xxxx", "password": "xxxx" } # テーブルを読み込む options = _options.copy() options["dbtable"] = "<table_name>" df = spark.read.format("jdbc")\ .options(**options) .load() # クエリを読み込む options = _options.copy() options["query"] = "SELECT xxxx FROM xxxx" df = spark.read.format("jdbc")\ .options(**options) .load()
Text
- テキストファイルからDataFrameを作成できる
- 1行1レコードとする方法と、1ファイル1レコードとする方法がある
# 1行を1カラムとして読み込み(カラム名は「value」) df = spark.read.text("gs://xxxx") # 1ファイルを1カラムとして読み込み(カラム名は「value」) df = spark.read.text("gs://xxxx", wholetext=True)
手動DataFrame作成
- プログラムにデータ直書きしてDataFrameを作成できる
- カラム名と型の指定、つまりスキーマの指示が別途必要
json = sc.parallelize(( {"id":123, "name":"abc"}, {"id":123, "name":"abc"} )) df_json = spark.createDataFrame(json, "id integer, name string") csv = sc.parallelize([ ("123", "abc"), ("123", "abc") ]) df_csv = spark.createDataFrame(csv, "id integer, name string")
書き込み
DataFrame.write
を使って、DataFrameをファイルやDBに出力する
ファイル
- 出力パスはフォルダを表す
- ファイル名を指定して、単一ファイルに出力することはできない
パラメータ
- パラメータは
option()
、options()
で設定する - path・format・modeは
option()
、options()
で設定できない - dateFormat:stringで日付の出力フォーマットを指定
- timestampFormat:stringでtimestampの出力フォーマットを指定
- compression:"gzip"。圧縮して出力できる
- mode
- append:追加
- overwrite:上書き
- 既にディレクトリとファイルがあると、ディレクトリの中のファイルを削除してくれる
- CSV
- header:True・False。ヘッダーあり・なし指定。デフォルトFalse
df.write.format("json")\ .mode("overwrite")\ .save("gs://xxxx/xxxx")
パーティショニング
- カラムの値でディレクトリを分けて出力できる
- 使用したカラムはデータから削除されるので注意
mode()
で「overwrite」すると、ベースパス以下が書き直されるので注意(末端ディレクトリだけが書き直されるのではない)
# 「gs://xxxx/xxxx/col1=yyy/col2=zzz/*」で出力される # 「overwrite」だと「gs://xxxx/xxxx/*」の全ファイル・フォルダは一旦削除されてから出力される df.write.format("json")\ .mode("overwrite")\ .partitionBy("col1", "col2")\ .save("gs://xxxx/xxxx")
JDBCドライバー
- RDBにDataFrameを出力できる
- テーブルが無い場合は作成してくれる
options = { "url":"jdbc:mysql://<server_name>:3306/<db_name>", "driver":"com.mysql.jdbc.Driver", "dbtable":"<table_name>", "user":"xxxx", "password":"xxxx" } df.write.format("jdbc").\ mode("append").\ options(**options).\ save()
クエリ
SQL文で、DataFrameを作成できる機能(かなり便利)
まずDataFrameからビューを作成する。
すると、SQL文の中でそのビューを使うことができる。
df.createOrReplaceTempView("<view_name>") df = spark.sql(""" SELECT * FROM <view_name> LIMIT 10 """
- 大抵の処理はクエリで完結する
- 複雑な処理はクエリで書いた方が楽
- ビューは元DataFrameの参照ではなくコピー
- ビュー名とDataFrame名が同じでも可(別物)
- クエリ内で使える関数一覧
クエリを使わない書き方
カラム追加
df.withCloumn("new_col", df["col1"]) df.withCloumn("new_col", df["col2"]*2) # 定数はそのままでは設定できない # litを介して定数を設定する from pypark.sql.functions import lit df.withColumn("new_col", lit("TEST"))
フィルター
df.filter("col1=123") df.filter("col1=123 AND col2=111 ...")
カラムを削る
# col1、col2のみにする df.select("col1", "col2") df.select(df["col1"], df["col2"]) # col1、col2を削除する df.drop("col1", "col2") df.drop(df["col1"], df["col2"])
型変換
df.select(df["id"].cast("string"))
データ取得
プログラム内でDataFrameのデータを使いたい時のやり方
df.collect()
- 全結果を確定・集計して、Row型の配列で返す
Row
から要素を取り出すのはRow["<col_name>"]
Row
からマップに変換はRow.asDict()
df.toPandas()
- 全結果を確定・集計して、Pandas DataFrameを返す
colArray = map(lambda x: [x["col1"], x["col2"]], df.collect()) dictArray = map(lambda x: x.asDict(), df.collect()) pd = df.toPandas()
デバッグ向け
# 中身表示 df.show(<個数>) # スキーマ表示 df.printSchema() # 個数取得 df.count()
その他
アプリケーション雛形
Jupyter・Zeppelinで作成したプログラムを、Jobで実行できるアプリケーションにする方法
Jupyter・Zeppelinでは、事前に「spark」「sc」が作成されているので、「spark」「sc」を作成するコードを追加する。
from pyspark.sql import SparkSession spark = SparkSession.builder.appName('app_name').getOrCreate() sc = spark.sparkContext
個数が0かを高速にチェックする
- df.count()だと全データを集めるので時間がかかる。df.head(1)に要素があるかないかをチェックすると早い
if len(df.head(1)): print("df has a recode.")
RDDとは?
- RDDの実態は、1行のレコードの配列
- 1行のレコードは、各セルの値の配列。型は全て文字列
- RDDはヘッダー情報を持たない。データのみでできている
- RDDはイミュータブル(変更できない)。変換は新しいRDDを生成することにより行われる
カラムに格納されたJSONデータを、テーブル(DataFrame)に展開する
方針
- カラム内のJSONでDataFrameを作成する
手順
- df.rddでRDD(Rowの配列)が返される
- RowからRow[<カラム名>]で、カラムのデータを抽出する
- 抽出されたデータはJSONなので、
spark.read.json()
でJSONを読み込んでDataFrameを作成する- その際、schema等、
sparl.read
のJSON読み込みのパラメータが同じように使える
- その際、schema等、
df_to = spark.read.json(
df_from.rdd.map(lambda x: x[<json_column>])
)
リファレンス
- DataFrame
- DataFrame.read
- DataFrame.write
- DataType
- SQL Functions
- JDBC Driver・classPath
参考記事
- https://stackoverflow.com/questions/32707620/how-to-check-if-spark-dataframe-is-empty
- https://stackoverflow.com/questions/32788322/how-to-add-a-constant-column-in-a-spark-dataframe