Emit only updater job changes instead of full status (#1302)

The update subscription emitted the full update status, which, depending on how big the status was, took forever because the graphql subscription does not support data loader batching, causing it to run into the n+1 problem
This commit is contained in:
schroda
2025-03-23 00:34:43 +01:00
committed by GitHub
parent 7d079a8728
commit 439e0c8284
10 changed files with 342 additions and 47 deletions

View File

@@ -3,14 +3,11 @@ package suwayomi.tachidesk.graphql.mutations
import graphql.execution.DataFetcherResult import graphql.execution.DataFetcherResult
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import kotlinx.coroutines.withTimeout import kotlinx.coroutines.withTimeout
import org.jetbrains.exposed.sql.selectAll
import org.jetbrains.exposed.sql.transactions.transaction
import suwayomi.tachidesk.graphql.asDataFetcherResult import suwayomi.tachidesk.graphql.asDataFetcherResult
import suwayomi.tachidesk.graphql.types.LibraryUpdateStatus
import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.graphql.types.UpdateStatus
import suwayomi.tachidesk.manga.impl.Category import suwayomi.tachidesk.manga.impl.Category
import suwayomi.tachidesk.manga.impl.update.IUpdater import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.manga.model.table.CategoryTable
import suwayomi.tachidesk.manga.model.table.toDataClass
import suwayomi.tachidesk.server.JavalinSetup.future import suwayomi.tachidesk.server.JavalinSetup.future
import uy.kohesive.injekt.injectLazy import uy.kohesive.injekt.injectLazy
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
@@ -19,6 +16,38 @@ import kotlin.time.Duration.Companion.seconds
class UpdateMutation { class UpdateMutation {
private val updater: IUpdater by injectLazy() private val updater: IUpdater by injectLazy()
data class UpdateLibraryInput(
val clientMutationId: String? = null,
val categories: List<Int>?,
)
data class UpdateLibraryPayload(
val clientMutationId: String? = null,
val updateStatus: LibraryUpdateStatus,
)
fun updateLibrary(input: UpdateLibraryInput): CompletableFuture<DataFetcherResult<UpdateLibraryPayload?>> {
updater.addCategoriesToUpdateQueue(
Category.getCategoryList().filter { input.categories?.contains(it.id) ?: true },
clear = true,
forceAll = !input.categories.isNullOrEmpty(),
)
return future {
asDataFetcherResult {
UpdateLibraryPayload(
input.clientMutationId,
updateStatus =
withTimeout(30.seconds) {
LibraryUpdateStatus(
updater.updates.first(),
)
},
)
}
}
}
data class UpdateLibraryMangaInput( data class UpdateLibraryMangaInput(
val clientMutationId: String? = null, val clientMutationId: String? = null,
) )
@@ -29,10 +58,11 @@ class UpdateMutation {
) )
fun updateLibraryManga(input: UpdateLibraryMangaInput): CompletableFuture<DataFetcherResult<UpdateLibraryMangaPayload?>> { fun updateLibraryManga(input: UpdateLibraryMangaInput): CompletableFuture<DataFetcherResult<UpdateLibraryMangaPayload?>> {
updater.addCategoriesToUpdateQueue( updateLibrary(
Category.getCategoryList(), UpdateLibraryInput(
clear = true, clientMutationId = input.clientMutationId,
forceAll = false, categories = null,
),
) )
return future { return future {
@@ -59,13 +89,12 @@ class UpdateMutation {
) )
fun updateCategoryManga(input: UpdateCategoryMangaInput): CompletableFuture<DataFetcherResult<UpdateCategoryMangaPayload?>> { fun updateCategoryManga(input: UpdateCategoryMangaInput): CompletableFuture<DataFetcherResult<UpdateCategoryMangaPayload?>> {
val categories = updateLibrary(
transaction { UpdateLibraryInput(
CategoryTable.selectAll().where { CategoryTable.id inList input.categories }.map { clientMutationId = input.clientMutationId,
CategoryTable.toDataClass(it) categories = input.categories,
} ),
} )
updater.addCategoriesToUpdateQueue(categories, clear = true, forceAll = true)
return future { return future {
asDataFetcherResult { asDataFetcherResult {

View File

@@ -1,6 +1,8 @@
package suwayomi.tachidesk.graphql.queries package suwayomi.tachidesk.graphql.queries
import com.expediagroup.graphql.generator.annotations.GraphQLDeprecated
import kotlinx.coroutines.flow.first import kotlinx.coroutines.flow.first
import suwayomi.tachidesk.graphql.types.LibraryUpdateStatus
import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.graphql.types.UpdateStatus
import suwayomi.tachidesk.manga.impl.update.IUpdater import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.server.JavalinSetup.future import suwayomi.tachidesk.server.JavalinSetup.future
@@ -10,8 +12,11 @@ import java.util.concurrent.CompletableFuture
class UpdateQuery { class UpdateQuery {
private val updater: IUpdater by injectLazy() private val updater: IUpdater by injectLazy()
@GraphQLDeprecated("Replaced with libraryUpdateStatus", ReplaceWith("libraryUpdateStatus"))
fun updateStatus(): CompletableFuture<UpdateStatus> = future { UpdateStatus(updater.status.first()) } fun updateStatus(): CompletableFuture<UpdateStatus> = future { UpdateStatus(updater.status.first()) }
fun libraryUpdateStatus(): CompletableFuture<LibraryUpdateStatus> = future { LibraryUpdateStatus(updater.getStatus()) }
data class LastUpdateTimestampPayload( data class LastUpdateTimestampPayload(
val timestamp: Long, val timestamp: Long,
) )

View File

@@ -16,7 +16,7 @@ import suwayomi.tachidesk.graphql.types.DownloadUpdates
import suwayomi.tachidesk.manga.impl.download.DownloadManager import suwayomi.tachidesk.manga.impl.download.DownloadManager
class DownloadSubscription { class DownloadSubscription {
@GraphQLDeprecated("Replaced width downloadStatusChanged", ReplaceWith("downloadStatusChanged(input)")) @GraphQLDeprecated("Replaced with downloadStatusChanged", ReplaceWith("downloadStatusChanged(input)"))
fun downloadChanged(): Flow<DownloadStatus> = fun downloadChanged(): Flow<DownloadStatus> =
DownloadManager.status.map { downloadStatus -> DownloadManager.status.map { downloadStatus ->
DownloadStatus(downloadStatus) DownloadStatus(downloadStatus)

View File

@@ -7,17 +7,68 @@
package suwayomi.tachidesk.graphql.subscriptions package suwayomi.tachidesk.graphql.subscriptions
import com.expediagroup.graphql.generator.annotations.GraphQLDeprecated
import com.expediagroup.graphql.generator.annotations.GraphQLDescription
import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import suwayomi.tachidesk.graphql.types.UpdateStatus import suwayomi.tachidesk.graphql.types.UpdateStatus
import suwayomi.tachidesk.graphql.types.UpdaterUpdates
import suwayomi.tachidesk.manga.impl.update.IUpdater import suwayomi.tachidesk.manga.impl.update.IUpdater
import suwayomi.tachidesk.manga.impl.update.UpdateUpdates
import uy.kohesive.injekt.injectLazy import uy.kohesive.injekt.injectLazy
class UpdateSubscription { class UpdateSubscription {
private val updater: IUpdater by injectLazy() private val updater: IUpdater by injectLazy()
@GraphQLDeprecated("Replaced with updates", ReplaceWith("updates(input)"))
fun updateStatusChanged(): Flow<UpdateStatus> = fun updateStatusChanged(): Flow<UpdateStatus> =
updater.status.map { updateStatus -> updater.status.map { updateStatus ->
UpdateStatus(updateStatus) UpdateStatus(updateStatus)
} }
data class LibraryUpdateStatusChangedInput(
@GraphQLDescription(
"Sets a max number of updates that can be contained in a updater update message." +
"Everything above this limit will be omitted and the \"updateStatus\" should be re-fetched via the " +
"corresponding query. Due to the graphql subscription execution strategy not supporting batching for data loaders, " +
"the data loaders run into the n+1 problem, which can cause the server to get unresponsive until the status " +
"update has been handled. This is an issue e.g. when starting an update.",
)
val maxUpdates: Int?,
)
fun libraryUpdateStatusChanged(input: LibraryUpdateStatusChangedInput): Flow<UpdaterUpdates> {
val omitUpdates = input.maxUpdates != null
val maxUpdates = input.maxUpdates ?: 50
return updater.updates.map { updates ->
val categoryUpdatesCount = updates.categoryUpdates.size
val mangaUpdatesCount = updates.mangaUpdates.size
val totalUpdatesCount = categoryUpdatesCount + mangaUpdatesCount
val needToOmitUpdates = omitUpdates && totalUpdatesCount > maxUpdates
if (!needToOmitUpdates) {
return@map UpdaterUpdates(updates, omittedUpdates = false)
}
val maxUpdatesAfterCategoryUpdates = (maxUpdates - categoryUpdatesCount).coerceAtLeast(0)
// the graphql subscription execution strategy does not support data loader batching which causes the n+1 problem,
// thus, too many updates (e.g. on mass enqueue or dequeue) causes unresponsiveness of the server until the
// update has been handled
UpdaterUpdates(
UpdateUpdates(
updates.isRunning,
updates.categoryUpdates.subList(0, maxUpdates),
updates.mangaUpdates.subList(0, maxUpdatesAfterCategoryUpdates),
updates.totalJobs,
updates.finishedJobs,
updates.skippedCategoriesCount,
updates.skippedMangasCount,
updates.initial,
),
omittedUpdates = true,
)
}
}
} }

View File

@@ -15,6 +15,7 @@ import suwayomi.tachidesk.graphql.server.primitives.Edge
import suwayomi.tachidesk.graphql.server.primitives.Node import suwayomi.tachidesk.graphql.server.primitives.Node
import suwayomi.tachidesk.graphql.server.primitives.NodeList import suwayomi.tachidesk.graphql.server.primitives.NodeList
import suwayomi.tachidesk.graphql.server.primitives.PageInfo import suwayomi.tachidesk.graphql.server.primitives.PageInfo
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
import suwayomi.tachidesk.manga.model.dataclass.IncludeOrExclude import suwayomi.tachidesk.manga.model.dataclass.IncludeOrExclude
import suwayomi.tachidesk.manga.model.table.CategoryTable import suwayomi.tachidesk.manga.model.table.CategoryTable
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
@@ -36,6 +37,15 @@ class CategoryType(
IncludeOrExclude.fromValue(row[CategoryTable.includeInDownload]), IncludeOrExclude.fromValue(row[CategoryTable.includeInDownload]),
) )
constructor(dataClass: CategoryDataClass) : this(
dataClass.id,
dataClass.order,
dataClass.name,
dataClass.default,
dataClass.includeInUpdate,
dataClass.includeInDownload,
)
fun mangas(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<MangaNodeList> = fun mangas(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<MangaNodeList> =
dataFetchingEnvironment.getValueFromDataLoader<Int, MangaNodeList>("MangaForCategoryDataLoader", id) dataFetchingEnvironment.getValueFromDataLoader<Int, MangaNodeList>("MangaForCategoryDataLoader", id)

View File

@@ -1,11 +1,15 @@
package suwayomi.tachidesk.graphql.types package suwayomi.tachidesk.graphql.types
import com.expediagroup.graphql.generator.annotations.GraphQLDescription
import com.expediagroup.graphql.generator.annotations.GraphQLIgnore import com.expediagroup.graphql.generator.annotations.GraphQLIgnore
import com.expediagroup.graphql.server.extensions.getValueFromDataLoader import com.expediagroup.graphql.server.extensions.getValueFromDataLoader
import graphql.schema.DataFetchingEnvironment import graphql.schema.DataFetchingEnvironment
import suwayomi.tachidesk.manga.impl.update.CategoryUpdateJob
import suwayomi.tachidesk.manga.impl.update.CategoryUpdateStatus import suwayomi.tachidesk.manga.impl.update.CategoryUpdateStatus
import suwayomi.tachidesk.manga.impl.update.JobStatus import suwayomi.tachidesk.manga.impl.update.JobStatus
import suwayomi.tachidesk.manga.impl.update.UpdateJob
import suwayomi.tachidesk.manga.impl.update.UpdateStatus import suwayomi.tachidesk.manga.impl.update.UpdateStatus
import suwayomi.tachidesk.manga.impl.update.UpdateUpdates
import java.util.concurrent.CompletableFuture import java.util.concurrent.CompletableFuture
private val jobStatusToMangaIdsToCacheClearedStatus = mutableMapOf<JobStatus, MutableMap<Int, Boolean>>() private val jobStatusToMangaIdsToCacheClearedStatus = mutableMapOf<JobStatus, MutableMap<Int, Boolean>>()
@@ -47,14 +51,6 @@ class UpdateStatus(
) )
} }
class UpdateStatusCategoryType(
@get:GraphQLIgnore
val categoryIds: List<Int>,
) {
fun categories(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<CategoryNodeList> =
dataFetchingEnvironment.getValueFromDataLoader("CategoryForIdsDataLoader", categoryIds)
}
class UpdateStatusType( class UpdateStatusType(
@get:GraphQLIgnore @get:GraphQLIgnore
val mangaIds: List<Int>, val mangaIds: List<Int>,
@@ -85,6 +81,115 @@ class UpdateStatusType(
} }
} }
return dataFetchingEnvironment.getValueFromDataLoader<List<Int>, MangaNodeList>("MangaForIdsDataLoader", mangaIds) return dataFetchingEnvironment.getValueFromDataLoader<List<Int>, MangaNodeList>(
"MangaForIdsDataLoader",
mangaIds,
)
} }
} }
class UpdateStatusCategoryType(
@get:GraphQLIgnore
val categoryIds: List<Int>,
) {
fun categories(dataFetchingEnvironment: DataFetchingEnvironment): CompletableFuture<CategoryNodeList> =
dataFetchingEnvironment.getValueFromDataLoader("CategoryForIdsDataLoader", categoryIds)
}
class LibraryUpdateStatus(
val categoryUpdates: List<CategoryUpdateType>,
val mangaUpdates: List<MangaUpdateType>,
val jobsInfo: UpdaterJobsInfoType,
) {
constructor(updates: UpdateUpdates) : this(
categoryUpdates = updates.categoryUpdates.map(::CategoryUpdateType),
mangaUpdates = updates.mangaUpdates.map(::MangaUpdateType),
jobsInfo =
UpdaterJobsInfoType(
isRunning = updates.isRunning,
totalJobs = updates.totalJobs,
finishedJobs = updates.finishedJobs,
skippedCategoriesCount = updates.skippedCategoriesCount,
skippedMangasCount = updates.skippedMangasCount,
),
)
}
enum class MangaJobStatus {
PENDING,
RUNNING,
COMPLETE,
FAILED,
SKIPPED,
}
enum class CategoryJobStatus {
UPDATING,
SKIPPED,
}
class MangaUpdateType(
val manga: MangaType,
val status: MangaJobStatus,
) {
constructor(job: UpdateJob) : this(
MangaType(job.manga),
when (job.status) {
JobStatus.PENDING -> MangaJobStatus.PENDING
JobStatus.RUNNING -> MangaJobStatus.RUNNING
JobStatus.COMPLETE -> MangaJobStatus.COMPLETE
JobStatus.FAILED -> MangaJobStatus.FAILED
JobStatus.SKIPPED -> MangaJobStatus.SKIPPED
},
)
}
class CategoryUpdateType(
val category: CategoryType,
val status: CategoryJobStatus,
) {
constructor(job: CategoryUpdateJob) : this(
CategoryType(job.category),
when (job.status) {
CategoryUpdateStatus.UPDATING -> CategoryJobStatus.UPDATING
CategoryUpdateStatus.SKIPPED -> CategoryJobStatus.SKIPPED
},
)
}
// wrap this info in a data class so that the update subscription updates the date of the update status in the clients cache
data class UpdaterJobsInfoType(
val isRunning: Boolean,
val totalJobs: Int,
val finishedJobs: Int,
val skippedCategoriesCount: Int,
val skippedMangasCount: Int,
)
data class UpdaterUpdates(
val categoryUpdates: List<CategoryUpdateType>,
val mangaUpdates: List<MangaUpdateType>,
@GraphQLDescription("The current update status at the time of sending the initial message. Is null for all following messages")
val initial: LibraryUpdateStatus?,
val jobsInfo: UpdaterJobsInfoType,
@GraphQLDescription(
"Indicates whether updates have been omitted based on the \"maxUpdates\" subscription variable. " +
"In case updates have been omitted, the \"updateStatus\" query should be re-fetched.",
)
val omittedUpdates: Boolean,
) {
constructor(updates: UpdateUpdates, omittedUpdates: Boolean) : this(
categoryUpdates = updates.categoryUpdates.map(::CategoryUpdateType),
mangaUpdates = updates.mangaUpdates.map(::MangaUpdateType),
initial = updates.initial?.let { LibraryUpdateStatus(updates.initial) },
jobsInfo =
UpdaterJobsInfoType(
isRunning = updates.isRunning,
totalJobs = updates.totalJobs,
finishedJobs = updates.finishedJobs,
skippedCategoriesCount = updates.skippedCategoriesCount,
skippedMangasCount = updates.skippedMangasCount,
),
omittedUpdates = omittedUpdates,
)
}

View File

@@ -16,9 +16,14 @@ interface IUpdater {
fun addMangasToQueue(mangas: List<MangaDataClass>) fun addMangasToQueue(mangas: List<MangaDataClass>)
@Deprecated("Replaced with updates", replaceWith = ReplaceWith("updates"))
val status: Flow<UpdateStatus> val status: Flow<UpdateStatus>
val updates: Flow<UpdateUpdates>
val statusDeprecated: StateFlow<UpdateStatus> val statusDeprecated: StateFlow<UpdateStatus>
fun reset() fun reset()
fun getStatus(): UpdateUpdates
} }

View File

@@ -1,5 +1,6 @@
package suwayomi.tachidesk.manga.impl.update package suwayomi.tachidesk.manga.impl.update
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
enum class JobStatus { enum class JobStatus {
@@ -14,3 +15,13 @@ data class UpdateJob(
val manga: MangaDataClass, val manga: MangaDataClass,
val status: JobStatus = JobStatus.PENDING, val status: JobStatus = JobStatus.PENDING,
) )
enum class CategoryUpdateStatus {
UPDATING,
SKIPPED,
}
data class CategoryUpdateJob(
val category: CategoryDataClass,
val status: CategoryUpdateStatus,
)

View File

@@ -4,11 +4,6 @@ import com.fasterxml.jackson.annotation.JsonIgnore
import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass import suwayomi.tachidesk.manga.model.dataclass.CategoryDataClass
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
enum class CategoryUpdateStatus {
UPDATING,
SKIPPED,
}
data class UpdateStatus( data class UpdateStatus(
val categoryStatusMap: Map<CategoryUpdateStatus, List<CategoryDataClass>> = emptyMap(), val categoryStatusMap: Map<CategoryUpdateStatus, List<CategoryDataClass>> = emptyMap(),
val mangaStatusMap: Map<JobStatus, List<MangaDataClass>> = emptyMap(), val mangaStatusMap: Map<JobStatus, List<MangaDataClass>> = emptyMap(),
@@ -33,3 +28,14 @@ data class UpdateStatus(
numberOfJobs = jobs.size, numberOfJobs = jobs.size,
) )
} }
data class UpdateUpdates(
val isRunning: Boolean = false,
val categoryUpdates: List<CategoryUpdateJob>,
val mangaUpdates: List<UpdateJob>,
val totalJobs: Int,
val finishedJobs: Int,
val skippedCategoriesCount: Int,
val skippedMangasCount: Int,
val initial: UpdateUpdates?,
)

View File

@@ -54,8 +54,14 @@ class Updater : IUpdater {
private val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST) private val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
@Deprecated("Replaced with updatesFlow", replaceWith = ReplaceWith("updatesFlow"))
private val statusFlow = MutableSharedFlow<UpdateStatus>() private val statusFlow = MutableSharedFlow<UpdateStatus>()
override val status = statusFlow.onStart { emit(getStatus()) }
@Deprecated("Replaced with updates", replaceWith = ReplaceWith("updates"))
override val status = statusFlow.onStart { emit(getStatusDeprecated(null)) }
private val updatesFlow = MutableSharedFlow<UpdateUpdates>()
override val updates = updatesFlow.onStart { emit(getUpdates(addInitial = true)) }
init { init {
// has to be in its own scope (notifyFlowScope), otherwise, the collection gets canceled due to canceling the scopes (scope) children in the reset function // has to be in its own scope (notifyFlowScope), otherwise, the collection gets canceled due to canceling the scopes (scope) children in the reset function
@@ -69,8 +75,11 @@ class Updater : IUpdater {
private val _status = MutableStateFlow(UpdateStatus()) private val _status = MutableStateFlow(UpdateStatus())
override val statusDeprecated = _status.asStateFlow() override val statusDeprecated = _status.asStateFlow()
private val mangaUpdates = ConcurrentHashMap<Int, UpdateJob>()
private val categoryUpdates = ConcurrentHashMap<Int, CategoryUpdateJob>()
private var updateStatusCategories: Map<CategoryUpdateStatus, List<CategoryDataClass>> = emptyMap() private var updateStatusCategories: Map<CategoryUpdateStatus, List<CategoryDataClass>> = emptyMap()
private var updateStatusSkippedMangas: List<MangaDataClass> = emptyList() private var updateStatusSkippedMangas: List<MangaDataClass> = emptyList()
private val tracker = ConcurrentHashMap<Int, UpdateJob>() private val tracker = ConcurrentHashMap<Int, UpdateJob>()
private val updateChannels = ConcurrentHashMap<String, Channel<UpdateJob>>() private val updateChannels = ConcurrentHashMap<String, Channel<UpdateJob>>()
@@ -112,7 +121,7 @@ class Updater : IUpdater {
val lastAutomatedUpdate = preferences.getLong(lastAutomatedUpdateKey, 0) val lastAutomatedUpdate = preferences.getLong(lastAutomatedUpdateKey, 0)
preferences.edit().putLong(lastAutomatedUpdateKey, System.currentTimeMillis()).apply() preferences.edit().putLong(lastAutomatedUpdateKey, System.currentTimeMillis()).apply()
if (getStatus().running) { if (getStatus().isRunning) {
logger.debug { "Global update is already in progress" } logger.debug { "Global update is already in progress" }
return return
} }
@@ -157,28 +166,78 @@ class Updater : IUpdater {
HAScheduler.schedule(::autoUpdateTask, updateInterval, timeToNextExecution, "global-update") HAScheduler.schedule(::autoUpdateTask, updateInterval, timeToNextExecution, "global-update")
} }
private fun getStatus(running: Boolean? = null): UpdateStatus { private fun isRunning(): Boolean =
tracker.values.toList().any { job -> job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING }
// old status that is still required for the deprecated endpoints
private fun getStatusDeprecated(running: Boolean? = null): UpdateStatus {
val jobs = tracker.values.toList() val jobs = tracker.values.toList()
val isRunning = val isRunning = running ?: isRunning()
running
?: jobs.any { job ->
job.status == JobStatus.PENDING || job.status == JobStatus.RUNNING
}
return UpdateStatus(this.updateStatusCategories, jobs, this.updateStatusSkippedMangas, isRunning) return UpdateStatus(this.updateStatusCategories, jobs, this.updateStatusSkippedMangas, isRunning)
} }
private fun getStatus(
categories: List<CategoryUpdateJob>,
mangas: List<UpdateJob>,
running: Boolean? = null,
addInitial: Boolean? = false,
): UpdateUpdates =
UpdateUpdates(
running ?: isRunning(),
categories,
mangas,
tracker.size,
tracker.values.count { it.status == JobStatus.COMPLETE || it.status == JobStatus.FAILED },
this.updateStatusCategories[CategoryUpdateStatus.SKIPPED]?.size ?: 0,
this.updateStatusSkippedMangas.size,
if (addInitial == true) getStatus() else null,
)
override fun getStatus(): UpdateUpdates =
getStatus(
this.updateStatusCategories[CategoryUpdateStatus.UPDATING]
?.map {
CategoryUpdateJob(
it,
CategoryUpdateStatus.UPDATING,
)
}.orEmpty(),
tracker.values.toList(),
)
private fun getUpdates(
running: Boolean? = null,
addInitial: Boolean? = null,
): UpdateUpdates =
getStatus(
categoryUpdates.values.toList(),
mangaUpdates.values.toList(),
running,
addInitial = addInitial,
)
/** /**
* Pass "isRunning" to force a specific running state * Pass "isRunning" to force a specific running state
*/ */
private suspend fun updateStatus( private suspend fun updateStatus(
immediate: Boolean = false, immediate: Boolean = false,
categoryUpdates: List<CategoryUpdateJob> = emptyList(),
mangaUpdates: List<UpdateJob> = emptyList(),
isRunning: Boolean? = null, isRunning: Boolean? = null,
) { ) {
mangaUpdates.forEach { this.mangaUpdates[it.manga.id] = it }
categoryUpdates.forEach { this.categoryUpdates[it.category.id] = it }
if (immediate) { if (immediate) {
val status = getStatus(running = isRunning) val status = getStatusDeprecated(running = isRunning)
val updates = getUpdates(isRunning)
this.mangaUpdates.clear()
this.categoryUpdates.clear()
statusFlow.emit(status) statusFlow.emit(status)
_status.update { status } _status.update { status }
updatesFlow.emit(updates)
return return
} }
@@ -217,18 +276,15 @@ class Updater : IUpdater {
} }
// fail all updates for source // fail all updates for source
tracker val sourceUpdateJobs = tracker.filter { (_, job) -> !isFailedSourceUpdate(job) }
.filter { (_, job) -> !isFailedSourceUpdate(job) } sourceUpdateJobs.forEach { (mangaId, job) -> tracker[mangaId] = job.copy(status = JobStatus.FAILED) }
.forEach { (mangaId, job) ->
tracker[mangaId] = job.copy(status = JobStatus.FAILED)
}
updateStatus() updateStatus(mangaUpdates = sourceUpdateJobs.values.toList())
} }
private suspend fun process(job: UpdateJob) { private suspend fun process(job: UpdateJob) {
tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING) tracker[job.manga.id] = job.copy(status = JobStatus.RUNNING)
updateStatus() updateStatus(mangaUpdates = listOf(tracker[job.manga.id]!!))
tracker[job.manga.id] = tracker[job.manga.id] =
try { try {
@@ -248,7 +304,7 @@ class Updater : IUpdater {
// in case this is the last update job, the running flag has to be true, before it gets set to false, to be able // in case this is the last update job, the running flag has to be true, before it gets set to false, to be able
// to properly clear the dataloader store in UpdateType // to properly clear the dataloader store in UpdateType
updateStatus(immediate = wasLastJob, isRunning = true) updateStatus(immediate = wasLastJob, isRunning = true, mangaUpdates = listOf(tracker[job.manga.id]!!))
if (wasLastJob) { if (wasLastJob) {
updateStatus(isRunning = false) updateStatus(isRunning = false)
@@ -328,6 +384,18 @@ class Updater : IUpdater {
return return
} }
scope.launch {
updateStatus(
categoryUpdates =
updateStatusCategories[CategoryUpdateStatus.UPDATING]
?.map {
CategoryUpdateJob(it, CategoryUpdateStatus.UPDATING)
}.orEmpty(),
mangaUpdates = mangasToUpdate.map { UpdateJob(it) },
isRunning = true,
)
}
addMangasToQueue( addMangasToQueue(
mangasToUpdate mangasToUpdate
.sortedWith(compareBy(String.CASE_INSENSITIVE_ORDER, MangaDataClass::title)), .sortedWith(compareBy(String.CASE_INSENSITIVE_ORDER, MangaDataClass::title)),
@@ -350,12 +418,17 @@ class Updater : IUpdater {
override fun reset() { override fun reset() {
scope.coroutineContext.cancelChildren() scope.coroutineContext.cancelChildren()
tracker.clear() tracker.clear()
this.mangaUpdates.clear()
this.categoryUpdates.clear()
this.updateStatusCategories = emptyMap() this.updateStatusCategories = emptyMap()
this.updateStatusSkippedMangas = emptyList() this.updateStatusSkippedMangas = emptyList()
scope.launch { scope.launch {
updateStatus(immediate = true, isRunning = false) updateStatus(immediate = true, isRunning = false)
} }
updateChannels.forEach { (_, channel) -> channel.cancel() } updateChannels.forEach { (_, channel) -> channel.cancel() }
updateChannels.clear() updateChannels.clear()
} }