新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

Sparkのよく使うコードメモ

SparkはPythonプログラムなので、かなり自由に書くことができます。

しかし、いつも大体やることは決まっているし、色んな書き方を知っても、かえって記憶に残りづらくなってしまうので、Sparkの個人的によく使うコードを、1目的1コードの形にまとめておきます。

基本的なSpark作業

  1. ストレージやDBからデータを読み込んで、DataFrameを作成する
  2. DataFrame加工
    • クエリによる加工
    • 関数による加工
  3. ストレージやDBに、加工したDataFrameを書き出す

データ読み込み

  • spark.readを使ってストレージのデータを読み込んでDataFrameを作成
  • ファイルフォーマットは主にCSV・JSON

基本

パス

  • listで複数パスを渡すことができる
  • blob形式でワイルドカードが使える
  • blob形式でサブディレクトリのワイルドカードが使える

パラメータ

df = spark.read.format("csv").\
    schema("col1 int, col2 string, col3 date").\
    option("timestampFormat", "yyyy/MM/dd HH:mm:ss").\
    option("header", true).\
    load("gs://xxxx/xxxx/*/*")

df = spark.read.format("json").\
    load(["gs://xxxx/xxxx", "gs://yyyy/yyy"])

AWS S3・Google Cloud Storage

  • S3は「s3://」ではなく「s3a://」
df = spark.read.format("csv").load("gs://xxxx/xxxx")
df = spark.read.format("json").load("s3a://xxxx/xxxx")

JDBCドライバー

  • RDBからDataFrameを作成できる
  • テーブルを読み込む方法と、クエリ結果を読み込む方法がある
_options = {
    "url": "jdbc:mysql://<server_name>:3306/<db_name>",
    "driver": "com.mysql.jdbc.Driver",
    "user": "xxxx",
    "password": "xxxx"
}

# テーブルを読み込む
options = _options.copy()
options["dbtable"] = "<table_name>"
df = spark.read.format("jdbc")\
    .options(**options)
    .load()

# クエリを読み込む
options = _options.copy()
options["query"] = "SELECT xxxx FROM xxxx"
df = spark.read.format("jdbc")\
    .options(**options)
    .load()

Text

  • テキストファイルからDataFrameを作成できる
  • 1行1レコードとする方法と、1ファイル1レコードとする方法がある
# 1行を1カラムとして読み込み(カラム名は「value」)
df = spark.read.text("gs://xxxx")

# 1ファイルを1カラムとして読み込み(カラム名は「value」)
df = spark.read.text("gs://xxxx", wholetext=True)

手動DataFrame作成

  • プログラムにデータ直書きしてDataFrameを作成できる
  • カラム名と型の指定、つまりスキーマの指示が別途必要
json = sc.parallelize((
    {"id":123, "name":"abc"},
    {"id":123, "name":"abc"}
))

df_json = spark.createDataFrame(json, "id integer, name string")

csv = sc.parallelize([
    ("123", "abc"),
    ("123", "abc")
])

df_csv = spark.createDataFrame(csv, "id integer, name string")

書き込み

  • DataFrame.writeを使って、DataFrameをファイルやDBに出力する

ファイル

  • 出力パスはフォルダを表す
  • ファイル名を指定して、単一ファイルに出力することはできない

パラメータ

  • パラメータはoption()options()で設定する
  • path・format・modeはoption()options()で設定できない
  • dateFormat:stringで日付の出力フォーマットを指定
  • timestampFormat:stringでtimestampの出力フォーマットを指定
  • compression:"gzip"。圧縮して出力できる
  • mode
    • append:追加
    • overwrite:上書き
      • 既にディレクトリとファイルがあると、ディレクトリの中のファイルを削除してくれる
  • CSV
    • header:True・False。ヘッダーあり・なし指定。デフォルトFalse
df.write.format("json")\
    .mode("overwrite")\
    .save("gs://xxxx/xxxx")

パーティショニング

  • カラムの値でディレクトリを分けて出力できる
  • 使用したカラムはデータから削除されるので注意
  • mode()で「overwrite」すると、ベースパス以下が書き直されるので注意(末端ディレクトリだけが書き直されるのではない)
