Kotlin Flow é a API de streams reativos da biblioteca de coroutines do Kotlin. Se você já trabalhou com RxJava, vai achar a API do Flow muito mais enxuta e natural, pois ela se integra diretamente com suspend functions e structured concurrency. Neste guia, vamos cobrir desde os fundamentos até padrões avançados de uso no Android e no backend, sempre com exemplos práticos que você pode adaptar aos seus projetos.
O Que é um Flow
Um Flow é uma sequência de valores emitidos de forma assíncrona. Diferente de uma Sequence, que e sincrona, o Flow permite que cada emissao envolva operações suspensas, como chamadas de rede ou consultas a banco de dados. A grande sacada e que o Flow e cold por padrão – o código produtor só executa quando alguem começa a coletar.
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.delay
fun contagem(): Flow<Int> = flow {
for (i in 1..5) {
delay(500)
emit(i)
}
}
fun main() = runBlocking {
contagem().collect { valor ->
println("Recebido: $valor")
}
}
// Saida (com 500ms entre cada):
// Recebido: 1
// Recebido: 2
// Recebido: 3
// Recebido: 4
// Recebido: 5
O builder flow { } define o bloco produtor. A função emit envia cada valor para quem estiver coletando. Sem a chamada a collect, nada acontece – e justamente isso que caracteriza um cold flow.
Flow Builders
Alem do builder flow { }, existem outras formas de criar flows:
import kotlinx.coroutines.flow.*
fun main() {
// flowOf: cria um flow a partir de valores fixos
val fixo: Flow<String> = flowOf("Kotlin", "Brasil", "Flow")
// asFlow: converte coleções e sequencias em flow
val deListagem: Flow<Int> = listOf(1, 2, 3).asFlow()
val deRange: Flow<Int> = (1..10).asFlow()
// channelFlow: quando voce precisa emitir de contextos diferentes
val concorrente = channelFlow {
send("do contexto principal")
kotlinx.coroutines.launch {
send("de outra coroutine")
}
}
}
O channelFlow e especialmente útil quando você precisa emitir valores a partir de múltiplas coroutines concorrentes, algo que o builder flow { } não permite diretamente por questões de segurança de thread.
Cold vs Hot Flows
Essa distincao e fundamental para usar Flow corretamente. Um cold flow só começa a produzir valores quando alguem coleta. Cada coletor recebe sua própria execução independente do produtor. Ja um hot flow emite valores independentemente de ter coletores ou não.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
// Cold Flow: cada collect dispara o produtor do zero
val cold = flow {
println("Produtor iniciado")
emit(1)
emit(2)
}
println("--- Primeiro collect ---")
cold.collect { println(it) }
println("--- Segundo collect ---")
cold.collect { println(it) }
// "Produtor iniciado" aparece duas vezes
}
Na prática, você usa cold flows para operações que precisam ser executadas por demanda, como buscar dados de uma API. Ja hot flows são ideais para representar estados que existem independentemente de quem observa, como o estado de uma tela no Android.
Operadores de Transformacao
Flow oferece um conjunto rico de operadores que permitem criar pipelines de processamento poderosos. A maioria segue a mesma lógica das funções de coleções do Kotlin:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
data class Usuario(val nome: String, val idade: Int)
fun main() = runBlocking {
val usuarios = flowOf(
Usuario("Ana", 28),
Usuario("Bruno", 17),
Usuario("Carla", 35),
Usuario("Diego", 22),
Usuario("Elena", 15)
)
// map: transforma cada elemento
usuarios
.map { it.nome.uppercase() }
.collect { println(it) }
// ANA, BRUNO, CARLA, DIEGO, ELENA
// filter: filtra elementos
usuarios
.filter { it.idade >= 18 }
.collect { println("${it.nome} e maior de idade") }
// transform: mais flexivel que map, permite emitir zero ou mais elementos
usuarios
.transform { usuario ->
emit("Processando: ${usuario.nome}")
if (usuario.idade >= 18) {
emit("${usuario.nome} aprovado")
}
}
.collect { println(it) }
// take: limita a quantidade de elementos
usuarios
.take(3)
.collect { println(it.nome) }
// Ana, Bruno, Carla
}
O operador transform e o mais flexivel de todos. Enquanto map obrigatoriamente transforma um valor em outro, transform permite emitir quantos valores quiser para cada elemento de entrada, inclusive nenhum.
Combinando Flows
Em cenários reais, você frequentemente precisa combinar dados de múltiplas fontes. Flow oferece vários operadores para isso:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
fun main() = runBlocking {
val nomes = flowOf("Ana", "Bruno", "Carla")
val idades = flowOf(28, 33, 25)
// zip: combina flows par a par
nomes.zip(idades) { nome, idade ->
"$nome tem $idade anos"
}.collect { println(it) }
// Ana tem 28 anos
// Bruno tem 33 anos
// Carla tem 25 anos
// combine: emite sempre que qualquer flow emitir
val temperatura = flow {
emit(22)
delay(1000)
emit(25)
delay(1000)
emit(23)
}
val umidade = flow {
emit(60)
delay(1500)
emit(55)
}
temperatura.combine(umidade) { temp, umi ->
"Temp: ${temp}C, Umidade: ${umi}%"
}.collect { println(it) }
// merge: intercala emissoes de múltiplos flows
val flow1 = flow {
emit("A1"); delay(100); emit("A2")
}
val flow2 = flow {
emit("B1"); delay(150); emit("B2")
}
merge(flow1, flow2).collect { println(it) }
}
A diferenca entre zip e combine e sutil mas importante: zip espera que ambos os flows emitam para produzir um par, enquanto combine re-emite sempre que qualquer um dos flows produz um novo valor, usando o ultimo valor do outro. No Android, combine e muito usado para combinar diferentes fontes de estado da UI.
StateFlow e SharedFlow
StateFlow e SharedFlow são as versões hot do Flow. Eles vivem no escopo de quem os cria e emitem valores independentemente dos coletores.
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
class ContadorViewModel {
// StateFlow: sempre tem um valor atual, ideal para estado de UI
private val _contador = MutableStateFlow(0)
val contador: StateFlow<Int> = _contador.asStateFlow()
// SharedFlow: para eventos unicos, sem valor inicial obrigatorio
private val _eventos = MutableSharedFlow<String>()
val eventos: SharedFlow<String> = _eventos.asSharedFlow()
fun incrementar() {
_contador.value++
}
suspend fun notificar(mensagem: String) {
_eventos.emit(mensagem)
}
}
fun main() = runBlocking {
val viewModel = ContadorViewModel()
// StateFlow emite o valor atual imediatamente ao coletar
val job = launch {
viewModel.contador.collect { println("Contador: $it") }
}
delay(100)
viewModel.incrementar()
delay(100)
viewModel.incrementar()
delay(100)
job.cancel()
// Saida: Contador: 0, Contador: 1, Contador: 2
}
O StateFlow e a escolha padrão para representar estados no ViewModel de uma arquitetura MVVM. Ele sempre possui um valor (não pode ser vazio) e aplica distinctUntilChanged automaticamente, evitando emissoes duplicadas. Ja o SharedFlow e mais adequado para eventos pontuais como navegação ou mensagens de erro. Para entender melhor a arquitetura MVVM, veja o guia de MVVM com Kotlin.
callbackFlow
Quando você precisa converter uma API baseada em callbacks para Flow, o callbackFlow e a ferramenta certa:
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.callbackFlow
interface SensorListener {
fun onDadoRecebido(valor: Float)
fun onErro(erro: Throwable)
}
class SensorManager {
private var listener: SensorListener? = null
fun registrar(l: SensorListener) { listener = l }
fun desregistrar() { listener = null }
}
fun SensorManager.dadosFlow() = callbackFlow {
val listener = object : SensorListener {
override fun onDadoRecebido(valor: Float) {
trySend(valor)
}
override fun onErro(erro: Throwable) {
close(erro)
}
}
registrar(listener)
awaitClose {
desregistrar()
}
}
O awaitClose e obrigatório e define o que acontece quando o flow e cancelado. Sem ele, o compilador emite um aviso. Esse padrão e muito comum ao integrar SDKs do Android como Firebase, sensores do dispositivo ou APIs de localização.
Buffer e Conflate
Quando o produtor e mais rápido que o consumidor, você pode usar buffer para desacoplar as velocidades ou conflate para descartar valores intermediarios:
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking {
val tempo = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100) // produz a cada 100ms
emit(i)
}
}
.buffer() // desacopla produtor e consumidor
.collect { valor ->
delay(300) // consome em 300ms
println(valor)
}
}
println("Buffer levou ${tempo}ms")
// Sem buffer: ~2000ms / Com buffer: ~1700ms
val tempoConflate = measureTimeMillis {
flow {
for (i in 1..5) {
delay(100)
emit(i)
}
}
.conflate() // descarta valores intermediarios
.collect { valor ->
delay(300)
println(valor)
}
}
println("Conflate levou ${tempoConflate}ms")
// Conflate pula valores intermediarios, processando apenas os mais recentes
}
A escolha entre buffer e conflate depende do cenário. Use buffer quando cada valor importa (como mensagens de chat). Use conflate quando apenas o valor mais recente importa (como atualizações de posicao GPS).
Tratamento de Erros
Flow oferece operadores específicos para lidar com erros de forma declarativa:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
fun main() = runBlocking {
// catch: intercepta excecoes do upstream
flow {
emit(1)
emit(2)
throw RuntimeException("Erro no produtor")
emit(3) // nunca executado
}
.catch { e -> println("Capturado: ${e.message}") }
.collect { println("Valor: $it") }
// Valor: 1
// Valor: 2
// Capturado: Erro no produtor
// retry: tenta novamente em caso de falha
var tentativa = 0
flow {
tentativa++
if (tentativa < 3) throw RuntimeException("Falha na tentativa $tentativa")
emit("Sucesso na tentativa $tentativa")
}
.retry(retries = 3) { causa ->
println("Retentando: ${causa.message}")
true
}
.collect { println(it) }
// onCompletion: executa ao final, com ou sem erro
flowOf(1, 2, 3)
.onCompletion { causa ->
if (causa == null) println("Completou sem erros")
else println("Completou com erro: ${causa.message}")
}
.collect { println(it) }
}
O operador catch só captura exceções que ocorrem acima dele na cadeia (upstream). Erros no collect não são capturados por catch. Para um tratamento completo, considere usar onEach + launchIn ou um bloco try-catch ao redor do collect.
Testando Flows
Testar flows e direto ao ponto com as ferramentas certas. A biblioteca kotlinx-coroutines-test oferece o runTest e o Turbine e uma alternativa popular da comunidade:
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.test.runTest
import kotlin.test.Test
import kotlin.test.assertEquals
class ProcessadorTest {
private fun numerosFlow() = flowOf(1, 2, 3, 4, 5)
@Test
fun `deve filtrar e transformar corretamente`() = runTest {
val resultado = numerosFlow()
.filter { it % 2 != 0 }
.map { it * 10 }
.toList()
assertEquals(listOf(10, 30, 50), resultado)
}
@Test
fun `deve emitir valores do StateFlow`() = runTest {
val stateFlow = MutableStateFlow("inicial")
val valores = mutableListOf<String>()
val job = launch {
stateFlow.take(3).toList(valores)
}
stateFlow.value = "segundo"
stateFlow.value = "terceiro"
job.join()
assertEquals(listOf("inicial", "segundo", "terceiro"), valores)
}
}
O toList() e o jeito mais simples de coletar todos os valores de um flow finito para verificação. Para flows infinitos ou que dependem de tempo, o Turbine oferece assercoesadicionais como awaitItem() e awaitComplete(). Para mais sobre testes, consulte o guia de testes em Kotlin.
Flow no Android com Lifecycle
No Android, coletar flows requer cuidado com o ciclo de vida para evitar vazamentos de memória e processamento desnecessário quando a tela não esta visivel:
import androidx.lifecycle.ViewModel
import androidx.lifecycle.viewModelScope
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.launch
class ListaViewModel(
private val repositorio: ProdutoRepositorio
) : ViewModel() {
private val _uiState = MutableStateFlow<UiState>(UiState.Carregando)
val uiState: StateFlow<UiState> = _uiState.asStateFlow()
init {
viewModelScope.launch {
repositorio.buscarProdutos()
.catch { erro ->
_uiState.value = UiState.Erro(erro.message ?: "Erro desconhecido")
}
.collect { produtos ->
_uiState.value = UiState.Sucesso(produtos)
}
}
}
}
sealed class UiState {
object Carregando : UiState()
data class Sucesso(val produtos: List<String>) : UiState()
data class Erro(val mensagem: String) : UiState()
}
// Na Activity ou Fragment, use repeatOnLifecycle
// lifecycleScope.launch {
// repeatOnLifecycle(Lifecycle.State.STARTED) {
// viewModel.uiState.collect { state ->
// when (state) {
// is UiState.Carregando -> mostrarLoading()
// is UiState.Sucesso -> mostrarProdutos(state.produtos)
// is UiState.Erro -> mostrarErro(state.mensagem)
// }
// }
// }
// }
O repeatOnLifecycle garante que a coleta só acontece quando o lifecycle esta no estado especificado (normalmente STARTED). Quando a Activity vai para background, a coleta e cancelada automaticamente e retomada ao voltar para foreground. Para entender melhor como coroutines se encaixam nesse cenário, confira o guia completo de coroutines.
Conclusão
Kotlin Flow e a abordagem moderna para programação reativa no ecossistema Kotlin. Cold flows para operações sob demanda, StateFlow para estado da UI, SharedFlow para eventos, e um conjunto rico de operadores para transformar e combinar dados. A integração nativa com coroutines torna tudo mais simples e previsivel do que alternativas como RxJava. Se você esta construindo aplicações Android, o Flow combinado com o lifecycle-aware collection e praticamente a escolha padrão hoje. Para projetos backend, ele brilha em cenários de streaming de dados e processamento reativo com Ktor ou Spring WebFlux.