新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

Sparkのよく使うコードメモ

SparkはPythonプログラムなので、かなり自由に書くことができます。

しかし、いつも大体やることは決まっているし、色んな書き方を知っても、かえって記憶に残りづらくなってしまうので、Sparkの個人的によく使うコードを、1目的1コードの形にまとめておきます。

基本的なSpark作業

  1. ストレージやDBからデータを読み込んで、DataFrameを作成する
  2. DataFrame加工
    • クエリによる加工
    • 関数による加工
  3. ストレージやDBに、加工したDataFrameを書き出す

データ読み込み

  • spark.readを使ってストレージのデータを読み込んでDataFrameを作成
  • ファイルフォーマットは主にCSV・JSON

基本

パス

  • listで複数パスを渡すことができる
  • blob形式でワイルドカードが使える
  • blob形式でサブディレクトリのワイルドカードが使える

パラメータ

df = spark.read.format("csv").\
    schema("col1 int, col2 string, col3 date").\
    option("timestampFormat", "yyyy/MM/dd HH:mm:ss").\
    option("header", true).\
    load("gs://xxxx/xxxx/*/*")

df = spark.read.format("json").\
    load(["gs://xxxx/xxxx", "gs://yyyy/yyy"])

AWS S3・Google Cloud Storage

  • S3は「s3://」ではなく「s3a://」
df = spark.read.format("csv").load("gs://xxxx/xxxx")
df = spark.read.format("json").load("s3a://xxxx/xxxx")

JDBCドライバー

  • RDBからDataFrameを作成できる
  • テーブルを読み込む方法と、クエリ結果を読み込む方法がある
_options = {
    "url": "jdbc:mysql://<server_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "user": "xxxx",
    "password": "xxxx"
}

# テーブルを読み込む
options = _options.copy()
options["dbtable"] = "<table_name>"
df = spark.read.format("jdbc")\
    .options(**options)
    .load()

# クエリを読み込む
options = _options.copy()
options["query"] = "SELECT xxxx FROM xxxx"
df = spark.read.format("jdbc")\
    .options(**options)
    .load()

Text

  • テキストファイルからDataFrameを作成できる
  • 1行1レコードとする方法と、1ファイル1レコードとする方法がある
# 1行を1カラムとして読み込み(カラム名は「value」)
df = spark.read.text("gs://xxxx")

# 1ファイルを1カラムとして読み込み(カラム名は「value」)
df = spark.read.text("gs://xxxx", wholetext=True)

手動DataFrame作成

  • プログラムにデータ直書きしてDataFrameを作成できる
  • カラム名と型の指定、つまりスキーマの指示が別途必要
json = sc.parallelize((
    {"id":123, "name":"abc"},
    {"id":123, "name":"abc"}
))

df_json = spark.createDataFrame(json, "id integer, name string")

csv = sc.parallelize([
    ("123", "abc"),
    ("123", "abc")
])

df_csv = spark.createDataFrame(csv, "id integer, name string")

書き込み

  • DataFrame.writeを使って、DataFrameをファイルやDBに出力する

ファイル

  • 出力パスはフォルダを表す
  • ファイル名を指定して、単一ファイルに出力することはできない

パラメータ

  • パラメータはoption()options()で設定する
  • path・format・modeはoption()options()で設定できない
  • dateFormat:stringで日付の出力フォーマットを指定
  • timestampFormat:stringでtimestampの出力フォーマットを指定
  • compression:"gzip"。圧縮して出力できる
  • mode
    • append:追加
    • overwrite:上書き
      • 既にディレクトリとファイルがあると、ディレクトリの中のファイルを削除してくれる
  • CSV
    • header:True・False。ヘッダーあり・なし指定。デフォルトFalse
df.write.format("json")\
    .mode("overwrite")\
    .save("gs://xxxx/xxxx")

パーティショニング

  • カラムの値でディレクトリを分けて出力できる
  • 使用したカラムはデータから削除されるので注意
  • mode()で「overwrite」すると、ベースパス以下が書き直されるので注意(末端ディレクトリだけが書き直されるのではない)
