/*
 * Copyright (C) 2022 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package com.android.systemui.util.kotlin

import com.android.systemui.util.time.SystemClock
import com.android.systemui.util.time.SystemClockImpl
import kotlinx.coroutines.CoroutineStart
import java.util.concurrent.atomic.AtomicReference
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onStart
import kotlinx.coroutines.launch
import kotlin.math.max

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * Note that the new Flow will not start emitting until it has received two emissions from the
 * upstream Flow.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T, R> Flow<T>.pairwiseBy(transform: suspend (old: T, new: T) -> R): Flow<R> = flow {
    // Private sentinel meaning "nothing received yet". A dedicated object is used instead of null
    // because null may be a legitimate upstream value when T is nullable.
    val noVal = Any()
    var previousValue: Any? = noVal
    collect { newVal ->
        // Referential comparison: the sentinel must never be matched through a user-defined
        // equals() implementation on an upstream value.
        if (previousValue !== noVal) {
            @Suppress("UNCHECKED_CAST")
            emit(transform(previousValue as T, newVal))
        }
        previousValue = newVal
    }
}

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 * [initialValue] will be used as the "old" value for the first emission.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T, R> Flow<T>.pairwiseBy(
    initialValue: T,
    transform: suspend (previousValue: T, newValue: T) -> R,
): Flow<R> =
    onStart { emit(initialValue) }.pairwiseBy(transform)

/**
 * Returns a new [Flow] that combines the two most recent emissions from [this] using [transform].
 *
 * The output of [getInitialValue] will be used as the "old" value for the first emission. As
 * opposed to the initial value in the above [pairwiseBy], [getInitialValue] can do some work before
 * returning the initial value.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T, R> Flow<T>.pairwiseBy(
    getInitialValue: suspend () -> T,
    transform: suspend (previousValue: T, newValue: T) -> R,
): Flow<R> =
    onStart { emit(getInitialValue()) }.pairwiseBy(transform)

/**
 * Returns a new [Flow] that produces the two most recent emissions from [this]. Note that the new
 * Flow will not start emitting until it has received two emissions from the upstream Flow.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T> Flow<T>.pairwise(): Flow<WithPrev<T>> = pairwiseBy(::WithPrev)

/**
 * Returns a new [Flow] that produces the two most recent emissions from [this]. [initialValue]
 * will be used as the "old" value for the first emission.
 *
 * Useful for code that needs to compare the current value to the previous value.
 */
fun <T> Flow<T>.pairwise(initialValue: T): Flow<WithPrev<T>> = pairwiseBy(initialValue, ::WithPrev)

/** Holds a [newValue] emitted from a [Flow], along with the [previousValue] emitted value. */
data class WithPrev<T>(val previousValue: T, val newValue: T)

/**
 * Returns a new [Flow] that combines the [Set] changes between each emission from [this] using
 * [transform].
 *
 * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause
 * a change event to be emitted that contains no removals, and all elements from that first [Set]
 * as additions.
 *
 * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
 * until a second [Set] has been emitted from the upstream [Flow].
 */
fun <T, R> Flow<Set<T>>.setChangesBy(
    transform: suspend (removed: Set<T>, added: Set<T>) -> R,
    emitFirstEvent: Boolean = true,
): Flow<R> = (if (emitFirstEvent) onStart { emit(emptySet()) } else this)
    .distinctUntilChanged()
    .pairwiseBy { old: Set<T>, new: Set<T> ->
        // If an element was present in the old set, but not the new one, then it was removed
        val removed = old - new
        // If an element is present in the new set, but not in the old one, then it was added
        val added = new - old
        transform(removed, added)
    }

/**
 * Returns a new [Flow] that produces the [Set] changes between each emission from [this].
 *
 * If [emitFirstEvent] is `true`, then the first [Set] emitted from the upstream [Flow] will cause
 * a change event to be emitted that contains no removals, and all elements from that first [Set]
 * as additions.
 *
 * If [emitFirstEvent] is `false`, then the first emission is ignored and no changes are emitted
 * until a second [Set] has been emitted from the upstream [Flow].
 */
