言語
日本語
English

Caution

お使いのブラウザはJavaScriptが無効になっております。
当サイトでは検索などの処理にJavaScriptを使用しています。
より快適にご利用頂くため、JavaScriptを有効にしたうえで当サイトを閲覧することをお勧めいたします。

Node.js辞典

  1. トップページ
  2. Node.js辞典
  3. Stream(ストリーム処理)

Stream(ストリーム処理)

『Stream』はNode.jsが提供するデータを逐次的に処理する仕組みです。ファイルやネットワークから届くデータを一度にメモリに読み込まず、小さなチャンク(塊)単位で流れるように処理することで、メモリ使用量を抑えつつ大量のデータを扱えます。Node.jsのfsモジュールやHTTPサーバーなど、多くのコアモジュールがStreamをベースに作られています。

Streamの4種類

種類概要代表例
Readableデータを読み込む一方向のストリームです。fs.createReadStream()、HTTPリクエスト
Writableデータを書き込む一方向のストリームです。fs.createWriteStream()、HTTPレスポンス
Duplex読み取りと書き込みの両方ができるストリームです。TCPソケット、net.Socket
Transform読み込みながらデータを変換して書き込む双方向ストリームです。zlib.createGzip()、暗号化

主要メソッド・イベント一覧

メソッド / イベント概要
readable.on('data', fn)チャンクが届くたびに呼ばれるイベントです。
readable.on('end', fn)ストリームの読み込みが完了したときに呼ばれるイベントです。
readable.pipe(writable)ReadableをWritableに繋ぎ、データを自動的に流します。
readable.unpipe()pipe()で繋いだ接続を解除します。
writable.write(chunk)Writableストリームにデータを書き込みます。バックプレッシャー時はfalseを返します。
writable.end()書き込みの終了を通知します。
writable.on('finish', fn)書き込みがすべて完了したときに呼ばれるイベントです。
writable.on('drain', fn)バックプレッシャーが解消されたときに呼ばれるイベントです。
stream.pipeline(..., fn)複数のストリームを繋ぎ、エラー時に全ストリームをクリーンアップします(Node.js v10以降)。

Readable — データを読み込む

『Readable』ストリームはdataイベントでチャンクを受け取り、endイベントで完了を検知します。fs.createReadStream()はファイルをReadableストリームとして開きます。

readable_stream.js
var fs = require('fs');

// 極道の作戦記録ファイルをストリームで読み込む
var readable = fs.createReadStream('battle_log.txt', {
    encoding: 'utf8',
    highWaterMark: 64  // チャンクサイズを64バイトに設定する
});

var chunks = [];

// dataイベント: チャンクが届くたびに呼ばれる
readable.on('data', function(chunk) {
    chunks.push(chunk);
    process.stdout.write('[chunk] ' + chunk.length + '文字受信\n');
});

// endイベント: 読み込みが完了したときに呼ばれる
readable.on('end', function() {
    var fullText = chunks.join('');
    console.log('読み込み完了。合計:', fullText.length + '文字');
    console.log('内容:', fullText.slice(0, 30) + '...');
});

// errorイベント: エラー時に呼ばれる
readable.on('error', function(err) {
    console.error('桐生一馬: 読み込みエラーが発生した —', err.message);
});
node readable_stream.js
[chunk] 64文字受信
[chunk] 64文字受信
[chunk] 32文字受信
読み込み完了。合計: 160文字
内容: 桐生一馬の極道作戦記録 第一章...

Writable — データを書き込む

『Writable』ストリームはwrite()でデータを書き込み、end()で終了を通知します。finishイベントで書き込み完了を検知します。

writable_stream.js
var fs = require('fs');

// ログファイルへの書き込みストリームを作成する
var writable = fs.createWriteStream('yakuza_report.txt', { encoding: 'utf8' });

