worker_threads(マルチスレッド)
『worker_threads』モジュールは、Node.jsでマルチスレッド処理を実現するための標準モジュールです。Node.jsはイベントループがシングルスレッドで動作するため、CPUを長時間占有する重い計算はメインスレッドをブロックします。Worker Threadsを使うと、そのような処理を別スレッドにオフロードできます。
基本構文
var worker_threads = require('worker_threads');
var Worker = worker_threads.Worker;
var isMainThread = worker_threads.isMainThread;
var parentPort = worker_threads.parentPort;
var workerData = worker_threads.workerData;
if (isMainThread) {
// メインスレッド: ワーカーを生成する
var worker = new Worker(__filename, {
workerData: { /* ワーカーに渡すデータ */ }
});
worker.on('message', function(result) { /* 結果を受け取る */ });
worker.on('error', function(err) { /* エラー処理 */ });
} else {
// ワーカースレッド: 処理を実行する
var result = heavyComputation(workerData);
parentPort.postMessage(result);
}
主要API一覧
| API | 概要 |
|---|---|
isMainThread | 現在のコードがメインスレッドで動いているかどうかを返します。 |
workerData | メインスレッドからWorkerコンストラクタのworkerDataオプションで渡されたデータです。ワーカースレッドのみで参照できます。 |
parentPort | ワーカースレッドからメインスレッドへメッセージを送るためのポートです。postMessage()で送信します。 |
new Worker(filename, options) | 新しいワーカースレッドを生成します。filenameに実行するスクリプトのパスを指定します。 |
worker.postMessage(value) | メインスレッドからワーカースレッドへメッセージを送信します。 |
worker.on('message', cb) | ワーカースレッドからのメッセージを受け取るイベントです。 |
worker.terminate() | ワーカースレッドを強制終了します。Promiseを返します。 |
MessageChannel | ワーカースレッド同士が直接通信するためのチャンネルです。port1とport2のペアを作成します。 |
SharedArrayBuffer | スレッド間でメモリを共有するためのバッファです。コピーなしでデータを参照できます。 |
Atomics | SharedArrayBufferへの操作をアトミック(不可分)に行うためのオブジェクトです。競合状態を防ぎます。 |
CPU負荷の高い処理のオフロード
フィボナッチ数列の計算など、CPUを長時間占有する処理をワーカースレッドに移すことで、メインスレッドのイベントループをブロックせずに済みます。
worker_heavy.js
var worker_threads = require('worker_threads');
var Worker = worker_threads.Worker;
var isMainThread = worker_threads.isMainThread;
var parentPort = worker_threads.parentPort;
var workerData = worker_threads.workerData;
// --- ワーカーとメインスレッドを同じファイルに書く ---
if (isMainThread) {
// メインスレッドの処理
console.log('メインスレッド PID: ' + process.pid);
// ワーカーを起動してフィボナッチ計算を依頼する(虎杖悠仁の体に宿儺を宿すように)
var worker = new Worker(__filename, {
workerData: { n: 40 } // 計算するフィボナッチ数の項
});
// ワーカーから結果を受け取る
worker.on('message', function(result) {
console.log('計算結果 fib(40) = ' + result);
});
worker.on('error', function(err) {
console.error('ワーカーエラー:', err);
});
worker.on('exit', function(code) {
console.log('ワーカー終了 コード: ' + code);
});
// ワーカーが処理中でもメインスレッドは動き続ける
var interval = setInterval(function() {
process.stdout.write('.');
}, 100);
worker.on('exit', function() {
clearInterval(interval);
console.log('');
});
} else {
// ワーカースレッドの処理: 重い計算を行う
function fib(n) {
if (n <= 1) return n;
return fib(n - 1) + fib(n - 2);
}
var result = fib(workerData.n);
// 計算結果をメインスレッドへ送信する
parentPort.postMessage(result);
}
node worker_heavy.js メインスレッド PID: 34567 .................... 計算結果 fib(40) = 102334155 ワーカー終了 コード: 0
worker.postMessage — 双方向通信
メインスレッドからワーカーへ複数のタスクを送り、ワーカーが1つずつ処理して結果を返すパターンです。
bidirectional.js
var worker_threads = require('worker_threads');
var Worker = worker_threads.Worker;
var isMainThread = worker_threads.isMainThread;
var parentPort = worker_threads.parentPort;
if (isMainThread) {
var worker = new Worker(__filename);
// ワーカーからのメッセージを受け取る
worker.on('message', function(msg) {
console.log('結果受信: ' + msg.input + ' の二乗 = ' + msg.result);
});
// 複数のタスクを送信する(伏黒恵が式神を次々と召喚するように)
var tasks = [5, 10, 15, 20, 25];
tasks.forEach(function(n) {
worker.postMessage({ input: n });
});
// 全タスク送信後にワーカーを終了させるシグナルを送る
setTimeout(function() {
worker.postMessage({ done: true });
}, 1000);
} else {
// ワーカースレッド: メインスレッドからのメッセージを待ち受ける
parentPort.on('message', function(msg) {
if (msg.done) {
process.exit(0);
}
// 受け取った数値を二乗して返す
var result = msg.input * msg.input;
parentPort.postMessage({ input: msg.input, result: result });
});
}
node bidirectional.js 結果受信: 5 の二乗 = 25 結果受信: 10 の二乗 = 100 結果受信: 15 の二乗 = 225 結果受信: 20 の二乗 = 400 結果受信: 25 の二乗 = 625
SharedArrayBuffer — メモリの共有
通常のメッセージ送受信はデータをコピーして渡します。『SharedArrayBuffer』を使うと、メインスレッドとワーカースレッドが同じメモリ領域を参照できます。コピーのオーバーヘッドがなく、大量データの処理に向いています。競合を避けるために『Atomics』と組み合わせて使います。
shared_buffer.js
var worker_threads = require('worker_threads');
var Worker = worker_threads.Worker;
var isMainThread = worker_threads.isMainThread;
var parentPort = worker_threads.parentPort;
var workerData = worker_threads.workerData;
if (isMainThread) {
// 共有メモリを確保する(4バイト = Int32 1要素分)
var sharedBuffer = new SharedArrayBuffer(4);
var sharedArray = new Int32Array(sharedBuffer);
// 初期値を設定する(釘崎野薔薇の呪力ゲージ:初期値100)
sharedArray[0] = 100;
console.log('初期値: ' + sharedArray[0]);
// ワーカーに SharedArrayBuffer を渡す(参照渡し: コピーではない)
var worker = new Worker(__filename, {
workerData: { sharedBuffer: sharedBuffer }
});
worker.on('message', function(msg) {
if (msg === 'done') {
// ワーカーが書き込んだ値をメインスレッドから直接読める
console.log('ワーカー処理後の値: ' + sharedArray[0]);
}
});
} else {
// ワーカースレッドの処理
var sharedArray = new Int32Array(workerData.sharedBuffer);
// Atomics.add でアトミックに値を加算する(競合状態を防ぐ)
Atomics.add(sharedArray, 0, 50); // 100 + 50 = 150
console.log('ワーカーで加算後: ' + sharedArray[0]);
parentPort.postMessage('done');
}
node shared_buffer.js 初期値: 100 ワーカーで加算後: 150 ワーカー処理後の値: 150
よくあるミス
SharedArrayBuffer以外のオブジェクトをpostMessage()でそのまま渡せると思っている
通常のオブジェクトはpostMessage()でシリアライズ(構造化複製)されてコピーが渡されます。関数や循環参照を含むオブジェクトはシリアライズできないため渡せません。スレッド間でメモリを共有したい場合はSharedArrayBufferを使います。
var worker_threads = require('worker_threads');
var Worker = worker_threads.Worker;
var isMainThread = worker_threads.isMainThread;
var parentPort = worker_threads.parentPort;
var workerData = worker_threads.workerData;
if (isMainThread) {
var worker = new Worker(__filename, { workerData: {} });
// NG: 関数はpostMessage()で渡せない
try {
worker.postMessage({ fn: function() {} });
} catch (err) {
console.error('虎杖悠仁: 関数は渡せません —', err.message);
}
// OK: SharedArrayBufferでメモリ共有する
var sharedBuf = new SharedArrayBuffer(4);
worker.postMessage({ sharedBuf: sharedBuf });
} else {
parentPort.on('message', function(msg) {
if (msg.sharedBuf) {
console.log('ワーカー: SharedArrayBufferを受け取った');
}
});
}
workerのerrorイベントを処理しない
ワーカースレッド内でキャッチされなかった例外は、メインスレッドのworker.on('error', ...)イベントとして発火します。このイベントにハンドラを設定していない場合、例外がプロセス全体をクラッシュさせます。
var worker_threads = require('worker_threads');
var Worker = worker_threads.Worker;
var isMainThread = worker_threads.isMainThread;
var parentPort = worker_threads.parentPort;
if (isMainThread) {
var worker = new Worker(__filename);
// errorイベントを必ず設定する
worker.on('error', function(err) {
console.error('伏黒恵: ワーカーエラーを補足 —', err.message);
});
worker.on('exit', function(code) {
if (code !== 0) {
console.error('ワーカーが異常終了 コード:', code);
}
});
} else {
// ワーカー内で未キャッチの例外を発生させる
throw new Error('ワーカー内部エラー');
}
概要
Worker Threadsはclusterモジュールとは異なり、プロセスではなくスレッドを生成します。スレッドはプロセスより起動コストが低く、SharedArrayBufferによる真のメモリ共有が可能です。一方で、同一スレッド内でのメモリ共有は競合状態(レースコンディション)を引き起こす可能性があるため、Atomicsの適切な利用が重要です。
Worker Threadsが向いているのは、画像処理・暗号化・大量データの変換など、CPUを長時間使い続ける処理です。I/Oバウンドな処理(ファイル読み込み・HTTPリクエストなど)はNode.jsの非同期I/Oで十分対応できるため、Worker Threadsを使う必要はありません。
同じファイルにisMainThreadで分岐を書くパターンと、別ファイルとして切り出すパターンのどちらも使えます。コードが複雑になる場合は別ファイルへの切り出しを選ぶと読みやすくなります。
記事の間違いや著作権の侵害等ございましたらお手数ですがこちらまでご連絡頂ければ幸いです。