fun <T> Flow<Set<T>>.setChanges(emitFirstEvent: Boolean = true): Flow<SetChanges<T>> =
    setChangesBy(::SetChanges, emitFirstEvent)

/** Contains the difference in elements between two [Set]s. */
data class SetChanges<T>(
    /** Elements that are present in the first [Set] but not in the second. */
    val removed: Set<T>,
    /** Elements that are present in the second [Set] but not in the first. */
    val added: Set<T>,
)

/**
 * Returns a new [Flow] that emits at the same rate as [this], but combines the emitted value with
 * the most recent emission from [other] using [transform].
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A, B, C> Flow<A>.sample(other: Flow<B>, transform: suspend (A, B) -> C): Flow<C> = flow {
    coroutineScope {
        // Private sentinel meaning "other has not emitted yet"; null cannot be used because B may
        // be nullable.
        val noVal = Any()
        val sampledRef = AtomicReference(noVal)
        // Collect [other] concurrently, only remembering its latest value.
        val job = launch(Dispatchers.Unconfined) {
            other.collect { sampledRef.set(it) }
        }
        collect { value ->
            val sampled = sampledRef.get()
            // Referential comparison: the sentinel must never be matched through a user-defined
            // equals() implementation on a sampled value.
            if (sampled !== noVal) {
                @Suppress("UNCHECKED_CAST")
                emit(transform(value, sampled as B))
            }
        }
        // The receiver flow has completed; stop sampling so that coroutineScope can return.
        job.cancel()
    }
}

/**
 * Returns a new [Flow] that emits at the same rate as [this], but emits the most recently emitted
 * value from [other] instead.
 *
 * Note that the returned Flow will not emit anything until [other] has emitted at least one value.
 */
fun <A> Flow<*>.sample(other: Flow<A>): Flow<A> = sample(other) { _, a -> a }

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs]. If the original flow emits more than one value during this period, only the
 * latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long): Flow<T> = this.throttle(periodMs, SystemClockImpl())

/**
 * Returns a flow that mirrors the original flow, but delays values following emitted values for the
 * given [periodMs] as reported by the given [clock]. If the original flow emits more than one value
 * during this period, only the latest value is emitted.
 *
 * Example:
 *
 * ```kotlin
 * flow {
 *     emit(1)     // t=0ms
 *     delay(90)
 *     emit(2)     // t=90ms
 *     delay(90)
 *     emit(3)     // t=180ms
 *     delay(1010)
 *     emit(4)     // t=1190ms
 *     delay(1010)
 *     emit(5)     // t=2200ms
 * }.throttle(1000)
 * ```
 *
 * produces the following emissions at the following times
 *
 * ```text
 * 1 (t=0ms), 3 (t=1000ms), 4 (t=2000ms), 5 (t=3000ms)
 * ```
 */
fun <T> Flow<T>.throttle(periodMs: Long, clock: SystemClock): Flow<T> = channelFlow {
    coroutineScope {
        // Starts at 0, so the first value is sent without delay whenever
        // clock.elapsedRealtime() is already at least periodMs.
        var previousEmitTimeMs = 0L
        var delayJob: Job? = null
        var sendJob: Job? = null
        val outerScope = this

        collect { value ->
            // A newer value supersedes any value that is still waiting out its delay.
            delayJob?.cancel()
            // If a send has already started, let it finish so that previousEmitTimeMs is up to
            // date before the wait for this value is computed.
            sendJob?.join()
            val currentTimeMs = clock.elapsedRealtime()
            val timeSinceLastEmit = currentTimeMs - previousEmitTimeMs
            val timeUntilNextEmit = max(0L, periodMs - timeSinceLastEmit)

            if (timeUntilNextEmit > 0L) {
                // We create delayJob to allow cancellation during the delay period
                delayJob = launch {
                    delay(timeUntilNextEmit)
                    // Launched from outerScope rather than as a child of delayJob, so that
                    // cancelling delayJob does not cancel a send that has already started.
                    sendJob = outerScope.launch(start = CoroutineStart.UNDISPATCHED) {
                        send(value)
                        previousEmitTimeMs = clock.elapsedRealtime()
                    }
                }
            } else {
                send(value)
                previousEmitTimeMs = currentTimeMs
            }
        }
    }
}