1 /*
2  * Copyright (C) 2022 The Android Open Source Project
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *      http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 package com.android.systemui.util.kotlin
18 
19 import android.testing.AndroidTestingRunner
20 import androidx.test.filters.SmallTest
21 import com.android.systemui.SysuiTestCase
22 import com.android.systemui.util.time.FakeSystemClock
23 import com.google.common.truth.Truth.assertThat
24 import kotlinx.coroutines.Dispatchers
25 import kotlinx.coroutines.ExperimentalCoroutinesApi
26 import kotlinx.coroutines.Job
27 import kotlinx.coroutines.delay
28 import kotlinx.coroutines.flow.Flow
29 import kotlinx.coroutines.flow.MutableSharedFlow
30 import kotlinx.coroutines.flow.MutableStateFlow
31 import kotlinx.coroutines.flow.asFlow
32 import kotlinx.coroutines.flow.emptyFlow
33 import kotlinx.coroutines.flow.filterIsInstance
34 import kotlinx.coroutines.flow.flow
35 import kotlinx.coroutines.flow.flowOf
36 import kotlinx.coroutines.flow.merge
37 import kotlinx.coroutines.flow.takeWhile
38 import kotlinx.coroutines.flow.toList
39 import kotlinx.coroutines.launch
40 import kotlinx.coroutines.runBlocking
41 import kotlinx.coroutines.test.TestScope
42 import kotlinx.coroutines.test.advanceTimeBy
43 import kotlinx.coroutines.test.runCurrent
44 import kotlinx.coroutines.test.runTest
45 import kotlinx.coroutines.yield
46 import org.junit.Test
47 import org.junit.runner.RunWith
48 
/** Tests for the `pairwise` and `pairwiseBy` flow operators. */
@SmallTest
@RunWith(AndroidTestingRunner::class)
class PairwiseFlowTest : SysuiTestCase() {
    /** Three upstream values are expected to yield the two adjacent pairs, in order. */
    @Test
    fun simple() = runBlocking {
        val pairs = flowOf(1, 2, 3).pairwise()
        assertThatFlow(pairs).emitsExactly(WithPrev(1, 2), WithPrev(2, 3))
    }

    /** A single upstream value is expected to yield no pair. */
    @Test
    fun notEnough() = runBlocking {
        val pairs = flowOf(1).pairwise()
        assertThatFlow(pairs).emitsNothing()
    }

    /** With an initial value, a single upstream value is expected to yield one pair. */
    @Test
    fun withInit() = runBlocking {
        val pairs = flowOf(2).pairwise(initialValue = 1)
        assertThatFlow(pairs).emitsExactly(WithPrev(1, 2))
    }

    /** With an initial value but an empty upstream, nothing is expected to be emitted. */
    @Test
    fun notEnoughWithInit() = runBlocking {
        val pairs = emptyFlow<Int>().pairwise(initialValue = 1)
        assertThatFlow(pairs).emitsNothing()
    }

    /** `pairwiseBy` is expected to emit the transform's result for each adjacent pair. */
    @Test
    fun withTransform() = runBlocking {
        val joined =
            flowOf("val1", "val2", "val3").pairwiseBy { prev: String, next: String ->
                "$prev|$next"
            }
        assertThatFlow(joined).emitsExactly("val1|val2", "val2|val3")
    }

    /** The value supplied by `getInitialValue` is expected to be paired with the first item. */
    @Test
    fun withGetInit() = runBlocking {
        var initializerInvoked = false
        val joined =
            flowOf("val1", "val2").pairwiseBy(
                getInitialValue = {
                    initializerInvoked = true
                    "initial"
                }
            ) { prev: String, next: String -> "$prev|$next" }

        assertThatFlow(joined).emitsExactly("initial|val1", "val1|val2")
        assertThat(initializerInvoked).isTrue()
    }

    /** An empty upstream emits nothing, yet `getInitialValue` is still expected to be invoked. */
    @Test
    fun notEnoughWithGetInit() = runBlocking {
        var initializerInvoked = false
        val joined =
            emptyFlow<String>().pairwiseBy(
                getInitialValue = {
                    initializerInvoked = true
                    "initial"
                }
            ) { prev: String, next: String -> "$prev|$next" }

        assertThatFlow(joined).emitsNothing()
        // Nothing was emitted, but collecting the flow must still have invoked the initial
        // value function.
        assertThat(initializerInvoked).isTrue()
    }