// 各キャラの報告を順番に書き込む
var reports = [
    '桐生一馬: 神室町のパトロール完了\n',
    '真島吾朗: 蒼天堀の状況確認済み\n',
    '春日一番: 伊勢佐木異人町の見回り終了\n',
    '錦山彰: 任務報告 — 完了\n',
    '澤村遥: 全員の安全を確認\n',
];

var index = 0;

var writeNext = function() {
    if (index >= reports.length) {
        // 全データを書き込んだら終了する
        writable.end();
        return;
    }
    // write()はバックプレッシャー時にfalseを返す
    var canContinue = writable.write(reports[index]);
    index++;
    if (canContinue) {
        writeNext(); // バッファに余裕があれば次を書き込む
    }
};

// drainイベント: バックプレッシャーが解消されたら呼ばれる
writable.on('drain', writeNext);

// finishイベント: 全書き込み完了後に呼ばれる
writable.on('finish', function() {
    console.log('全報告の書き込みが完了しました。');
});

writeNext();
node writable_stream.js
全報告の書き込みが完了しました。

pipe() — ストリームを繋ぐ

『pipe()』を使うとReadableストリームのデータをWritableストリームへ自動的に流せます。バックプレッシャーの調整も自動で行われます。ReadableがすべてのデータをWritableに渡し終えると、end()が自動的に呼ばれます。

pipe_stream.js
var fs = require('fs');

// 読み込み元と書き込み先のストリームを作成する
var readable = fs.createReadStream('yakuza_report.txt');
var writable = fs.createWriteStream('yakuza_report_backup.txt');

// pipe()でReadableとWritableを繋ぐ
// データが流れるように自動で処理される
readable.pipe(writable);

writable.on('finish', function() {
    console.log('真島吾朗: バックアップ完了。これでよし。');
});

// エラーハンドリングは各ストリームに個別に設定する
readable.on('error', function(err) {
    console.error('読み込みエラー:', err.message);
});
writable.on('error', function(err) {
    console.error('書き込みエラー:', err.message);
});
node pipe_stream.js
真島吾朗: バックアップ完了。これでよし。

バックプレッシャーの仕組み

『バックプレッシャー』とは、Writableストリームの処理速度がReadableの供給速度に追いつかない場合に発生する「詰まり」の仕組みです。write()falseを返したとき、Readableは送出を一時停止し、Writableがdrainイベントを発火したら再開することで、メモリの過負荷を防ぎます。

状態動作
write()trueを返すWritableのバッファに余裕があります。そのまま書き込みを続けられます。
write()falseを返すバッファが満杯です。Readableをpause()して送出を一時停止します。
drainイベントが発火バッファが空になりました。Readableをresume()して再開します。

pipe()はこのバックプレッシャー制御を自動で行います。手動でwrite()を呼ぶ場合は、戻り値を確認してpause()resume()drainを組み合わせて制御します。

pipeline() — 安全なストリーム連結

複数のストリームをpipe()で繋いだ場合、途中でエラーが発生しても後続ストリームが自動破棄されずリソースリークが起きる場合があります。Node.js v10以降の『stream.pipeline()』はエラー発生時にすべてのストリームを安全にクリーンアップします。

pipeline_stream.js
var fs = require('fs');
var stream = require('stream');
var zlib = require('zlib');

// pipeline()でReadable→Transform→Writableを繋ぐ
// エラーが発生すると全ストリームが自動的にクリーンアップされる
stream.pipeline(
    fs.createReadStream('yakuza_report.txt'),
    zlib.createGzip(),                           // gzip圧縮のTransformストリーム
    fs.createWriteStream('yakuza_report.txt.gz'),
    function(err) {
        if (err) {
            console.error('春日一番: パイプラインエラー —', err.message);
        } else {
            console.log('澤村遥: gzip圧縮完了。ファイルサイズが減りました!');
        }
    }
);
node pipeline_stream.js
澤村遥: gzip圧縮完了。ファイルサイズが減りました!

カスタムTransformストリーム

