CompletableFuture.supplyAsync() / thenApply() / join()
| 対応: | Java 8(2014) |
|---|
非同期処理を宣言的に記述する仕組みです(Java 8以降)。複数の非同期タスクを連結・合成したり、エラーハンドリングを一連のチェーンで書いたりできます。
構文
『supplyAsync()』は非同期で処理を実行し結果を返します(Supplier)。『runAsync()』は非同期で処理を実行しますが結果は返しません(Runnable)。
import java.util.concurrent.*;
CompletableFuture<T> cf = CompletableFuture.supplyAsync(() -> /* T型の値を返す処理 */);
CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> { /* 処理 */ });
// 結果を変換して次の処理に渡す(Functionを受け取る)
cf.thenApply(result -> /* 変換処理 */);
// 結果を受け取って別のCompletableFutureを返す
cf.thenCompose(result -> /* CompletableFutureを返す処理 */);
// 結果を消費する(戻り値なし)
cf.thenAccept(result -> { /* 処理 */ });
// 例外が発生した場合のデフォルト値を返す
cf.exceptionally(ex -> /* デフォルト値 */);
// 結果またはエラーの両方を処理する
cf.handle((result, ex) -> /* 処理 */);
// 結果が返るまでブロックして値を取得する
T value = cf.join();
T value = cf.get(); // InterruptedException, ExecutionException が発生する
主なメソッド一覧
| メソッド | 概要 |
|---|---|
| supplyAsync(Supplier) | 非同期でサプライヤーを実行し、戻り値を持つ CompletableFuture を返します。『Supplier』は値を返す関数、『Runnable』は値を返さない関数を表す関数型インターフェースです。 |
| runAsync(Runnable) | 非同期でRunnableを実行します。戻り値は CompletableFuture<Void> です。 |
| thenApply(Function) | 完了した結果を受け取り変換して次のステージに渡します。 |
| thenAccept(Consumer) | 完了した結果を受け取って処理します。次のステージには Void を渡します。 |
| thenCompose(Function) | 結果を受け取り新たな CompletableFuture を返してフラットに連結します。 |
| exceptionally(Function) | 例外発生時にデフォルト値を返します。 |
| handle(BiFunction) | 結果と例外の両方を処理できます。正常・異常どちらの場合も呼び出されます。 |
| allOf(futures...) | すべての CompletableFuture が完了するまで待ちます。 |
| anyOf(futures...) | いずれかの CompletableFuture が完了したら進みます。 |
| join() | 結果を返します。get() とは異なりチェック例外をスローしません。 |
サンプルコード
非同期でデータを取得して変換し、最後に『join()』で結果を取り出します。
CompletableFutureExample.java
import java.util.concurrent.*;
class CompletableFutureExample {
public static void main(String[] args) {
CompletableFuture<String> cf = CompletableFuture
.supplyAsync(() -> {
// 時間のかかる処理(API呼び出しなど)を模擬する
try { Thread.sleep(500); } catch (InterruptedException e) {}
return "hello";
})
.thenApply(String::toUpperCase)
.thenApply(s -> s + " WORLD");
System.out.println(cf.join()); // 『HELLO WORLD』と出力される
// exceptionally() でエラー時のデフォルト値を返す
CompletableFuture<Integer> safeCf = CompletableFuture
.supplyAsync(() -> {
if (true) throw new RuntimeException("処理に失敗しました。");
return 42;
})
.exceptionally(ex -> {
System.out.println("エラー: " + ex.getMessage());
return -1;
});
System.out.println(safeCf.join()); // 『-1』と出力される
// allOf() で複数の非同期タスクをまとめて待機する
CompletableFuture<Integer> task1 = CompletableFuture.supplyAsync(() -> 10);
CompletableFuture<Integer> task2 = CompletableFuture.supplyAsync(() -> 20);
CompletableFuture<Integer> task3 = CompletableFuture.supplyAsync(() -> 30);
CompletableFuture.allOf(task1, task2, task3).join();
int total = task1.join() + task2.join() + task3.join();
System.out.println("合計: " + total); // 『合計: 60』と出力される
}
}
javac CompletableFutureExample.java java CompletableFutureExample HELLO WORLD エラー: java.lang.RuntimeException: 処理に失敗しました。 -1 合計: 60
実践パターン: thenCompose で非同期処理を連結
CompletableFutureChain.java
import java.util.concurrent.*;
class CompletableFutureChain {
public static void main(String[] args) {
// thenCompose: 非同期タスクをフラットに連結する
CompletableFuture<String> pipeline = CompletableFuture
.supplyAsync(() -> {
// Step1: ユーザーIDを取得する(例: 認証API呼び出し)
return 42;
})
.thenCompose(userId ->
CompletableFuture.supplyAsync(() -> {
// Step2: IDを使ってプロフィールを取得する(例: DB検索)
return "SON GOKU (id=" + userId + ")";
})
)
.thenApply(profile -> "Profile: " + profile)
.exceptionally(ex -> "Error: " + ex.getMessage());
System.out.println(pipeline.join()); // 『Profile: SON GOKU (id=42)』と出力される
// handle() で正常・異常の両方を処理する
CompletableFuture<String> handled = CompletableFuture
.supplyAsync(() -> {
if (Math.random() < 0.5) throw new RuntimeException("処理失敗");
return "Vegeta";
})
.handle((result, ex) -> {
if (ex != null) {
return "フォールバック: ゲスト";
}
return "ようこそ、" + result;
});
System.out.println(handled.join());
}
}
javac CompletableFutureChain.java java CompletableFutureChain Profile: SON GOKU (id=42) ようこそ、Vegeta
※ Math.random() によって結果が変わります。
よくあるミス1: get()のチェック例外
get() はチェック例外をスローするため try-catch が必要です。チェーン内では join() を使うとシンプルに書けます。
GetNg.java
import java.util.concurrent.*; CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello"); String value = cf.get();
javac GetNg.java
GetNg.java:4: error: unreported exception InterruptedException; must be caught or declared to be thrown
String value = cf.get();
^
GetNg.java:4: error: unreported exception ExecutionException; must be caught or declared to be thrown
String value = cf.get();
^
2 errors
JoinOk.java
import java.util.concurrent.*; CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello"); String value = cf.join(); System.out.println(value);
javac JoinOk.java java JoinOk hello
よくあるミス2: thenApplyとthenComposeの混同
thenApply と thenCompose の使い分けを間違えると、CompletableFuture がネストしてしまいます。
ThenApplyNg.java
import java.util.concurrent.*;
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello");
// thenApply に非同期タスクを返すラムダを渡すとネストする
CompletableFuture<CompletableFuture<String>> nested = cf.thenApply(
s -> CompletableFuture.supplyAsync(() -> s.toUpperCase())
);
System.out.println(nested.join().join());
javac ThenApplyNg.java java ThenApplyNg HELLO
非同期タスクを返す場合は thenCompose を使うとフラットに解決されます。
ThenComposeOk.java
import java.util.concurrent.*;
CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> flat = cf.thenCompose(
s -> CompletableFuture.supplyAsync(() -> s.toUpperCase())
);
System.out.println(flat.join());
javac ThenComposeOk.java java ThenComposeOk HELLO
変換処理(同期)には『thenApply()』、次のステージが非同期タスクを返す場合は『thenCompose()』を使います。『thenApply()』に非同期タスクを返すラムダを渡すと、結果がネストした『CompletableFuture』になってしまいます。
概要
コールバック地獄とは、非同期処理のネストが深くなりコードが読みにくくなる問題のことです。『CompletableFuture』を使うことで、コールバック地獄に陥らずに非同期処理を直列・並列に組み合わせることができます。thenApply() は結果の変換、thenCompose() は非同期タスクの連結(フラットマップ)、handle() は正常・異常を問わず処理する汎用ハンドラとして使い分けます。
デフォルトでは共通の ForkJoinPool.commonPool() 上で実行されます。カスタムスレッドプールを使いたい場合は supplyAsync(supplier, executor) のように Executor を第2引数に渡してください。
スレッドプールの管理には『ExecutorService / Executors.newFixedThreadPool()』を参照してください。
記事の間違いや著作権の侵害等ございましたらお手数ですがこちらまでご連絡頂ければ幸いです。