    /** Merely building the flow, without collecting it, must not invoke `getInitialValue`. */
    @Test
    fun getInitNotRunWhenFlowNotCollected() = runBlocking {
        var initializerInvoked = false
        // The resulting flow is intentionally discarded without being collected.
        flowOf("val1", "val2").pairwiseBy(
            getInitialValue = {
                initializerInvoked = true
                "initial"
            }
        ) { prev: String, next: String -> "$prev|$next" }

        // No collector ran, so the `getInitialValue` lambda must not have been invoked.
        assertThat(initializerInvoked).isFalse()
    }

    /**
     * Collects `pairwise()` twice over a flow backed by a [MutableStateFlow]: once before and once
     * after the state value changes. The first collector is expected to see exactly the 1 -> 2
     * transition, the second nothing; both jobs must be complete once the stop signal is emitted.
     */
    @Test
    fun withStateFlow() =
        runBlocking(Dispatchers.Main.immediate) {
            val counter = MutableStateFlow(1)
            val stopSignal = MutableSharedFlow<Unit>()

            // Terminates as soon as anything other than an Int (i.e. the stop signal) arrives.
            val untilStopped =
                merge(counter, stopSignal).takeWhile { it is Int }.filterIsInstance<Int>()

            val earlyCollector = launch {
                assertThatFlow(untilStopped.pairwise()).emitsExactly(WithPrev(1, 2))
            }
            counter.value = 2
            val lateCollector = launch {
                assertThatFlow(untilStopped.pairwise()).emitsNothing()
            }

            stopSignal.emit(Unit)

            assertThatJob(earlyCollector).isCompleted()
            assertThatJob(lateCollector).isCompleted()
        }
}
147 
/** Tests for the `setChanges` flow operator. */
@SmallTest
@RunWith(AndroidTestingRunner::class)
class SetChangesFlowTest : SysuiTestCase() {
    /** Two sets: the first is reported as fully added, the second as a diff against the first. */
    @Test
    fun simple() = runBlocking {
        val changes = flowOf(setOf(1, 2, 3), setOf(2, 3, 4)).setChanges()
        assertThatFlow(changes)
            .emitsExactly(
                SetChanges(added = setOf(1, 2, 3), removed = emptySet()),
                SetChanges(added = setOf(4), removed = setOf(1)),
            )
    }

    /** A single set is reported as one change in which every element was added. */
    @Test
    fun onlyOneEmission() = runBlocking {
        val changes = flowOf(setOf(1)).setChanges()
        assertThatFlow(changes)
            .emitsExactly(SetChanges(added = setOf(1), removed = emptySet()))
    }

    /** An empty set followed by a non-empty one is expected to yield a single change. */
    @Test
    fun fromEmptySet() = runBlocking {
        val changes = flowOf(emptySet(), setOf(1, 2)).setChanges()
        assertThatFlow(changes)
            .emitsExactly(SetChanges(added = setOf(1, 2), removed = emptySet()))
    }

    /** With `emitFirstEvent = false`, only the diff between the two sets is expected. */
    @Test
    fun dontEmitFirstEvent() = runBlocking {
        val changes = flowOf(setOf(1, 2), setOf(2, 3)).setChanges(emitFirstEvent = false)
        assertThatFlow(changes)
            .emitsExactly(SetChanges(added = setOf(3), removed = setOf(1)))
    }
}
199 
/** Tests for the `sample` flow operator. */
@SmallTest
@RunWith(AndroidTestingRunner::class)
class SampleFlowTest : SysuiTestCase() {
    /** A value emitted after a yield is expected to be combined with the other flow's value. */
    @Test
    fun simple() = runBlocking {
        val source = flow {
            yield()
            emit(1)
        }
        val combined = source.sample(flowOf(2)) { a, b -> Pair(a, b) }
        assertThatFlow(combined).emitsExactly(Pair(1, 2))
    }

    /** When the other flow never produces a value, nothing is expected to be emitted. */
    @Test
    fun otherFlowNoValueYet() = runBlocking {
        val combined = flowOf(1).sample(emptyFlow<Unit>())
        assertThatFlow(combined).emitsNothing()
    }

