データ処理を行う時、ついつい面倒くさくて、全てのデータをメモリに読み込んでから処理しがちです。
データサイズが小さい時は問題ないのですが、巨大なデータを扱う場合は、メモリを大量に消費しますし、1つのデータの読み込みや書き込みが完了するまで、他の読み書き作業はブロックされるので、時間がかかってしまいます。
そんな時は、少しづつデータを読み込こみながらその都度処理する、いわゆるストリーム処理をすると、メモリはバッファ分だけで済みますし、データの読み書きは並行実行されるので、早く処理が終わり便利です。
ストリーム処理が非同期処理と相性がいいのか、Node.jsには標準でストリーム関連の機能が入っていて、外部ライブラリでもストリーム処理ができるようになっているものが多いです。
Node.jsではストリーム処理はよく出てくるので、使い方を知っておくと何かと便利なのですが、非同期関数を扱う分、全体を読み込んでから処理する場合と比べて、多少とっつきにくいかなぁと思います。
ここでは、そんなNode.jsのstream(ストリーム)の使い方をメモしておこうと思います。
読み込みストリーム
元となるデータがあって、そこから少しづつデータを読み出す機能を提供します。
- 読み込んだデータは
data
イベントで取得します - データが最後に到達すると
end
イベントが発行されます
例)ファイル読み込み
import * as fs from "fs"; (async () => { await new Promise((resolve, reject) => { const rs = fs.createReadStream('input.txt'); // Read Stream rs.on('data', (chunk) => { console.log(chunk.toString()); }); rs.on('end', () => { console.log("READ STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
書き込みストリーム
データの出力先があって、そこに少しづつデータを書き出す機能を提供します。
write()
関数でデータを書き込みストリームに書き込みます- 全てのデータを送り込んだ後は、
end()
関数でデータが終了したことを書き込みストリームに伝えます - 書き込みストリームがデータの出力を完了すると
finish
イベントが発行されます
例)ファイル書き込み
import * as fs from "fs"; (async () => { await new Promise((resolve, reject) => { const writeData = ["data_1", "data_2", "data_3"]; const ws = fs.createWriteStream("output.txt"); // Write Data writeData.forEach((v) => { ws.write(v); }); ws.end(); // Write Stream ws.on('finish', () => { console.log("WRITE STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
読み込みストリームと書き込みストリームをつなげる
読み込みストリームからデータを読み込んで、そのまま書き込みストリームに出力する場合、スクラッチで実装すると下記のようになります。
data
イベントで読み込みストリームからデータを受けて、write()
でそのデータを書き込みストリームに書き込みますend
イベントで読み込みストリームがデータの最後まで到達したら、end()
で書き込みストリームにデータが最後であることを伝えます- 全ての作業の完了は、書き込みストリームの書き込みが完了した
finish
イベントを受け取った時になります
ファイルを読み込みながらファイルに書き込み(独自実装)
import * as fs from "fs"; (async () => { await new Promise((resolve, reject) => { const rs = fs.createReadStream('input.txt'); const ws = fs.createWriteStream("output.txt"); // Read Stream rs.on('data', (chunk) => { ws.write(chunk); }); rs.on('end', () => { ws.end(); }); // Write Stream ws.on('finish', () => { console.log("WRITE STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
パイプ
上記の読み込みストリームから書き込みストリームへの接続は、読み込みストリームのpipe()
関数を使えば自動でやってくれます。
全ての作業の完了は、上記と同様、書き込みストリームの書き込みが完了したfinish
イベントを受け取った時になります。
ファイルを読み込みながらファイルに書き込み(パイプ)
import * as fs from "fs"; (async () => { await new Promise((resolve, reject) => { const rs = fs.createReadStream('input.txt'); const ws = fs.createWriteStream("output.txt"); rs.pipe(ws); // Write Stream ws.on('finish', () => { console.log("WRITE STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
Duplex・Tranformストリーム
読み込み・書き込みの両方の機能を持ったストリームです。
データ圧縮・展開するglib
など、主にデータ変換などを行うストリームで使われています。
ちょっとややこしいのですが、変換するデータを「書き込みストリーム」で書き込み、変換されたデータを「読み込みストリーム」から取得します。
後述するpipe()
の中間ストリームで使用されることが多く、それ単独で使われることはあまりないのですが、使うとしたら下記のようになります。
- 書き込みストリームへの入力データ書き込みで始まり、読み込みストリームから結果データを取り出し、読み込みストリームからの読み込み完了で終了します
- 書き込みへのデータ送信の終了は、書き込みストリームの
end()
関数で伝えます - 全ての作業の完了は、読み込みストリームが終端に到達した、
end
イベントを受け取った時になります
例)データをbase64に変換する
import { Base64Encode } from 'base64-stream'; (async () => { await new Promise((resolve, reject) => { const base64 = new Base64Encode; // Write Data base64.write("data"); base64.end(); // Read Stream base64.on('data', (chunk) => { console.log(chunk); }); base64.on('end', () => { console.log("READ STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
パイプでDuplex・Tranformストリームを連結させる
読み込みストリームのpipe()
関数は、引数に渡したストリームを戻り値として返します。
pipe()
にDuplex・Tranformストリームを渡すと、Duplex・Tranformストリームが返るので、引き続きpipe()
を呼び出して、次のストリームにつなげることができます。
前述の通り、Duplex・Tranformストリームは、pipe()
を使って、ストリームの間につないで使うことが多いです。
例)ファイルを圧縮して保存する
import * as fs from "fs"; import * as zlib from "zlib"; (async () => { await new Promise((resolve, reject) => { const rs = fs.createReadStream('input.txt'); const ws = fs.createWriteStream("input.txt.gz"); const gzip = zlib.createGzip(); rs.pipe(gzip).pipe(ws); // Write Stream ws.on('finish', () => { console.log("WRITE STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
Duplex・Tranformストリームはpipe()
でどんどん後につなげていくことができます。
全ての処理の完了は、一番最後の書き込みストリームのfinish
イベントで検知します。
例)ファイルをbase64に変換して圧縮して保存する
import * as fs from "fs"; import * as zlib from "zlib"; import { Base64Encode } from 'base64-stream'; (async () => { await new Promise((resolve, reject) => { const rs = fs.createReadStream('input.txt'); const ws = fs.createWriteStream("output.txt.gz"); const base64 = new Base64Encode; const gzip = zlib.createGzip(); rs.pipe(base64).pipe(gzip).pipe(ws); // Write Stream ws.on('finish', () => { console.log("WRITE STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
メモリのデータをストリームにする
メモリのデータを書き込みストリームに渡したい時があります。
write()
で書き込みストリームに直接書き込んでもいいのですが、任意のデータから読み込みストリームを作成することができるので、それをpipe()
で書き込みストリームに渡すこともできます。
任意のデータの読み込みストリームを作成するには下記の手順で行います
- まず空の読み込みストリームを作成し
- そのストリームに
push()
で流すデータを書き込みます - 流すデータが無くなって終了する場合は、
push()
にnull
を書き込むことにより伝えます
例)メモリのデータをストリームでファイルに出力する
import * as fs from "fs"; import { Readable } from 'stream'; (async () => { await new Promise((resolve, reject) => { const rs = new Readable(); const ws = fs.createWriteStream("output.txt"); rs.pipe(ws); // create read stream data rs.push("data"); rs.push(null); ws.on('finish', () => { console.log("WRITE STRAM END"); resolve(); }); }); console.log("PROGRAM END"); })();
感想など
独自の入力元や出力先、変換を行う、オリジナルのストリームを作成することもできるのですが、難易度高めなので、有り物のストリームだけを使ってやりくりするのが無難かなぁと思います。
ストリームはコールバック関数の使用が必須なのですが、async・await全盛の昨今、ちょっと古臭い感じがしますね。