『stream.Transform』クラスを継承して、独自の変換処理をストリームとして実装できます。_transform(chunk, encoding, callback)メソッドに変換ロジックを書き、callback(null, 変換後データ)で次のストリームへ流します。

transform_stream.js
var stream = require('stream');
var util = require('util');

// テキストを行単位でカウントするTransformストリーム
var LineCounter = function() {
    stream.Transform.call(this, { readableObjectMode: true });
    this.lineCount = 0;
    this.remainder = '';
};
util.inherits(LineCounter, stream.Transform);

LineCounter.prototype._transform = function(chunk, encoding, callback) {
    var text = this.remainder + chunk.toString('utf8');
    var lines = text.split('\n');
    // 最後の要素は次のチャンクに持ち越す(未完了の行)
    this.remainder = lines.pop();
    this.lineCount += lines.length;
    // 各行を下流に流す
    lines.forEach(function(line) {
        this.push(line);
    }.bind(this));
    callback();
};

// ストリームの終端処理(残りのデータを出力する)
LineCounter.prototype._flush = function(callback) {
    if (this.remainder) {
        this.push(this.remainder);
        this.lineCount++;
    }
    callback();
};

// 使用例
var counter = new LineCounter();

counter.on('data', function(line) {
    // 各行が届く
});
counter.on('end', function() {
    console.log('錦山彰: 合計行数:', counter.lineCount);
});

// データを流し込む
counter.write('桐生一馬: 神室町パトロール\n');
counter.write('真島吾朗: 蒼天堀確認\n');
counter.write('春日一番: 伊勢佐木異人町見回り\n');
counter.end('澤村遥: 全員無事確認');
node transform_stream.js
錦山彰: 合計行数: 4

よくあるミス

pipe()でerrorイベントを処理していない

pipe()はソースストリームのエラーを宛先ストリームに伝播しません。そのため、Readableでエラーが発生してもWritableのerrorイベントは発火しません。各ストリームにerrorイベントを個別に設定する必要があります。

var fs = require('fs');

var readable = fs.createReadStream('not_exist.txt');
var writable = fs.createWriteStream('output.txt');

readable.pipe(writable);

readable.on('error', function(err) {
    console.error('読み込みエラー:', err.message);
    writable.destroy();
});
writable.on('error', function(err) {
    console.error('書き込みエラー:', err.message);
});

複数のストリームを繋ぐ場合はstream.pipeline()を使うと、エラー時に全ストリームが自動クリーンアップされます。

バックプレッシャーを無視してメモリリークが発生する

write()falseを返したにもかかわらずpause()せずに書き込みを続けると、Writableの内部バッファが際限なく膨らんでメモリリークが発生します。

var writable = fs.createWriteStream('output.txt');

var writeAll = function(items) {
    items.forEach(function(item) {
        var canContinue = writable.write(item);
        if (!canContinue) {
            console.warn('バックプレッシャーを検出 — 書き込みを一時停止する必要があります');
        }
    });
};

write()falseを返したときは送出をpause()で停止し、drainイベントを受けてからresume()で再開する必要があります。pipe()pipeline()はこの制御を自動で行います。

概要

Node.jsのStreamはメモリ効率よく大量のデータを処理するための基盤です。fs.readFile()はファイル全体をメモリに読み込みますが、fs.createReadStream()はチャンク単位で処理するため、数GBのファイルでも少量のメモリで扱えます。

pipe()はReadableとWritableを繋ぐシンプルな方法ですが、エラーハンドリングは各ストリームに個別に設定する必要があります。複数のストリームを組み合わせる場合は、エラー時の自動クリーンアップが行われるstream.pipeline()の使用が選択肢になります。

バックプレッシャーはNode.jsのストリームが自動で制御する重要な仕組みです。pipe()pipeline()を使う場合はこの制御が自動化されますが、write()を手動で呼ぶ場合は戻り値とdrainイベントを組み合わせてメモリ過負荷を防ぐ必要があります。

記事の間違いや著作権の侵害等ございましたらお手数ですがまでご連絡頂ければ幸いです。