    /**
     * Interleaves emissions of the sampling flow with emissions into the sampled shared flow and
     * checks which pairs come out.
     */
    @Test
    fun multipleSamples() = runBlocking {
        val sampled = MutableSharedFlow<Int>()
        // The exact interleaving below determines the expected pairs; do not reorder.
        val trigger = flow {
            emit(1)
            sampled.emit(1)
            emit(2)
            sampled.emit(2)
            sampled.emit(3)
            emit(3)
            emit(4)
        }
        val combined = trigger.sample(sampled) { a, b -> Pair(a, b) }
        assertThatFlow(combined)
            .emitsExactly(
                Pair(2, 1),
                Pair(3, 3),
                Pair(4, 3),
            )
    }
}
240 
/**
 * Tests for the `throttle` flow operator.
 *
 * Each test collects a throttled flow into a list on the test's background scope and advances a
 * [FakeSystemClock] together with the test scheduler, checking the collected output at each step.
 * Multi-element assertions use `inOrder()` so that emission order is verified as well, matching
 * the `assertThatFlow` helper used by the other tests in this file.
 */
@OptIn(ExperimentalCoroutinesApi::class)
@SmallTest
@RunWith(AndroidTestingRunner::class)
class ThrottleFlowTest : SysuiTestCase() {

    /** Emissions spaced exactly one period (1000ms) apart are expected to pass through on time. */
    @Test
    fun doesNotAffectEmissions_whenDelayAtLeastEqualToPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(1000)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(999)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2).inOrder()

        // Cleanup
        collectJob.cancel()
    }

    /** A second emission after 500ms is expected to be held back until 1000ms have elapsed. */
    @Test
    fun delaysEmissions_withShorterThanPeriodDelay_untilPeriodElapses() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 2).inOrder()

        // Cleanup
        collectJob.cancel()
    }

    /** Of two further emissions at 500ms and 1000ms, only the last is expected in the output. */
    @Test
    fun filtersAllButLastEmission_whenMultipleEmissionsInPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(500)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(499)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3).inOrder()

        // Cleanup
        collectJob.cancel()
    }

    /**
     * Of two further emissions at 500ms and 750ms, only the last is expected in the output, and
     * only once 1000ms have elapsed.
     */
    @Test
    fun filtersAllButLastEmission_andDelaysIt_whenMultipleEmissionsInShorterThanPeriod() = runTest {
        // Arrange
        val choreographer = createChoreographer(this)
        val output = mutableListOf<Int>()
        val collectJob = backgroundScope.launch {
            flow {
                emit(1)
                delay(500)
                emit(2)
                delay(250)
                emit(3)
            }.throttle(1000, choreographer.fakeClock).toList(output)
        }

        // Act
        choreographer.advanceAndRun(0)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(500)
        choreographer.advanceAndRun(250)
        choreographer.advanceAndRun(249)

        // Assert
        assertThat(output).containsExactly(1)

        // Act
        choreographer.advanceAndRun(1)

        // Assert
        assertThat(output).containsExactly(1, 3).inOrder()

        // Cleanup
        collectJob.cancel()
    }

    /**
     * Creates a helper that owns a [FakeSystemClock] and advances it in lockstep with the virtual
     * time of [testScope].
     */
    private fun createChoreographer(testScope: TestScope) = object {
        val fakeClock = FakeSystemClock()

        /** Advances the fake clock and the test scheduler by [millis], then runs pending tasks. */
        fun advanceAndRun(millis: Long) {
            fakeClock.advanceTime(millis)
            testScope.advanceTimeBy(millis)
            // advanceTimeBy alone is not relied upon to run tasks due exactly at the new time.
            testScope.runCurrent()
        }
    }
}
404 
/**
 * Returns an assertion helper for [flow]. Each assertion collects the whole flow into a list, so
 * it suspends until the flow completes.
 */
private fun <T> assertThatFlow(flow: Flow<T>) =
    object {
        /** Asserts that [flow] emits exactly [emissions], in the given order. */
        suspend fun emitsExactly(vararg emissions: T) {
            val collected = flow.toList()
            assertThat(collected).containsExactly(*emissions).inOrder()
        }

        /** Asserts that [flow] completes without emitting anything. */
        suspend fun emitsNothing() {
            val collected = flow.toList()
            assertThat(collected).isEmpty()
        }
    }
411 
/** Returns an assertion helper for [job]. */
private fun assertThatJob(job: Job) =
    object {
        /** Asserts that [job] has completed. */
        fun isCompleted() {
            val completed = job.isCompleted
            assertThat(completed).isTrue()
        }
    }
416