新しいことにはウェルカム

技術 | 電子工作 | ガジェット | ゲーム のメモ書き

Node.jsのstream(ストリーム)の使い方メモ

データ処理を行う時、ついつい面倒くさくて、全てのデータをメモリに読み込んでから処理しがちです。

データサイズが小さい時は問題ないのですが、巨大なデータを扱う場合は、メモリを大量に消費しますし、1つのデータの読み込みや書き込みが完了するまで、他の読み書き作業はブロックされるので、時間がかかってしまいます。

そんな時は、少しづつデータを読み込こみながらその都度処理する、いわゆるストリーム処理をすると、メモリはバッファ分だけで済みますし、データの読み書きは並行実行されるので、早く処理が終わり便利です。

ストリーム処理が非同期処理と相性がいいのか、Node.jsには標準でストリーム関連の機能が入っていて、外部ライブラリでもストリーム処理ができるようになっているものが多いです。

Node.jsではストリーム処理はよく出てくるので、使い方を知っておくと何かと便利なのですが、非同期関数を扱う分、全体を読み込んでから処理する場合と比べて、多少とっつきにくいかなぁと思います。

ここでは、そんなNode.jsのstream(ストリーム)の使い方をメモしておこうと思います。

読み込みストリーム

元となるデータがあって、そこから少しづつデータを読み出す機能を提供します。

  • 読み込んだデータはdataイベントで取得します
  • データが最後に到達するとendイベントが発行されます

例)ファイル読み込み

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');

        // Read Stream
        rs.on('data', (chunk) => {
            console.log(chunk.toString());
        });

        rs.on('end', () => {
            console.log("READ STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

書き込みストリーム

データの出力先があって、そこに少しづつデータを書き出す機能を提供します。

  • write()関数でデータを書き込みストリームに書き込みます
  • 全てのデータを送り込んだ後は、end()関数でデータが終了したことを書き込みストリームに伝えます
  • 書き込みストリームがデータの出力を完了するとfinishイベントが発行されます

例)ファイル書き込み

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const writeData = ["data_1", "data_2", "data_3"];

        const ws = fs.createWriteStream("output.txt");

        // Write Data
        writeData.forEach((v) => {
            ws.write(v);
        });
        ws.end();

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

読み込みストリームと書き込みストリームをつなげる

読み込みストリームからデータを読み込んで、そのまま書き込みストリームに出力する場合、スクラッチで実装すると下記のようになります。

  • dataイベントで読み込みストリームからデータを受けて、write()でそのデータを書き込みストリームに書き込みます
  • endイベントで読み込みストリームがデータの最後まで到達したら、end()で書き込みストリームにデータが最後であることを伝えます
  • 全ての作業の完了は、書き込みストリームの書き込みが完了したfinishイベントを受け取った時になります

ファイルを読み込みながらファイルに書き込み(独自実装)

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt");

        // Read Stream
        rs.on('data', (chunk) => {
            ws.write(chunk);
        });

        rs.on('end', () => {
            ws.end();
        });

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

パイプ

上記の読み込みストリームから書き込みストリームへの接続は、読み込みストリームのpipe()関数を使えば自動でやってくれます。

全ての作業の完了は、上記と同様、書き込みストリームの書き込みが完了したfinishイベントを受け取った時になります。

ファイルを読み込みながらファイルに書き込み(パイプ)

import * as fs from "fs";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt");

        rs.pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

Duplex・Tranformストリーム

読み込み・書き込みの両方の機能を持ったストリームです。

データ圧縮・展開するglibなど、主にデータ変換などを行うストリームで使われています。

ちょっとややこしいのですが、変換するデータを「書き込みストリーム」で書き込み、変換されたデータを「読み込みストリーム」から取得します。

後述するpipe()の中間ストリームで使用されることが多く、それ単独で使われることはあまりないのですが、使うとしたら下記のようになります。

  • 書き込みストリームへの入力データ書き込みで始まり、読み込みストリームから結果データを取り出し、読み込みストリームからの読み込み完了で終了します
  • 書き込みへのデータ送信の終了は、書き込みストリームのend()関数で伝えます
  • 全ての作業の完了は、読み込みストリームが終端に到達した、endイベントを受け取った時になります

例)データをbase64に変換する

import { Base64Encode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const base64 = new Base64Encode;

        // Write Data
        base64.write("data");
        base64.end();

        // Read Stream
        base64.on('data', (chunk) => {
            console.log(chunk);
        });

        base64.on('end', () => {
            console.log("READ STRAM END");
            resolve();
        });

    });

    console.log("PROGRAM END");
})();

パイプでDuplex・Tranformストリームを連結させる

読み込みストリームのpipe()関数は、引数に渡したストリームを戻り値として返します。

pipe()にDuplex・Tranformストリームを渡すと、Duplex・Tranformストリームが返るので、引き続きpipe()を呼び出して、次のストリームにつなげることができます。

前述の通り、Duplex・Tranformストリームは、pipe()を使って、ストリームの間につないで使うことが多いです。

例)ファイルを圧縮して保存する

import * as fs from "fs";
import * as zlib from "zlib";

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("input.txt.gz");
        const gzip = zlib.createGzip();

        rs.pipe(gzip).pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

Duplex・Tranformストリームはpipe()でどんどん後につなげていくことができます。

全ての処理の完了は、一番最後の書き込みストリームのfinishイベントで検知します。

例)ファイルをbase64に変換して圧縮して保存する

import * as fs from "fs";
import * as zlib from "zlib";
import { Base64Encode } from 'base64-stream';

(async () => {
    await new Promise((resolve, reject) => {

        const rs = fs.createReadStream('input.txt');
        const ws = fs.createWriteStream("output.txt.gz");
        const base64 = new Base64Encode;
        const gzip = zlib.createGzip();

        rs.pipe(base64).pipe(gzip).pipe(ws);

        // Write Stream
        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

メモリのデータをストリームにする

メモリのデータを書き込みストリームに渡したい時があります。

write()で書き込みストリームに直接書き込んでもいいのですが、任意のデータから読み込みストリームを作成することができるので、それをpipe()で書き込みストリームに渡すこともできます。

任意のデータの読み込みストリームを作成するには下記の手順で行います

  1. まず空の読み込みストリームを作成し
  2. そのストリームにpush()で流すデータを書き込みます
  3. 流すデータが無くなって終了する場合は、push()nullを書き込むことにより伝えます

例)メモリのデータをストリームでファイルに出力する

import * as fs from "fs";
import { Readable } from 'stream';

(async () => {
    await new Promise((resolve, reject) => {

        const rs = new Readable();
        const ws = fs.createWriteStream("output.txt");

        rs.pipe(ws);

        // create read stream data
        rs.push("data");
        rs.push(null);

        ws.on('finish', () => {
            console.log("WRITE STRAM END");
            resolve();
        });
    });

    console.log("PROGRAM END");
})();

感想など

独自の入力元や出力先、変換を行う、オリジナルのストリームを作成することもできるのですが、難易度高めなので、有り物のストリームだけを使ってやりくりするのが無難かなぁと思います。

ストリームはコールバック関数の使用が必須なのですが、async・await全盛の昨今、ちょっと古臭い感じがしますね。

関連カテゴリー記事

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com

www.kwbtblog.com