新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

Node.jsのstream(ストリーム)の使い方メモ

データ処理を行う時、ついつい面倒くさくて、全てのデータをメモリに読み込んでから処理しがちです。

データサイズが小さい時は問題ないのですが、巨大なデータを扱う場合は、メモリを大量に消費しますし、1つのデータの読み込みや書き込みが完了するまで、他の読み書き作業はブロックされるので、時間がかかってしまいます。

そんな時は、少しづつデータを読み込こみながらその都度処理する、いわゆるストリーム処理をすると、メモリはバッファ分だけで済みますし、データの読み書きは並行実行されるので、早く処理が終わり便利です。

ストリーム処理が非同期処理と相性がいいのか、Node.jsには標準でストリーム関連の機能が入っていて、外部ライブラリでもストリーム処理ができるようになっているものが多いです。

Node.jsではストリーム処理はよく出てくるので、使い方を知っておくと何かと便利なのですが、非同期関数を扱う分、全体を読み込んでから処理する場合と比べて、多少とっつきにくいかなぁと思います。

ここでは、そんなNode.jsのstream(ストリーム)の使い方をメモしておこうと思います。

読み込みストリーム

元となるデータがあって、そこから少しづつデータを読み出す機能を提供します。

  • 読み込んだデータはdataイベントで取得します
  • データが最後に到達するとendイベントが発行されます

例)ファイル読み込み

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');

        // Read Stream
        rs.on('data', (chunk) => {
            console.log(chunk.toString());
        });

        rs.on('end', () => {
            console.log("READ STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

書き込みストリーム

データの出力先があって、そこに少しづつデータを書き出す機能を提供します。

  • write()関数でデータを書き込みストリームに書き込みます
  • 全てのデータを送り込んだ後は、end()関数でデータが終了したことを書き込みストリームに伝えます
  • 書き込みストリームがデータの出力を完了するとfinishイベントが発行されます

例)ファイル書き込み

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const writeData = ["data_1", "data_2", "data_3"];

        const ws = fs.createWriteStream("output.txt");

        // Write Data
        writeData.forEach((v) => {
            ws.write(v);
        });
        ws.end();

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

読み込みストリームと書き込みストリームをつなげる

読み込みストリームからデータを読み込んで、そのまま書き込みストリームに出力する場合、スクラッチで実装すると下記のようになります。

  • dataイベントで読み込みストリームからデータを受けて、write()でそのデータを書き込みストリームに書き込みます
  • endイベントで読み込みストリームがデータの最後まで到達したら、end()で書き込みストリームにデータが最後であることを伝えます
  • 全ての作業の完了は、書き込みストリームの書き込みが完了したfinishイベントを受け取った時になります

ファイルを読み込みながらファイルに書き込み(独自実装)

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt");

        // Read Stream
        rs.on('data', (chunk) => {
            ws.write(chunk);
        });

        rs.on('end', () => {
            ws.end();
        });

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

パイプ

上記の読み込みストリームから書き込みストリームへの接続は、読み込みストリームのpipe()関数を使えば自動でやってくれます。

全ての作業の完了は、上記と同様、書き込みストリームの書き込みが完了したfinishイベントを受け取った時になります。

ファイルを読み込みながらファイルに書き込み(パイプ)

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt");

        rs.pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

Duplex・Tranformストリーム

読み込み・書き込みの両方の機能を持ったストリームです。

データ圧縮・展開するglibなど、主にデータ変換などを行うストリームで使われています。

ちょっとややこしいのですが、変換するデータを「書き込みストリーム」で書き込み、変換されたデータを「読み込みストリーム」から取得します。

後述するpipe()の中間ストリームで使用されることが多く、それ単独で使われることはあまりないのですが、使うとしたら下記のようになります。

  • 書き込みストリームへの入力データ書き込みで始まり、読み込みストリームから結果データを取り出し、読み込みストリームからの読み込み完了で終了します
  • 書き込みへのデータ送信の終了は、書き込みストリームのend()関数で伝えます
  • 全ての作業の完了は、読み込みストリームが終端に到達した、endイベントを受け取った時になります

例)データをbase64に変換する

import { Base64Encode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const base64 = new Base64Encode;

        // Write Data
        base64.write("data");
        base64.end();

        // Read Stream
        base64.on('data', (chunk) => {
            console.log(chunk);
        });

        base64.on('end', () => {
            console.log("READ STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

パイプでDuplex・Tranformストリームを連結させる

読み込みストリームのpipe()関数は、引数に渡したストリームを戻り値として返します。

pipe()にDuplex・Tranformストリームを渡すと、Duplex・Tranformストリームが返るので、引き続きpipe()を呼び出して、次のストリームにつなげることができます。

前述の通り、Duplex・Tranformストリームは、pipe()を使って、ストリームの間につないで使うことが多いです。

例)ファイルを圧縮して保存する

import * as fs from "fs";
import * as zlib from "zlib";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("input.txt.gz");
        const gzip = zlib.createGzip();

        rs.pipe(gzip).pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

Duplex・Tranformストリームはpipe()でどんどん後につなげていくことができます。

全ての処理の完了は、一番最後の書き込みストリームのfinishイベントで検知します。

例)ファイルをbase64に変換して圧縮して保存する

import * as fs from "fs";
import * as zlib from "zlib";
import { Base64Encode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt.gz");
        const base64 = new Base64Encode;
        const gzip = zlib.createGzip();

        rs.pipe(base64).pipe(gzip).pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

メモリのデータをストリームにする

メモリのデータを書き込みストリームに渡したい時があります。

write()で書き込みストリームに直接書き込んでもいいのですが、任意のデータから読み込みストリームを作成することができるので、それをpipe()で書き込みストリームに渡すこともできます。

任意のデータの読み込みストリームを作成するには下記の手順で行います

  1. まず空の読み込みストリームを作成し
  2. そのストリームにpush()で流すデータを書き込みます
  3. 流すデータが無くなって終了する場合は、push()nullを書き込むことにより伝えます

例)メモリのデータをストリームでファイルに出力する

import * as fs from "fs";
import { Readable } from 'stream';

(async () => {
    await new Promise((resolve, reject) => {

        const rs = new Readable();
        const ws = fs.createWriteStream("output.txt");

        rs.pipe(ws);

        // create read stream data
        rs.push("data");
        rs.push(null);

        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

感想など

独自の入力元や出力先、変換を行う、オリジナルのストリームを作成することもできるのですが、難易度高めなので、有り物のストリームだけを使ってやりくりするのが無難かなぁと思います。

ストリームはコールバック関数の使用が必須なのですが、async・await全盛の昨今、ちょっと古臭い感じがしますね。