Caution
お使いのブラウザはJavaScriptが実行できない状態になっております。
当サイトはWebプログラミングの情報サイトの為、
JavaScriptが実行できない環境では正しいコンテンツが提供出来ません。
JavaScriptが実行可能な状態でご閲覧頂くようお願い申し上げます。
std::sync::mpsc チャンネル
Rustのスレッド間通信には『std::sync::mpsc』(multiple producer, single consumer)チャンネルを使います。送信側(Sender)から値を送り、受信側(Receiver)で受け取るメッセージパッシングのパターンです。
構文
use std::sync::mpsc;
use std::thread;
// チャンネルを作成します(tx: 送信者、rx: 受信者)。
let (tx, rx) = mpsc::channel();
// 送信します(tx は Clone 可能なので複数スレッドから送れます)。
tx.send(value).unwrap();
// 受信します(ブロックして次のメッセージを待ちます)。
let received = rx.recv().unwrap();
// 非ブロッキング受信(メッセージがなければすぐ Err を返します)。
match rx.try_recv() {
Ok(val) => println!("{}", val),
Err(_) => println!("まだメッセージなし"),
}
// 全メッセージを受信します(送信者が全ドロップすると終了します)。
for msg in rx {
println!("{}", msg);
}
メソッド一覧
| メソッド | 概要 |
|---|---|
| mpsc::channel() | 非同期チャンネルを作成します。(Sender<T>, Receiver<T>) を返します。 |
| mpsc::sync_channel(n) | 容量 n の同期チャンネルを作成します。バッファが満杯になると送信をブロックします。 |
| tx.send(val) | 値を送信します。受信者がドロップ済みなら Err を返します。 |
| tx.clone() | 送信者をクローンします(複数スレッドから送信できます)。 |
| rx.recv() | 値が届くまでブロックして受信します。全送信者がドロップしたら Err を返します。 |
| rx.try_recv() | すぐに受信を試みます。メッセージがなければ即座に Err を返します(ブロックなし)。 |
| rx.recv_timeout(dur) | タイムアウト付きで受信します。 |
| for msg in rx | 送信者が全て閉じるまで繰り返し受信します。 |
サンプルコード
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
// --- 基本的な送受信 ---
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
tx.send(String::from("スレッドからこんにちは!")).unwrap();
});
let msg = rx.recv().unwrap();
println!("受信: {}", msg);
// --- 複数メッセージの送受信 ---
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let messages = vec!["ping", "pong", "done"];
for msg in messages {
tx.send(msg).unwrap();
thread::sleep(Duration::from_millis(10));
}
// tx がドロップされると rx のイテレーションが終了します。
});
for msg in rx {
println!("受信: {}", msg);
}
// --- 複数の送信者(mpsc の "multiple producer")---
let (tx, rx) = mpsc::channel();
let mut handles = vec![];
for i in 0..3 {
let tx_clone = tx.clone(); // 送信者をクローンします。
let h = thread::spawn(move || {
tx_clone.send(format!("送信者{}: データ", i)).unwrap();
});
handles.push(h);
}
drop(tx); // 元の tx を閉じます(rx のイテレーション終了のため)。
for msg in rx {
println!("{}", msg);
}
for h in handles {
h.join().unwrap();
}
// --- try_recv で非ブロッキング受信 ---
let (tx, rx) = mpsc::channel::<i32>();
thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
tx.send(42).unwrap();
});
loop {
match rx.try_recv() {
Ok(val) => { println!("受信: {}", val); break; }
Err(mpsc::TryRecvError::Empty) => {
println!("まだ届いていません…");
thread::sleep(Duration::from_millis(20));
}
Err(mpsc::TryRecvError::Disconnected) => break,
}
}
}
概要
『mpsc::channel()』は「複数送信者・単一受信者」のチャンネルを作ります。送信者(『Sender』)は『clone()』できるため複数スレッドから送信できますが、受信者(『Receiver』)はクローンできません(single consumer)。
全ての『Sender』がドロップされると、『rx.recv()』は『Err』を返し、『for msg in rx』ループが終了します。これを利用してワーカースレッドの終了を検知できます。
共有メモリでのスレッド間通信はMutex<T> / RwLock<T>を、スレッドの生成と待機はstd::thread::spawn() / join()を参照してください。
記事の間違いや著作権の侵害等ございましたらお手数ですがこちらまでご連絡頂ければ幸いです。