|
|
@@ -12,25 +12,23 @@ class TimedFlow<T>( |
|
|
|
@OptIn(ExperimentalTypeInference::class) |
|
|
|
inline fun <R> transform( |
|
|
|
@BuilderInference crossinline action: suspend TimedFlowCollector<R>.(value: T) -> Unit |
|
|
|
) = TimedFlow<R>( |
|
|
|
onEvent.transform { arg -> |
|
|
|
val collector = this |
|
|
|
val timedCollector: TimedFlowCollector<R> = object : TimedFlowCollector<R> { |
|
|
|
override suspend fun emit(value: R, delay: Long) { |
|
|
|
collector.emit(TimedInfo(value, arg.time + delay)) |
|
|
|
} |
|
|
|
) = onEvent.transform<TimedInfo<T>, TimedInfo<R>> { arg -> |
|
|
|
val collector = this |
|
|
|
val timedCollector: TimedFlowCollector<R> = object : TimedFlowCollector<R> { |
|
|
|
override suspend fun emit(value: R, delay: Long) { |
|
|
|
collector.emit(TimedInfo(value, arg.time + delay)) |
|
|
|
} |
|
|
|
|
|
|
|
timedCollector.action(arg.value) |
|
|
|
} |
|
|
|
) |
|
|
|
|
|
|
|
timedCollector.action(arg.value) |
|
|
|
}.asTimedFlow() |
|
|
|
|
|
|
|
fun take(count: Int): TimedFlow<T> { |
|
|
|
var consumed = 0 |
|
|
|
return TimedFlow(onEvent.transform { value -> |
|
|
|
return onEvent.transform { value -> |
|
|
|
if (consumed++ < count) |
|
|
|
emit(value) |
|
|
|
}) |
|
|
|
}.asTimedFlow() |
|
|
|
} |
|
|
|
|
|
|
|
} |
|
|
@@ -38,3 +36,5 @@ class TimedFlow<T>( |
|
|
|
interface TimedFlowCollector<in T> { |
|
|
|
suspend fun emit(value: T, delay: Long = 0L) |
|
|
|
} |
|
|
|
|
|
|
|
fun <T> Flow<TimedInfo<T>>.asTimedFlow() = TimedFlow(this) |