# 「gs://xxxx/xxxx/col1=yyy/col2=zzz/*」で出力される
# 「overwrite」だと「gs://xxxx/xxxx/*」の全ファイル・フォルダは一旦削除されてから出力される
df.write.format("json")\
    .mode("overwrite")\
    .partitionBy("col1", "col2")\
    .save("gs://xxxx/xxxx")

JDBCドライバー

  • RDBにDataFrameを出力できる
  • テーブルが無い場合は作成してくれる
options = {
    "url":"jdbc:mysql://<server_name>:3306/<db_name>",
    "driver":"com.mysql.jdbc.Driver",
    "dbtable":"<table_name>",
    "user":"xxxx",
    "password":"xxxx"
}

df.write.format("jdbc").\
    mode("append").\
    options(**options).\
    save()

クエリ

SQL文で、DataFrameを作成できる機能(かなり便利)

まずDataFrameからビューを作成する。

すると、SQL文の中でそのビューを使うことができる。

df.createOrReplaceTempView("<view_name>")
df = spark.sql("""
    SELECT
        *
    FROM <view_name>
    LIMIT 10
    """
  • 大抵の処理はクエリで完結する
    • 複雑な処理はクエリで書いた方が楽
  • ビューは元DataFrameの参照ではなくコピー
  • ビュー名とDataFrame名が同じでも可(別物)
  • クエリ内で使える関数一覧

クエリを使わない書き方

カラム追加

df.withCloumn("new_col", df["col1"])
df.withCloumn("new_col", df["col2"]*2)

# 定数はそのままでは設定できない
# litを介して定数を設定する
from pypark.sql.functions import lit
df.withColumn("new_col", lit("TEST"))

フィルター

df.filter("col1=123")
df.filter("col1=123 AND col2=111 ...")

カラムを削る

# col1、col2のみにする
df.select("col1", "col2")
df.select(df["col1"], df["col2"])

# col1、col2を削除する
df.drop("col1", "col2")
df.drop(df["col1"], df["col2"])

型変換

df.select(df["id"].cast("string"))

データ取得

プログラム内でDataFrameのデータを使いたい時のやり方

  • df.collect()
    • 全結果を確定・集計して、Row型の配列で返す
  • Rowから要素を取り出すのはRow["<col_name>"]
  • Rowからマップに変換はRow.asDict()
  • df.toPandas()
    • 全結果を確定・集計して、Pandas DataFrameを返す
colArray = map(lambda x: [x["col1"], x["col2"]], df.collect())
dictArray = map(lambda x: x.asDict(), df.collect())
pd = df.toPandas()

デバッグ向け

# 中身表示
df.show(<個数>)

# スキーマ表示
df.printSchema()

# 個数取得
df.count()

その他

アプリケーション雛形

Jupyter・Zeppelinで作成したプログラムを、Jobで実行できるアプリケーションにする方法

Jupyter・Zeppelinでは、事前に「spark」「sc」が作成されているので、「spark」「sc」を作成するコードを追加する。

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app_name').getOrCreate()
sc = spark.sparkContext

個数が0かを高速にチェックする

  • df.count()だと全データを集めるので時間がかかる。df.head(1)に要素があるかないかをチェックすると早い
if len(df.head(1)):
    print("df has a recode.")

RDDとは?

  • RDDの実態は、1行のレコードの配列
  • 1行のレコードは、各セルの値の配列。型は全て文字列
  • RDDはヘッダー情報を持たない。データのみでできている
  • RDDはイミュータブル(変更できない)。変換は新しいRDDを生成することにより行われる

カラムに格納されたJSONデータを、テーブル(DataFrame)に展開する

方針

  • カラム内のJSONでDataFrameを作成する

手順

  • df.rddでRDD(Rowの配列)が返される
  • RowからRow[<カラム名>]で、カラムのデータを抽出する
  • 抽出されたデータはJSONなので、spark.read.json()でJSONを読み込んでDataFrameを作成する
    • その際、schema等、sparl.readのJSON読み込みのパラメータが同じように使える
df_to = spark.read.json(
    df_from.rdd.map(lambda x: x[<json_column>])
)

リファレンス

参考記事