新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

Sparkのよく使うコードメモ

Sparkのよく使うコードの個人的メモ書きです。

データ読み込み

基本

  • パス
    • listで複数渡すことができる
    • blob形式でワイルドカードが使える
    • blob形式でサブディレクトリのワイルドカードが使える
  • パラメータ
df = spark.read.format("csv").\
    schema("col1 int, col2 string, col3 date").\
    option("timestampFormat", "yyyy/MM/dd HH:mm:ss").\
    option("header", true).\
    load("gs://xxxx/xxxx/*/*")

df = spark.read.format("json").\
    load(["gs://xxxx/xxxx", "gs://yyyy/yyy"])

AWS S3・Google Cloud Storage

  • S3は「s3://」ではなく「s3a://」
df = spark.read.format("csv").load("gs://xxxx/xxxx")
df = spark.read.format("json").load("s3a://xxxx/xxxx")

JDBCドライバー

_options = {
    "url": "jdbc:mysql://<server_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "user": "xxxx",
    "password": "xxxx"
}

# テーブルを読み込む
options = _options.copy()
options["dbtable"] = "<table_name>"
df = spark.read.format("jdbc")\
    .options(**options)
    .load()

# クエリを読み込む
options = _options.copy()
options["query"] = "SELECT xxxx FROM xxxx"
df = spark.read.format("jdbc")\
    .options(**options)
    .load()

Text

# 1行を1カラムとして読み込み(カラム名は「value」)
df = spark.read.text("gs://xxxx")

# 1ファイルを1カラムとして読み込み(カラム名は「value」)
df = spark.read.text("gs://xxxx", wholetext=True)

手動DataFrame作成

json = sc.parallelize((
    {"id":123, "name":"abc"},
    {"id":123, "name":"abc"}
))

df_json = spark.createDataFrame(json, "id integer, name string")

csv = sc.parallelize([
    ("123", "abc"),
    ("123", "abc")
])

df_csv = spark.createDataFrame(csv, "id integer, name string")

書き込み

ファイル

  • 出力パスはフォルダを指定。単一ファイル出力はできない
  • path・format・modeはoption()options()で設定できない
  • パラメータ
    • 共通
      • dateFormat:stringで書き込みフォーマットを指定
      • timestampFormat:stringで書き込みフォーマットを指定
      • compression:"gzip"。圧縮する場合のフォーマット
    • CSV
      • header:True・False。ヘッダーあり・なし指定。デフォルトFalse
    • mode
      • append:追加
      • overwrite:上書き
        • 既にディレクトリとファイルがあると、ディレクトリの中のファイルを削除してくれる
df.write.format("json").\
    .mode("overwrite").\
    .save("gs://xxxx/xxxx")

パーティショニング

  • カラムの値でディレクトリを分けて出力できる
  • 使用したカラムはデータから削除されるので注意
  • mode()で「overwrite」すると、ベースパス以下が書き直されるので注意(末端ディレクトリだけが書き直されるのではない)
# 「gs://xxxx/xxxx/col1=yyy/col2=zzz/*」で出力される
# 「overwrite」だと「gs://xxxx/xxxx/*」の全ファイル・フォルダは一旦削除されてから出力される
df.write.format("json").\
    .mode("overwrite").\
    .partitionBy("col1", "col2").\
    .save("gs://xxxx/xxxx")

JDBCドライバー

  • テーブルが無い場合は作成してくれる
options = {
    "url":"jdbc:mysql://<server_name>:3306/<db_name>",
    "driver":"com.mysql.jdbc.Driver",
    "dbtable":"<table_name>",
    "user":"xxxx",
    "password":"xxxx"
}

df.write.format("jdbc").\
    mode("append").\
    options(**options).\
    save()

クエリ

  • DataFrameからビューを作成するとそのビュー名でクエリが書ける
  • 大抵の処理はクエリで完結する
    • 複雑な処理はクエリで書いた方が楽
  • ビューは元DataFrameの参照ではなくコピー
  • ビュー名とDataFrame名が同じでも可(別物)
  • 使える関数一覧
df.createOrReplaceTempView("<view_name>")
df = spark.sql("""
    SELECT
        *
    FROM <view_name>
    LIMIT 10
    """

クエリを使わない書き方

カラム追加

df.withCloumn("new_col", df["col1"])
df.withCloumn("new_col", df["col2"]*2)

# 定数はそのままでは設定できない
# litを介して定数を設定する
from pypark.sql.functions import lit
df.withColumn("new_col", lit("TEST"))

フィルター

df.filter("col1=123")
df.filter("col1=123 AND col2=111 ...")

カラムを削る

# col1、col2のみにする
df.select("col1", "col2")
df.select(df["col1"], df["col2"])

# col1、col2を削除する
df.drop("col1", "col2")
df.drop(df["col1"], df["col2"])

型変換

df.select(df["id"].cast("string"))

データ取得

  • df.collect()で、全結果を確定・集計して、Row型の配列で返す
  • Rowから要素を取り出すのはRow["<col_name>"]
  • Rowからマップに変換はRow.asDict()
  • df.toPandas()で、全結果を確定・集計して、Pandas DataFrameを返す
colArray = map(lambda x: [x["col1"], x["col2"]], df.collect())
dictArray = map(lambda x: x.asDict(), df.collect())
pd = df.toPandas()

デバッグ向け

# 中身表示
df.show(<個数>)

# スキーマ表示
df.printSchema()

# 個数取得
df.count()

その他

アプリケーション雛形

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app_name').getOrCreate()
sc = spark.sparkContext

個数が0かを高速にチェックする

  • df.count()だと全データを集めるので時間がかかる。df.head(1)に要素があるかないかをチェックすると早い
if len(df.head(1)):
    print("df has a recode.")

RDDとは?

  • RDDとは1行のレコードのこと。各セルの値の配列。型は文字列
  • RDDはイミュータブル(変更できない)。変換は新しいRDDを生成することにより行われる

カラムに格納されたJSONデータを、テーブル(DataFrame)に展開する

  • 方針
    • カラム内のJSONでDataFrameを作成する
  • 手順
    • df.rddでRDD(Rowの配列)が返される
    • RowからRow[<カラム名>]で、カラムのデータを抽出する
  • schema等、sparl.readのJSON読み込みのパラメータが同じように使える
df_to = spark.read.json(
    df_from.rdd.map(lambda x: x[<json_column>])
)

リファレンス

参考記事