Flow — 基本
| 対応: | Kotlin 1.3(2018) |
|---|
KotlinのFlowは非同期のデータストリームです。『flow { emit() }』でデータを順次発行し、『collect { }』で受け取ります。リストと異なり遅延評価されるため、大量データや継続的なイベントの処理に適しています。
構文
// Flow の生成
val flow: Flow<型> = flow {
emit(値) // データを発行
emit(値2)
}
// Flow の収集(コルーチン内で実行)
flow.collect { value ->
// 受け取った値を処理
}
// 変換操作(中間演算子)
flow.map { it * 2 }
.filter { it > 3 }
.collect { println(it) }
関数一覧
| 関数 / 演算子 | 概要 |
|---|---|
| flow { emit() } | 新しい Flow を生成します。ブロック内で『emit()』を呼んで値を発行します。 |
| flowOf(vararg) | 指定した値を発行する Flow を作成します。 |
| List.asFlow() | リストを Flow に変換します。 |
| flow.collect { } | Flow の値を受け取って処理します(終端演算子)。 |
| flow.map { } | 各値を変換します(中間演算子)。 |
| flow.filter { } | 条件に合う値だけを通します(中間演算子)。 |
| flow.take(n) | 最初の n 件だけ取得します(中間演算子)。 |
| flow.toList() | 全値を収集してリストにします(終端演算子)。 |
サンプルコード
flow_basic.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
// Flow を返す関数
fun countDown(): Flow<Int> = flow {
for (i in 5 downTo 1) {
delay(100) // 非同期処理を模倣
emit(i)
}
}
fun main() = runBlocking {
countDown().collect { value ->
println("カウント: $value")
}
flowOf("Apple", "Banana", "Cherry")
.map { it.uppercase() }
.filter { it.startsWith("B") || it.startsWith("A") }
.collect { println(it) }
val total = (1..10).asFlow()
.filter { it % 2 == 0 } // 偶数だけ
.map { it * it } // 2乗
.toList()
println(total) // [4, 16, 36, 64, 100]
(1..100).asFlow()
.take(3)
.collect { println(it) } // 1, 2, 3
}
kotlinc flow_basic.kt -include-runtime -d flow_basic.jar java -jar flow_basic.jar カウント: 5 カウント: 4 カウント: 3 カウント: 2 カウント: 1 APPLE BANANA [4, 16, 36, 64, 100] 1 2 3
エラーハンドリング
Flow内で例外が発生した場合、『catch』演算子で捕捉できます。『catch』はその上流(先に定義された演算子)の例外のみを捕捉します。下流の例外は捕捉できないため注意してください。
sample_flow_error.kt
import kotlinx.coroutines.* import kotlinx.coroutines.flow.* fun riskyFlow(): Flow= flow { emit(1) emit(2) throw RuntimeException("ストリームエラー") // 途中で例外が発生 emit(3) // これは実行されません } fun main() = runBlocking { riskyFlow() .catch { e -> emit(-1) } // 例外をキャッチして代替値を発行 .collect { println("受信: $it") } // 出力: 受信: 1 / 受信: 2 / 受信: -1 // onCompletion: 正常・異常どちらの終了でも実行されます (1..3).asFlow() .onCompletion { cause -> if (cause != null) println("エラーで終了: $cause") else println("正常に完了しました") } .collect { println(it) } }
kotlinc sample_flow_error.kt -include-runtime -d sample_flow_error.jar java -jar sample_flow_error.jar 受信: 1 受信: 2 受信: -1 1 2 3 正常に完了しました
StateFlow / SharedFlow
Flow(コールドストリーム)に対し、複数のコレクターで同じデータを共有できる「ホットストリーム」として『StateFlow』と『SharedFlow』があります。StateFlow は常に最新の1件を保持し、新しいコレクターが接続した瞬間にその値を受け取ります。これは Flow とは異なる重要な性質です。
| 種類 | 特徴 | 主な用途 |
|---|---|---|
| Flow | コールドストリーム。collectが呼ばれるたびに最初から実行 | DBクエリ・1回限りの処理 |
| StateFlow | ホットストリーム。常に最新値を保持 | UI状態管理(ViewModelのstate) |
| SharedFlow | ホットストリーム。値を保持しない(デフォルト) | イベント通知(ナビゲーションなど) |
sample_stateflow.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// StateFlow: 初期値が必要です
val stateFlow = MutableStateFlow(0)
// コレクターを起動(バックグラウンドで監視)
val job = launch {
stateFlow.collect { value ->
println("StateFlow受信: $value")
}
}
stateFlow.value = 1 // 直接書き込みます
stateFlow.value = 2
stateFlow.value = 2 // 同じ値は通知されません(StateFlowの特性)
stateFlow.value = 3
delay(100) // コレクターの処理を待ちます
job.cancel()
}
kotlinc sample_stateflow.kt -include-runtime -d sample_stateflow.jar java -jar sample_stateflow.jar StateFlow受信: 0 StateFlow受信: 1 StateFlow受信: 2 StateFlow受信: 3
よくあるミス
sample_flow_mistakes.kt
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// ❌ よくあるミス1: コルーチンの外でcollectを呼ぶ
// flow { emit(1) }.collect { } → コンパイルエラー(suspend関数はコルーチン内のみ)
// ❌ よくあるミス2: Flowを複数回collectして「初期化されていない」と思い込む
val myFlow = flow {
println("Flowが実行されました") // collectのたびに実行されます
emit(1)
emit(2)
}
myFlow.collect { println("1回目: $it") }
myFlow.collect { println("2回目: $it") } // コールドストリームなので再び実行されます
// ❌ よくあるミス3: StateFlowの同値更新が通知されないことを知らない
val state = MutableStateFlow("loading")
val received = mutableListOf()
val job = launch { state.collect { received.add(it) } }
state.value = "loading" // 同じ値なので通知されません
state.value = "success"
delay(50)
job.cancel()
println("受信値: $received") // [loading, success](2回目のloadingはない)
}
kotlinc sample_flow_mistakes.kt -include-runtime -d sample_flow_mistakes.jar java -jar sample_flow_mistakes.jar Flowが実行されました 1回目: 1 1回目: 2 Flowが実行されました 2回目: 1 2回目: 2 受信値: [loading, success]
概要
FlowはKotlinの非同期データストリームで、RxJavaのObservableに相当しますが、コルーチンとシームレスに統合されています。『collect』が呼ばれるまで実行されない「コールドストリーム」です。
中間演算子(『map』『filter』など)はチェーンしても遅延評価されます。実際の実行は終端演算子(『collect』『toList』など)を呼んだときにまとめて行われます。
コルーチンの基本はsuspend 関数を、コルーチン間のデータ通信はChannelを参照してください。
記事の間違いや著作権の侵害等ございましたらお手数ですがこちらまでご連絡頂ければ幸いです。