mirror of
https://github.com/Suwayomi/Suwayomi-Server.git
synced 2026-06-30 09:24:34 -05:00
Fix subscribeTo sometimes not emitting initial value (#2076)
* Add a test for `subscribeTo` * subscribeTo: Fix initial flow value sometimes not propagated Co-authored-by: schroda <50052685+schroda@users.noreply.github.com> Co-authored-by: Syer10 <mitchellptbo@gmail.com> * lint --------- Co-authored-by: schroda <50052685+schroda@users.noreply.github.com> Co-authored-by: Syer10 <mitchellptbo@gmail.com>
This commit is contained in:
@@ -20,6 +20,7 @@ import kotlinx.coroutines.flow.Flow
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.conflate
|
||||
import kotlinx.coroutines.flow.distinctUntilChanged
|
||||
import kotlinx.coroutines.flow.drop
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
@@ -73,6 +74,21 @@ val serverConfig: ServerConfig by lazy { GlobalConfigManager.module() }
|
||||
|
||||
private val application: Application by injectLazy()
|
||||
|
||||
@OptIn(ExperimentalCoroutinesApi::class)
|
||||
fun <T> subscribeTo(
|
||||
flow: Flow<T>,
|
||||
ignoreInitialValue: Boolean = true,
|
||||
onChange: suspend (value: T) -> Unit,
|
||||
) {
|
||||
val actualFlow =
|
||||
if (ignoreInitialValue) {
|
||||
flow.drop(1)
|
||||
} else {
|
||||
flow
|
||||
}
|
||||
actualFlow.distinctUntilChanged().conflate().onEach { onChange(it) }.launchIn(mutableConfigValueScope)
|
||||
}
|
||||
|
||||
// Settings are ordered by "protoNumber".
|
||||
class ServerConfig(
|
||||
getConfig: () -> Config,
|
||||
@@ -1067,18 +1083,7 @@ class ServerConfig(
|
||||
flow: Flow<T>,
|
||||
onChange: suspend (value: T) -> Unit,
|
||||
ignoreInitialValue: Boolean = true,
|
||||
) {
|
||||
val actualFlow =
|
||||
if (ignoreInitialValue) {
|
||||
flow.drop(1)
|
||||
} else {
|
||||
flow
|
||||
}
|
||||
|
||||
val sharedFlow = MutableSharedFlow<T>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
|
||||
actualFlow.distinctUntilChanged().mapLatest { sharedFlow.emit(it) }.launchIn(mutableConfigValueScope)
|
||||
sharedFlow.onEach { onChange(it) }.launchIn(mutableConfigValueScope)
|
||||
}
|
||||
) = subscribeTo(flow, ignoreInitialValue, onChange)
|
||||
|
||||
fun <T> subscribeTo(
|
||||
flow: Flow<T>,
|
||||
|
||||
26
server/src/test/kotlin/suwayomi/tachidesk/FlowTest.kt
Normal file
26
server/src/test/kotlin/suwayomi/tachidesk/FlowTest.kt
Normal file
@@ -0,0 +1,26 @@
|
||||
package suwayomi.tachidesk
|
||||
|
||||
import graphql.Assert.assertTrue
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.test.runTest
|
||||
import suwayomi.tachidesk.server.subscribeTo
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
import kotlin.test.Test
|
||||
|
||||
class FlowTest {
|
||||
@Test
|
||||
fun subscribe() =
|
||||
runTest {
|
||||
(1..1000).forEach { _ ->
|
||||
val testFlow = MutableStateFlow(value = 3)
|
||||
testFlow.first()
|
||||
val latch = CountDownLatch(1)
|
||||
subscribeTo(testFlow, ignoreInitialValue = false) { _ ->
|
||||
latch.countDown()
|
||||
}
|
||||
assertTrue(latch.await(5, TimeUnit.SECONDS))
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user