# 「gs://xxxx/xxxx/col1=yyy/col2=zzz/*」で出力される
# 「overwrite」だと「gs://xxxx/xxxx/*」の全ファイル・フォルダは一旦削除されてから出力される
df.write.format("json")\
    .mode("overwrite")\
    .partitionBy("col1", "col2")\
    .save("gs://xxxx/xxxx")

JDBCドライバー

  • RDBにDataFrameを出力できる
  • テーブルが無い場合は作成してくれる
options = {
    "url":"jdbc:mysql://<server_name>:3306/<db_name>",
    "driver":"com.mysql.jdbc.Driver",
    "dbtable":"<table_name>",
    "user":"xxxx",
    "password":"xxxx"
}

df.write.format("jdbc").\
    mode("append").\
    options(**options).\
    save()

クエリ

SQL文で、DataFrameを作成できる機能(かなり便利)

まずDataFrameからビューを作成する。

すると、SQL文の中でそのビューを使うことができる。

df.createOrReplaceTempView("<view_name>")
df = spark.sql("""
    SELECT
        *
    FROM <view_name>
    LIMIT 10
    """
  • 大抵の処理はクエリで完結する
    • 複雑な処理はクエリで書いた方が楽
  • ビューは元DataFrameの参照ではなくコピー
  • ビュー名とDataFrame名が同じでも可(別物)
  • クエリ内で使える関数一覧

クエリを使わない書き方

カラム追加

df.withCloumn("new_col", df["col1"])
df.withCloumn("new_col", df["col2"]*2)

# 定数はそのままでは設定できない
# litを介して定数を設定する
from pypark.sql.functions import lit
df.withColumn("new_col", lit("TEST"))

フィルター

df.filter("col1=123")
df.filter("col1=123 AND col2=111 ...")

カラムを削る

# col1、col2のみにする
df.select("col1", "col2")
df.select(df["col1"], df["col2"])

# col1、col2を削除する
df.drop("col1", "col2")
df.drop(df["col1"], df["col2"])

型変換

df.select(df["id"].cast("string"))

データ取得

プログラム内でDataFrameのデータを使いたい時のやり方

  • df.collect()
    • 全結果を確定・集計して、Row型の配列で返す
  • Rowから要素を取り出すのはRow["<col_name>"]
  • Rowからマップに変換はRow.asDict()
  • df.toPandas()
    • 全結果を確定・集計して、Pandas DataFrameを返す
colArray = map(lambda x: [x["col1"], x["col2"]], df.collect())
dictArray = map(lambda x: x.asDict(), df.collect())
pd = df.toPandas()

デバッグ向け

# 中身表示
df.show(<個数>)

# スキーマ表示
df.printSchema()

# 個数取得
df.count()

その他

アプリケーション雛形

Jupyter・Zeppelinで作成したプログラムを、Jobで実行できるアプリケーションにする方法

Jupyter・Zeppelinでは、事前に「spark」「sc」が作成されているので、「spark」「sc」を作成するコードを追加する。

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('app_name').getOrCreate()
sc = spark.sparkContext

個数が0かを高速にチェックする

  • df.count()だと全データを集めるので時間がかかる。df.head(1)に要素があるかないかをチェックすると早い
if len(df.head(1)):
    print("df has a recode.")

RDDとは?

  • RDDの実態は、1行のレコードの配列
  • 1行のレコードは、各セルの値の配列。型は全て文字列
  • RDDはヘッダー情報を持たない。データのみでできている
  • RDDはイミュータブル(変更できない)。変換は新しいRDDを生成することにより行われる

カラムに格納されたJSONデータを、テーブル(DataFrame)に展開する

方針

  • カラム内のJSONでDataFrameを作成する

手順

  • df.rddでRDD(Rowの配列)が返される
  • RowからRow[<カラム名>]で、カラムのデータを抽出する
  • 抽出されたデータはJSONなので、spark.read.json()でJSONを読み込んでDataFrameを作成する
    • その際、schema等、sparl.readのJSON読み込みのパラメータが同じように使える
df_to = spark.read.json(
    df_from.rdd.map(lambda x: x[<json_column>])
)

リファレンス

参考記事

関連カテゴリー記事

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com