import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
/**
 * Demo entry point: wraps the callback-style [giveMeValue] in a `callbackFlow`,
 * collects it in a child coroutine, and cancels that collector after two seconds
 * to show the `awaitClose` cleanup block running.
 */
fun main() = runBlocking {
    val myFlow = callbackFlow<String> {
        // giveMeValue suspends this producer while it invokes the callback once per value.
        giveMeValue { value ->
            // trySend is the non-suspending replacement for the deprecated offer();
            // it can be called from a plain (non-suspend) callback.
            trySend(value)
        }
        // Keeps the flow open after the values were delivered. The block below runs
        // once the channel is closed or the collector is cancelled, so the channel
        // does not need to be closed again from inside it.
        awaitClose {
            // Do cleanup here, e.g. unregister listeners.
            println("Closing..")
        }
    }

    val job = launch {
        myFlow.collect { value ->
            println(value)
        }
    }

    delay(2000L)
    job.cancel() // cancels the collector, which triggers the awaitClose block
    job.join() // waits for the job's completion
}
/**
 * Delivers three fixed greeting strings to [callback], one at a time and in order,
 * suspending for 100 ms before each delivery.
 *
 * @param callback invoked once per greeting with the greeting text.
 */
suspend fun giveMeValue(callback: (String) -> Unit) {
    val greetings = arrayOf("Hi there!", "Hello", "I love Kotlin")
    for (greeting in greetings) {
        // Pause before every delivery, including the first one.
        delay(100L)
        callback(greeting)
    }
}