import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.callbackFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

/**
 * Demo: adapts the callback-style producer [giveMeValue] into a flow with [callbackFlow],
 * collects and prints the values in a child coroutine, then cancels that coroutine after
 * two seconds so the [awaitClose] cleanup block runs.
 */
fun main() = runBlocking {
    val myFlow = callbackFlow<String> {
        // giveMeValue is a suspend function, so this builder block stays suspended here
        // until every value has been handed to the callback.
        giveMeValue { value ->
            // trySend is the non-suspending, non-throwing replacement for the deprecated
            // offer(); its result is deliberately ignored because a rejected value only
            // means the collector is already gone.
            trySend(value)
        }

        // Keeps the flow open after the producer has finished; the block below runs once
        // the channel is closed or the collecting coroutine is cancelled.
        awaitClose {
            // Do cleanup here, e.g. unregister listeners. The channel is already
            // closed/cancelled at this point, so no explicit close() call is needed.
            println("Closing..")
        }
    }

    val job = launch {
        myFlow.collect { println(it) }
    }

    delay(2000L)
    job.cancel() // cancels the collecting job, which triggers the awaitClose block
    job.join() // waits for the job's completion
}

/**
 * Passes three fixed greeting strings to [callback], one at a time, suspending for
 * 100 ms before each delivery.
 *
 * @param callback invoked once per string, in array order.
 */
suspend fun giveMeValue(callback: (String) -> Unit) {
    arrayOf("Hi there!", "Hello", "I love Kotlin").forEach {
        delay(100L)
        callback(it)
    }
}