From 911c0ce2e385bac517c9feedfbff084b3977ad78 Mon Sep 17 00:00:00 2001 From: Constantin Piber <59023762+cpiber@users.noreply.github.com> Date: Sun, 31 May 2026 23:05:09 +0200 Subject: [PATCH] Fix `subscribeTo` sometimes not emitting initial value (#2076) * Add a test for `subscribeTo` * subscribeTo: Fix initial flow value sometimes not propagated Co-authored-by: schroda <50052685+schroda@users.noreply.github.com> Co-authored-by: Syer10 * lint --------- Co-authored-by: schroda <50052685+schroda@users.noreply.github.com> Co-authored-by: Syer10 --- .../suwayomi/tachidesk/server/ServerConfig.kt | 29 +++++++++++-------- .../kotlin/suwayomi/tachidesk/FlowTest.kt | 26 +++++++++++++++++ 2 files changed, 43 insertions(+), 12 deletions(-) create mode 100644 server/src/test/kotlin/suwayomi/tachidesk/FlowTest.kt diff --git a/server/server-config/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt b/server/server-config/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt index 9c866137a..3e7acf37c 100644 --- a/server/server-config/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt +++ b/server/server-config/src/main/kotlin/suwayomi/tachidesk/server/ServerConfig.kt @@ -20,6 +20,7 @@ import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.conflate import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.drop import kotlinx.coroutines.flow.launchIn @@ -73,6 +74,21 @@ val serverConfig: ServerConfig by lazy { GlobalConfigManager.module() } private val application: Application by injectLazy() +@OptIn(ExperimentalCoroutinesApi::class) +fun subscribeTo( + flow: Flow, + ignoreInitialValue: Boolean = true, + onChange: suspend (value: T) -> Unit, +) { + val actualFlow = + if (ignoreInitialValue) { + flow.drop(1) + } else { + flow + } + actualFlow.distinctUntilChanged().conflate().onEach { onChange(it) }.launchIn(mutableConfigValueScope) +} + // Settings are ordered by "protoNumber". class ServerConfig( getConfig: () -> Config, @@ -1067,18 +1083,7 @@ class ServerConfig( flow: Flow, onChange: suspend (value: T) -> Unit, ignoreInitialValue: Boolean = true, - ) { - val actualFlow = - if (ignoreInitialValue) { - flow.drop(1) - } else { - flow - } - - val sharedFlow = MutableSharedFlow(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) - actualFlow.distinctUntilChanged().mapLatest { sharedFlow.emit(it) }.launchIn(mutableConfigValueScope) - sharedFlow.onEach { onChange(it) }.launchIn(mutableConfigValueScope) - } + ) = subscribeTo(flow, ignoreInitialValue, onChange) fun subscribeTo( flow: Flow, diff --git a/server/src/test/kotlin/suwayomi/tachidesk/FlowTest.kt b/server/src/test/kotlin/suwayomi/tachidesk/FlowTest.kt new file mode 100644 index 000000000..aa9f38278 --- /dev/null +++ b/server/src/test/kotlin/suwayomi/tachidesk/FlowTest.kt @@ -0,0 +1,26 @@ +package suwayomi.tachidesk + +import graphql.Assert.assertTrue +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.first +import kotlinx.coroutines.test.runTest +import suwayomi.tachidesk.server.subscribeTo +import java.util.concurrent.CountDownLatch +import java.util.concurrent.TimeUnit +import kotlin.test.Test + +class FlowTest { + @Test + fun subscribe() = + runTest { + (1..1000).forEach { _ -> + val testFlow = MutableStateFlow(value = 3) + testFlow.first() + val latch = CountDownLatch(1) + subscribeTo(testFlow, ignoreInitialValue = false) { _ -> + latch.countDown() + } + assertTrue(latch.await(5, TimeUnit.SECONDS)) + } + } +}