1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.util.kotlin
18 
19 import com.android.systemui.util.time.SystemClock
20 import com.android.systemui.util.time.SystemClockImpl
21 import kotlinx.coroutines.CoroutineStart
22 import java.util.concurrent.atomic.AtomicReference
23 import kotlinx.coroutines.Dispatchers
24 import kotlinx.coroutines.Job
25 import kotlinx.coroutines.coroutineScope
26 import kotlinx.coroutines.delay
27 import kotlinx.coroutines.flow.Flow
28 import kotlinx.coroutines.flow.channelFlow
29 import kotlinx.coroutines.flow.distinctUntilChanged
30 import kotlinx.coroutines.flow.flow
31 import kotlinx.coroutines.flow.onStart
32 import kotlinx.coroutines.launch
33 import kotlin.math.max
34 
35 /**
36  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
37  * Note that the new Flow will not start emitting until it has received two emissions from the
38  * upstream Flow.
39  *
40  * Useful for code that needs to compare the current value to the previous value.
41  */
42 fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
43     val noVal = Any()
44     var previousValue: Any? = noVal
45     collect { newVal ->
46         if (previousValue != noVal) {
47             @Suppress("UNCHECKED_CAST")
48             emit(transform(previousValue as T, newVal))
49         }
50         previousValue = newVal
51     }
52 }
53 
54 /**
55  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
56  * [initialValue] will be used as the "old" value for the first emission.
57  *
58  * Useful for code that needs to compare the current value to the previous value.
59  */
60 fun <T, R> Flow<T>.pairwiseBy(
61     initialValue: T,
62     transform: suspend (previousValue: T, newValue: T) -> R,
63 ): Flow<R> =
64     onStart { emit(initialValue) }.pairwiseBy(transform)
65 
66 /**
67  * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
68  *
69  *
70  * The output of [getInitialValue] will be used as the "old" value for the first emission. As
71  * opposed to the initial value in the above [pairwiseBy], [getInitialValue] can do some work before
72  * returning the initial value.
73  *
74  * Useful for code that needs to compare the current value to the previous value.
75  */
76 fun <T, R> Flow<T>.pairwiseBy(
77     getInitialValue: suspend () -> T,
78     transform: suspend (previousValue: T, newValue: T) -> R,
79 ): Flow<R> =
80     onStart { emit(getInitialValue()) }.pairwiseBy(transform)
81 
82 /**
83  * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
84  * Flow will not start emitting until it has received two emissions from the upstream Flow.
85  *
86  * Useful for code that needs to compare the current value to the previous value.
87  */
88 fun <T> Flow<T>.pairwise(): Flow<WithPrev<T>> = pairwiseBy(::WithPrev)
89 
90 /**
91  * Returns a new [Flow] that produces the two most recent emissions from [this]. [initialValue]
92  * will be used as the "old" value for the first emission.
93  *
94  * Useful for code that needs to compare the current value to the previous value.
95  */
96 fun <T> Flow<T>.pairwise(initialValue: T): Flow<WithPrev<T>> = pairwiseBy(initialValue, ::WithPrev)
97 
98 /** Holds a [newValue] emitted from a [Flow], along with the [previousValue] emitted value. */
99 data class WithPrev<T>(val previousValue: T, val newValue: T)
100 
101 /**
102  * Returns a new [Flow] that combines the [Set] changes between each emission from [this] using
103  * [transform].
104  *
105  * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause
106  * a change event to be emitted that contains no removals, and all elements from that first [Set]
107  * as additions.
108  *
109  * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
110  * until a second [Set] has been emitted from the upstream [Flow].
111  */
112 fun <T, R> Flow<Set<T>>.setChangesBy(
113     transform: suspend (removed: Set<T>, added: Set<T>) -> R,
114     emitFirstEvent: Boolean = true,
115 ): Flow<R> = (if (emitFirstEvent) onStart { emit(emptySet()) } else this)
116     .distinctUntilChanged()
117     .pairwiseBy { old: Set<T>, new: Set<T> ->
118         // If an element was present in the old set, but not the new one, then it was removed
119         val removed = old - new
120         // If an element is present in the new set, but on the old one, then it was added
121         val added = new - old
122         transform(removed, added)
123     }
124 
125 /**
126  * Returns a new [Flow] that produces the [Set] changes between each emission from [this].
127  *
128  * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause
129  * a change event to be emitted that contains no removals, and all elements from that first [Set]
130  * as additions.
131  *
132  * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
133  * until a second [Set] has been emitted from the upstream [Flow].
134  */
135 fun <T> Flow<Set<T>>.setChanges(emitFirstEvent: Boolean = true): Flow<SetChanges<T>> =
136     setChangesBy(::SetChanges, emitFirstEvent)
137 
138 /** Contains the difference in elements between two [Set]s. */
139 data class SetChanges<T>(
140     /** Elements that are present in the first [Set] but not in the second. */
141     val removed: Set<T>,
142     /** Elements that are present in the second [Set] but not in the first. */
143     val added: Set<T>,
144 )
145 
146 /**
147  * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
148  * the most recent emission from [other] using [transform].
149  *
150  * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
151  */
152 fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
153     coroutineScope {
154         val noVal = Any()
155         val sampledRef = AtomicReference(noVal)
156         val job = launch(Dispatchers.Unconfined) {
157             other.collect { sampledRef.set(it) }
158         }
159         collect {
160             val sampled = sampledRef.get()
161             if (sampled != noVal) {
162                 @Suppress("UNCHECKED_CAST")
163                 emit(transform(it, sampled as B))
164             }
165         }
166         job.cancel()
167     }
168 }
169 
170 /**
171  * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted
172  * value from [other] instead.
173  *
174  * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
175  */
176 fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }
177 
178 /**
179  * Returns a flow that mirrors the original flow, but delays values following emitted values for the
180  * given [periodMs]. If the original flow emits more than one value during this period, only the
181  * latest value is emitted.
182  *
183  * Example:
184  *
185  * ```kotlin
186  * flow {
187  *     emit(1)     // t=0ms
188  *     delay(90)
189  *     emit(2)     // t=90ms
190  *     delay(90)
191  *     emit(3)     // t=180ms
192  *     delay(1010)
193  *     emit(4)     // t=1190ms
194  *     delay(1010)
195  *     emit(5)     // t=2200ms
196  * }.throttle(1000)
197  * ```
198  *
199  * produces the following emissions at the following times
200  *
201  * ```text
202  * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
203  * ```
204  */
205 fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = this.throttle(periodMs, SystemClockImpl())
206 
207 /**
208  * Returns a flow that mirrors the original flow, but delays values following emitted values for the
209  * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
210  * during this period, only The latest value is emitted.
211  *
212  * Example:
213  *
214  * ```kotlin
215  * flow {
216  *     emit(1)     // t=0ms
217  *     delay(90)
218  *     emit(2)     // t=90ms
219  *     delay(90)
220  *     emit(3)     // t=180ms
221  *     delay(1010)
222  *     emit(4)     // t=1190ms
223  *     delay(1010)
224  *     emit(5)     // t=2200ms
225  * }.throttle(1000)
226  * ```
227  *
228  * produces the following emissions at the following times
229  *
230  * ```text
231  * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
232  * ```
233  */
234 fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock): Flow<T> = channelFlow {
235     coroutineScope {
236         var previousEmitTimeMs = 0L
237         var delayJob: Job? = null
238         var sendJob: Job? = null
239         val outerScope = this
240 
241         collect {
242             delayJob?.cancel()
243             sendJob?.join()
244             val currentTimeMs = clock.elapsedRealtime()
245             val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
246             val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)
247             if (timeUntilNextEmit > 0L) {
248                 // We create delayJob to allow cancellation during the delay period
249                 delayJob = launch {
250                     delay(timeUntilNextEmit)
251                     sendJob = outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
252                         send(it)
253                         previousEmitTimeMs = clock.elapsedRealtime()
254                     }
255                 }
256             } else {
257                 send(it)
258                 previousEmitTimeMs = currentTimeMs
259             }
260         }
261     }
262 }