mirror of
https://github.com/Suwayomi/Suwayomi-Server.git
synced 2026-07-04 11:24:35 -05:00
Implement SyncYomi (#1813)
* Implement SyncYomi
* Add ability to select what to sync
* Properly fix default category bug
* Add periodic sync
* Add PostgreSQL support
* Deschedule previous task
* Check if SyncYomi is enabled in syncData function
* Don't allow multiple syncs at the same time
* Convert SyncYomiSyncService to object
* Make startSync non-suspend
* Return a result from startSync
* Sync before library update
* Improvements
* Use NetworkHelper client
* Lint
* Use measureTime
* Database improvements
- Move entire sync operation into a single transaction
- Stop loading all manga to memory
* Revert "Database improvements"
This reverts commit bee8d214c3.
* Actual database improvements
* Remove runBlocking
* Remove title check
* Update updateNonFavorites function
* Update timeout code
* Improve PostgreSQL query
* Create lastSyncState variable
* Create lastSyncStatus query
* Convert lastSyncState to StateFlow
* Create lastSyncStatusChange subscription
* Replace backupRestoreStatus with backupRestoreId
* Add startDate and endDate
* Add logs for sync start and end
* Handle all errors in syncData
* Change category restore function to match Mihon's behavior
* Fix comment
* Remove duplicate BackupMangaHandler.backup call
* Remove duplicated log
* Rename subscription to syncStatusChanged
* Use same flags for restoring
* Update syncInterval config to use DurationSetting
* Update sync scheduling logic
* Reorder conditions to reduce database calls
* Prevent deleted ghost chapters from reappearing during sync
jobobby04/TachiyomiSY#1575
* Improve sync merging categories
jobobby04/TachiyomiSY#1559
* Make columns not null
* Improve H2 triggers
* Add documentation
This commit is contained in:
@@ -0,0 +1,519 @@
|
||||
package suwayomi.tachidesk.global.impl.sync
|
||||
|
||||
import android.app.Application
|
||||
import android.content.Context
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import kotlinx.coroutines.DelicateCoroutinesApi
|
||||
import kotlinx.coroutines.GlobalScope
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.flow.combine
|
||||
import kotlinx.coroutines.flow.first
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import kotlinx.serialization.Serializable
|
||||
import kotlinx.serialization.protobuf.ProtoBuf
|
||||
import org.jetbrains.exposed.v1.core.and
|
||||
import org.jetbrains.exposed.v1.core.eq
|
||||
import org.jetbrains.exposed.v1.jdbc.selectAll
|
||||
import org.jetbrains.exposed.v1.jdbc.transactions.transaction
|
||||
import org.jetbrains.exposed.v1.jdbc.update
|
||||
import suwayomi.tachidesk.graphql.types.StartSyncResult
|
||||
import suwayomi.tachidesk.manga.impl.Category
|
||||
import suwayomi.tachidesk.manga.impl.Library.handleMangaThumbnail
|
||||
import suwayomi.tachidesk.manga.impl.backup.BackupFlags
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.ProtoBackupImport
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.handlers.BackupCategoryHandler
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.handlers.BackupMangaHandler
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.handlers.BackupSourceHandler
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.Backup
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.BackupChapter
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.BackupManga
|
||||
import suwayomi.tachidesk.manga.model.dataclass.ChapterDataClass
|
||||
import suwayomi.tachidesk.manga.model.dataclass.MangaDataClass
|
||||
import suwayomi.tachidesk.manga.model.table.CategoryMangaTable
|
||||
import suwayomi.tachidesk.manga.model.table.CategoryTable
|
||||
import suwayomi.tachidesk.manga.model.table.ChapterTable
|
||||
import suwayomi.tachidesk.manga.model.table.MangaTable
|
||||
import suwayomi.tachidesk.manga.model.table.toDataClass
|
||||
import suwayomi.tachidesk.server.serverConfig
|
||||
import suwayomi.tachidesk.util.HAScheduler
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import kotlin.time.Clock
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.Instant
|
||||
import kotlin.time.measureTime
|
||||
|
||||
@Serializable
|
||||
data class SyncData(
|
||||
val backup: Backup? = null,
|
||||
)
|
||||
|
||||
object SyncManager {
|
||||
private val syncPreferences = Injekt.get<Application>().getSharedPreferences("sync", Context.MODE_PRIVATE)
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
private var currentTaskId: String? = null
|
||||
private val syncMutex = Mutex()
|
||||
|
||||
private val _lastSyncState: MutableStateFlow<SyncState?> = MutableStateFlow(null)
|
||||
val lastSyncState: StateFlow<SyncState?> = _lastSyncState.asStateFlow()
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
fun scheduleSyncTask() {
|
||||
serverConfig.subscribeTo(
|
||||
combine(
|
||||
serverConfig.syncYomiEnabled,
|
||||
serverConfig.syncInterval,
|
||||
) { enabled, interval -> Pair(enabled, interval) },
|
||||
{ (enabled, interval) ->
|
||||
currentTaskId?.let { HAScheduler.deschedule(it) }
|
||||
|
||||
currentTaskId =
|
||||
if (enabled && interval > 0.seconds) {
|
||||
val lastSyncDate =
|
||||
syncPreferences
|
||||
.getLong("last_scheduled_sync", 0L)
|
||||
.takeIf { it != 0L }
|
||||
?.let { Instant.fromEpochMilliseconds(it) }
|
||||
|
||||
if (lastSyncDate == null) {
|
||||
syncPreferences
|
||||
.edit()
|
||||
.putLong("last_scheduled_sync", Clock.System.now().toEpochMilliseconds())
|
||||
.apply()
|
||||
}
|
||||
|
||||
val delay =
|
||||
if (lastSyncDate != null) {
|
||||
((interval) - (Clock.System.now() - lastSyncDate)).coerceAtLeast(0.seconds)
|
||||
} else {
|
||||
interval
|
||||
}
|
||||
|
||||
HAScheduler.schedule(
|
||||
{
|
||||
startSync(periodic = true)
|
||||
|
||||
syncPreferences
|
||||
.edit()
|
||||
.putLong("last_scheduled_sync", Clock.System.now().toEpochMilliseconds())
|
||||
.apply()
|
||||
},
|
||||
interval = interval.inWholeMilliseconds,
|
||||
delay = delay.inWholeMilliseconds,
|
||||
name = "sync",
|
||||
)
|
||||
} else {
|
||||
syncPreferences
|
||||
.edit()
|
||||
.remove("last_scheduled_sync")
|
||||
.apply()
|
||||
null
|
||||
}
|
||||
},
|
||||
ignoreInitialValue = false,
|
||||
)
|
||||
}
|
||||
|
||||
@OptIn(DelicateCoroutinesApi::class)
|
||||
fun startSync(periodic: Boolean = false): StartSyncResult {
|
||||
if (!serverConfig.syncYomiEnabled.value) {
|
||||
return StartSyncResult.SYNC_DISABLED
|
||||
}
|
||||
|
||||
if (!syncMutex.tryLock()) {
|
||||
return StartSyncResult.SYNC_IN_PROGRESS
|
||||
}
|
||||
|
||||
GlobalScope.launch {
|
||||
try {
|
||||
syncData(periodic)
|
||||
} finally {
|
||||
syncMutex.unlock()
|
||||
}
|
||||
}
|
||||
|
||||
return StartSyncResult.SUCCESS
|
||||
}
|
||||
|
||||
suspend fun ensureSync() {
|
||||
if (!serverConfig.syncYomiEnabled.value) {
|
||||
return
|
||||
}
|
||||
|
||||
if (syncMutex.tryLock()) {
|
||||
// there is no ongoing sync, so start one
|
||||
try {
|
||||
syncData()
|
||||
} finally {
|
||||
syncMutex.unlock()
|
||||
}
|
||||
} else {
|
||||
// wait for the ongoing sync to finish
|
||||
syncMutex.withLock {}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun syncData(periodic: Boolean = false) {
|
||||
val startInstant = Clock.System.now()
|
||||
_lastSyncState.value = SyncState.Started(startInstant)
|
||||
|
||||
try {
|
||||
logger.info {
|
||||
if (periodic) {
|
||||
"Starting periodic sync"
|
||||
} else {
|
||||
"Starting manual sync"
|
||||
}
|
||||
}
|
||||
|
||||
transaction {
|
||||
MangaTable.update({ MangaTable.isSyncing eq true }) {
|
||||
it[isSyncing] = false
|
||||
}
|
||||
ChapterTable.update({ ChapterTable.isSyncing eq true }) {
|
||||
it[isSyncing] = false
|
||||
}
|
||||
CategoryTable.update({ CategoryTable.isSyncing eq true }) {
|
||||
it[isSyncing] = false
|
||||
}
|
||||
}
|
||||
|
||||
val backupFlags =
|
||||
BackupFlags(
|
||||
includeManga = serverConfig.syncDataManga.value,
|
||||
includeCategories = serverConfig.syncDataCategories.value,
|
||||
includeChapters = serverConfig.syncDataChapters.value,
|
||||
includeTracking = serverConfig.syncDataTracking.value,
|
||||
includeHistory = serverConfig.syncDataHistory.value,
|
||||
includeClientData = false,
|
||||
includeServerSettings = false,
|
||||
)
|
||||
|
||||
_lastSyncState.value = SyncState.CreatingBackup(startInstant)
|
||||
val backupMangas = BackupMangaHandler.backup(backupFlags)
|
||||
val backup =
|
||||
Backup(
|
||||
backupMangas,
|
||||
BackupCategoryHandler.backup(backupFlags).filter { it.name != Category.DEFAULT_CATEGORY_NAME },
|
||||
BackupSourceHandler.backup(backupMangas, backupFlags),
|
||||
emptyMap(),
|
||||
null,
|
||||
)
|
||||
|
||||
val syncData =
|
||||
SyncData(
|
||||
backup = backup,
|
||||
)
|
||||
|
||||
val remoteBackup =
|
||||
SyncYomiSyncService.doSync(syncData, startInstant) {
|
||||
_lastSyncState.value = it
|
||||
}
|
||||
|
||||
if (remoteBackup == null) {
|
||||
logger.debug { "Skip restore due to network issues" }
|
||||
finishWithError(startInstant, "Network error", periodic)
|
||||
return
|
||||
}
|
||||
|
||||
if (remoteBackup === syncData.backup) {
|
||||
// nothing changed
|
||||
logger.debug { "Skip restore due to remote was overwrite from local" }
|
||||
finishWithSuccess(startInstant, periodic)
|
||||
return
|
||||
}
|
||||
|
||||
// Stop the sync early if the remote backup is null or empty
|
||||
if (remoteBackup.backupManga.isEmpty() && remoteBackup.backupCategories.isEmpty() && remoteBackup.backupSources.isEmpty()) {
|
||||
logger.error { "No data found on remote server." }
|
||||
finishWithError(startInstant, "No data found on remote server.", periodic)
|
||||
return
|
||||
}
|
||||
|
||||
val isLibraryEmpty =
|
||||
transaction {
|
||||
MangaTable
|
||||
.selectAll()
|
||||
.where { MangaTable.inLibrary eq true }
|
||||
.empty()
|
||||
}
|
||||
|
||||
// Check if it's first sync based on lastSyncTimestamp
|
||||
if (syncPreferences.getLong("last_sync_timestamp", 0) == 0L && !isLibraryEmpty) {
|
||||
// It's first sync no need to restore data. (just update remote data)
|
||||
finishWithSuccess(startInstant, periodic)
|
||||
return
|
||||
}
|
||||
|
||||
val (filteredFavorites, nonFavorites) = filterFavoritesAndNonFavorites(remoteBackup)
|
||||
updateNonFavorites(nonFavorites)
|
||||
|
||||
val newSyncData =
|
||||
backup.copy(
|
||||
backupManga = filteredFavorites,
|
||||
backupCategories = remoteBackup.backupCategories,
|
||||
backupSources = remoteBackup.backupSources,
|
||||
)
|
||||
|
||||
val hasMangaChanges = filteredFavorites.isNotEmpty()
|
||||
val hasCategoryChanges = remoteBackup.backupCategories != backup.backupCategories
|
||||
val hasSourceChanges = remoteBackup.backupSources != backup.backupSources
|
||||
|
||||
if (!hasMangaChanges && !hasCategoryChanges && !hasSourceChanges) {
|
||||
// update the sync timestamp
|
||||
finishWithSuccess(startInstant, periodic)
|
||||
return
|
||||
}
|
||||
|
||||
if (serverConfig.syncDataCategories.value) {
|
||||
val mergedUids = newSyncData.backupCategories.map { it.uid }.toSet()
|
||||
val mergedNames = newSyncData.backupCategories.map { it.name }.toSet()
|
||||
val localCategories = Category.getCategoryList().filterNot { it.default } // Exclude system category
|
||||
val categoriesToDelete =
|
||||
localCategories.filter {
|
||||
it.uid !in mergedUids && it.name !in mergedNames
|
||||
}
|
||||
if (categoriesToDelete.isNotEmpty()) {
|
||||
transaction {
|
||||
categoriesToDelete.forEach {
|
||||
Category.removeCategory(it.id)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
val backupStream = ProtoBuf.encodeToByteArray(Backup.serializer(), newSyncData).inputStream()
|
||||
val restoreId =
|
||||
ProtoBackupImport.restore(
|
||||
sourceStream = backupStream,
|
||||
flags = backupFlags,
|
||||
isSync = true,
|
||||
)
|
||||
_lastSyncState.value = SyncState.Restoring(startInstant, restoreId)
|
||||
|
||||
ProtoBackupImport.notifyFlow.first {
|
||||
val restoreState = ProtoBackupImport.getRestoreState(restoreId)
|
||||
|
||||
restoreState == ProtoBackupImport.BackupRestoreState.Success ||
|
||||
restoreState == ProtoBackupImport.BackupRestoreState.Failure
|
||||
}
|
||||
|
||||
// update the sync timestamp
|
||||
finishWithSuccess(startInstant, periodic)
|
||||
} catch (e: Throwable) {
|
||||
logger.error { "Error syncing: ${e.message}" }
|
||||
finishWithError(startInstant, "${e::class.qualifiedName}: ${e.message}", periodic)
|
||||
}
|
||||
}
|
||||
|
||||
private fun finishWithSuccess(
|
||||
startInstant: Instant,
|
||||
periodic: Boolean,
|
||||
) {
|
||||
syncPreferences
|
||||
.edit()
|
||||
.putLong("last_sync_timestamp", Clock.System.now().toEpochMilliseconds())
|
||||
.apply()
|
||||
_lastSyncState.value = SyncState.Success(startInstant)
|
||||
|
||||
logger.info {
|
||||
if (periodic) {
|
||||
"Periodic sync completed successfully"
|
||||
} else {
|
||||
"Manual sync completed successfully"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun finishWithError(
|
||||
startInstant: Instant,
|
||||
message: String,
|
||||
periodic: Boolean,
|
||||
) {
|
||||
_lastSyncState.value = SyncState.Error(startInstant, message)
|
||||
|
||||
logger.info {
|
||||
if (periodic) {
|
||||
"Periodic sync failed: $message"
|
||||
} else {
|
||||
"Manual sync failed: $message"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun isMangaDifferent(
|
||||
localManga: MangaDataClass,
|
||||
remoteManga: BackupManga,
|
||||
): Boolean {
|
||||
if (localManga.version != remoteManga.version) {
|
||||
return true
|
||||
}
|
||||
|
||||
val localChapters =
|
||||
transaction {
|
||||
ChapterTable
|
||||
.selectAll()
|
||||
.where { ChapterTable.manga eq localManga.id }
|
||||
.map { ChapterTable.toDataClass(it) }
|
||||
}
|
||||
|
||||
if (areChaptersDifferent(localChapters, remoteManga.chapters)) {
|
||||
return true
|
||||
}
|
||||
|
||||
val localCategories =
|
||||
transaction {
|
||||
CategoryMangaTable
|
||||
.innerJoin(CategoryTable)
|
||||
.selectAll()
|
||||
.where { CategoryMangaTable.manga eq localManga.id }
|
||||
.map { it[CategoryTable.order] }
|
||||
}
|
||||
|
||||
return localCategories.toSet() != remoteManga.categories.toSet()
|
||||
}
|
||||
|
||||
private fun areChaptersDifferent(
|
||||
localChapters: List<ChapterDataClass>,
|
||||
remoteChapters: List<BackupChapter>,
|
||||
): Boolean {
|
||||
val localChapterMap = localChapters.associateBy { it.url }
|
||||
val remoteChapterMap = remoteChapters.associateBy { it.url }
|
||||
|
||||
if (localChapterMap.size != remoteChapterMap.size) {
|
||||
return true
|
||||
}
|
||||
|
||||
for ((url, localChapter) in localChapterMap) {
|
||||
val remoteChapter = remoteChapterMap[url]
|
||||
|
||||
// If a matching remote chapter doesn't exist, or the version numbers are different, consider them different
|
||||
if (remoteChapter == null || localChapter.version != remoteChapter.version) {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
private fun filterFavoritesAndNonFavorites(backup: Backup): Pair<List<BackupManga>, List<BackupManga>> {
|
||||
val favorites = mutableListOf<BackupManga>()
|
||||
val nonFavorites = mutableListOf<BackupManga>()
|
||||
|
||||
val elapsedTime =
|
||||
measureTime {
|
||||
logger.debug { "Starting to filter favorites and non-favorites from backup data." }
|
||||
|
||||
backup.backupManga.forEach { remoteManga ->
|
||||
val localManga =
|
||||
transaction {
|
||||
MangaTable
|
||||
.selectAll()
|
||||
.where {
|
||||
(MangaTable.sourceReference eq remoteManga.source) and
|
||||
(MangaTable.url eq remoteManga.url)
|
||||
}.limit(1)
|
||||
.map { MangaTable.toDataClass(it) }
|
||||
.firstOrNull()
|
||||
}
|
||||
|
||||
when {
|
||||
// Checks if the manga is in favorites and needs updating or adding
|
||||
remoteManga.favorite -> {
|
||||
if (localManga == null || isMangaDifferent(localManga, remoteManga)) {
|
||||
logger.debug { "Adding to favorites: ${remoteManga.title}" }
|
||||
favorites.add(remoteManga)
|
||||
} else {
|
||||
logger.debug { "Already up-to-date favorite: ${remoteManga.title}" }
|
||||
}
|
||||
}
|
||||
|
||||
// Handle non-favorites
|
||||
!remoteManga.favorite -> {
|
||||
logger.debug { "Adding to non-favorites: ${remoteManga.title}" }
|
||||
nonFavorites.add(remoteManga)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug {
|
||||
"Filtering completed in $elapsedTime. Favorites found: ${favorites.size}, Non-favorites found: ${nonFavorites.size}"
|
||||
}
|
||||
|
||||
return Pair(favorites, nonFavorites)
|
||||
}
|
||||
|
||||
private fun updateNonFavorites(nonFavorites: List<BackupManga>) {
|
||||
nonFavorites.forEach { nonFavorite ->
|
||||
val localManga =
|
||||
transaction {
|
||||
MangaTable
|
||||
.selectAll()
|
||||
.where {
|
||||
(MangaTable.sourceReference eq nonFavorite.source) and
|
||||
(MangaTable.url eq nonFavorite.url)
|
||||
}.limit(1)
|
||||
.map { MangaTable.toDataClass(it) }
|
||||
.firstOrNull()
|
||||
}
|
||||
|
||||
if (localManga != null) {
|
||||
if (localManga.inLibrary != nonFavorite.favorite) {
|
||||
transaction {
|
||||
MangaTable.update({ MangaTable.id eq localManga.id }) {
|
||||
it[inLibrary] = nonFavorite.favorite
|
||||
}
|
||||
}.apply {
|
||||
handleMangaThumbnail(localManga.id, nonFavorite.favorite)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sealed class SyncState(
|
||||
open val startDate: Instant,
|
||||
) {
|
||||
data class Started(
|
||||
override val startDate: Instant,
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class CreatingBackup(
|
||||
override val startDate: Instant,
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class Downloading(
|
||||
override val startDate: Instant,
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class Merging(
|
||||
override val startDate: Instant,
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class Uploading(
|
||||
override val startDate: Instant,
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class Restoring(
|
||||
override val startDate: Instant,
|
||||
val restoreId: String,
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class Success(
|
||||
override val startDate: Instant,
|
||||
val endDate: Instant = Clock.System.now(),
|
||||
) : SyncState(startDate)
|
||||
|
||||
data class Error(
|
||||
override val startDate: Instant,
|
||||
val message: String,
|
||||
val endDate: Instant = Clock.System.now(),
|
||||
) : SyncState(startDate)
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,517 @@
|
||||
package suwayomi.tachidesk.global.impl.sync
|
||||
|
||||
import android.app.Application
|
||||
import android.content.Context
|
||||
import eu.kanade.tachiyomi.network.GET
|
||||
import eu.kanade.tachiyomi.network.NetworkHelper
|
||||
import eu.kanade.tachiyomi.network.PUT
|
||||
import eu.kanade.tachiyomi.network.await
|
||||
import io.github.oshai.kotlinlogging.KotlinLogging
|
||||
import io.javalin.http.HttpStatus
|
||||
import kotlinx.serialization.SerializationException
|
||||
import kotlinx.serialization.protobuf.ProtoBuf
|
||||
import okhttp3.Headers
|
||||
import okhttp3.MediaType.Companion.toMediaType
|
||||
import okhttp3.RequestBody.Companion.toRequestBody
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.Backup
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.BackupCategory
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.BackupChapter
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.BackupManga
|
||||
import suwayomi.tachidesk.manga.impl.backup.proto.models.BackupSource
|
||||
import suwayomi.tachidesk.server.serverConfig
|
||||
import uy.kohesive.injekt.Injekt
|
||||
import uy.kohesive.injekt.api.get
|
||||
import uy.kohesive.injekt.injectLazy
|
||||
import kotlin.time.Duration.Companion.milliseconds
|
||||
import kotlin.time.Duration.Companion.seconds
|
||||
import kotlin.time.Instant
|
||||
|
||||
object SyncYomiSyncService {
|
||||
private val syncPreferences = Injekt.get<Application>().getSharedPreferences("sync", Context.MODE_PRIVATE)
|
||||
|
||||
private val network: NetworkHelper by injectLazy()
|
||||
private val logger = KotlinLogging.logger {}
|
||||
|
||||
private class SyncYomiException(
|
||||
message: String?,
|
||||
) : Exception(message)
|
||||
|
||||
suspend fun doSync(
|
||||
syncData: SyncData,
|
||||
startDate: Instant,
|
||||
setSyncState: (SyncManager.SyncState) -> Unit,
|
||||
): Backup? {
|
||||
setSyncState(SyncManager.SyncState.Downloading(startDate))
|
||||
val (remoteData, etag) = pullSyncData()
|
||||
|
||||
val finalSyncData =
|
||||
if (remoteData != null) {
|
||||
require(etag.isNotEmpty()) { "ETag should never be empty if remote data is not null" }
|
||||
logger.debug { "Try update remote data with ETag($etag)" }
|
||||
setSyncState(SyncManager.SyncState.Merging(startDate))
|
||||
mergeSyncData(syncData, remoteData)
|
||||
} else {
|
||||
// init or overwrite remote data
|
||||
logger.debug { "Try overwrite remote data with ETag($etag)" }
|
||||
syncData
|
||||
}
|
||||
|
||||
if (finalSyncData.backup != null) {
|
||||
setSyncState(SyncManager.SyncState.Uploading(startDate))
|
||||
}
|
||||
pushSyncData(finalSyncData, etag)
|
||||
return finalSyncData.backup
|
||||
}
|
||||
|
||||
private suspend fun pullSyncData(): Pair<SyncData?, String> {
|
||||
val host = serverConfig.syncYomiHost.value
|
||||
val apiKey = serverConfig.syncYomiApiKey.value
|
||||
val downloadUrl = "$host/api/sync/content"
|
||||
|
||||
val headersBuilder = Headers.Builder().add("X-API-Token", apiKey)
|
||||
val lastETag = syncPreferences.getString("last_sync_etag", "") ?: ""
|
||||
if (lastETag != "") {
|
||||
headersBuilder.add("If-None-Match", lastETag)
|
||||
}
|
||||
val headers = headersBuilder.build()
|
||||
|
||||
val downloadRequest =
|
||||
GET(
|
||||
url = downloadUrl,
|
||||
headers = headers,
|
||||
)
|
||||
|
||||
val response = network.client.newCall(downloadRequest).await()
|
||||
|
||||
if (response.code == HttpStatus.NOT_MODIFIED.code) {
|
||||
// not modified
|
||||
require(lastETag.isNotEmpty())
|
||||
logger.info { "Remote server not modified" }
|
||||
return Pair(null, lastETag)
|
||||
} else if (response.code == HttpStatus.NOT_FOUND.code) {
|
||||
// maybe got deleted from remote
|
||||
return Pair(null, "")
|
||||
}
|
||||
|
||||
if (response.isSuccessful) {
|
||||
val newETag =
|
||||
response.headers["ETag"]
|
||||
?.takeIf { it.isNotEmpty() } ?: throw SyncYomiException("Missing ETag")
|
||||
|
||||
val byteArray =
|
||||
response.body.byteStream().use {
|
||||
return@use it.readBytes()
|
||||
}
|
||||
|
||||
return try {
|
||||
val backup = ProtoBuf.decodeFromByteArray(Backup.serializer(), byteArray)
|
||||
return Pair(SyncData(backup = backup), newETag)
|
||||
} catch (_: SerializationException) {
|
||||
logger.info { "Bad content responsed from server" }
|
||||
// the body is invalid
|
||||
// return default value so we can overwrite it
|
||||
Pair(null, "")
|
||||
}
|
||||
} else {
|
||||
val responseBody = response.body.string()
|
||||
logger.error { "SyncError: $responseBody" }
|
||||
throw SyncYomiException("Failed to download sync data: $responseBody")
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun pushSyncData(
|
||||
syncData: SyncData,
|
||||
eTag: String,
|
||||
) {
|
||||
val backup = syncData.backup ?: return
|
||||
|
||||
val host = serverConfig.syncYomiHost.value
|
||||
val apiKey = serverConfig.syncYomiApiKey.value
|
||||
val uploadUrl = "$host/api/sync/content"
|
||||
|
||||
val headersBuilder = Headers.Builder().add("X-API-Token", apiKey)
|
||||
if (eTag.isNotEmpty()) {
|
||||
headersBuilder.add("If-Match", eTag)
|
||||
}
|
||||
val headers = headersBuilder.build()
|
||||
|
||||
// Set timeout to 30 seconds
|
||||
val timeout = 30.seconds
|
||||
val client =
|
||||
network.client
|
||||
.newBuilder()
|
||||
.connectTimeout(timeout)
|
||||
.readTimeout(timeout)
|
||||
.writeTimeout(timeout)
|
||||
.build()
|
||||
|
||||
val byteArray = ProtoBuf.encodeToByteArray(Backup.serializer(), backup)
|
||||
if (byteArray.isEmpty()) {
|
||||
throw IllegalStateException("Empty backup error")
|
||||
}
|
||||
val body = byteArray.toRequestBody("application/octet-stream".toMediaType())
|
||||
|
||||
val uploadRequest =
|
||||
PUT(
|
||||
url = uploadUrl,
|
||||
headers = headers,
|
||||
body = body,
|
||||
)
|
||||
|
||||
val response = client.newCall(uploadRequest).await()
|
||||
|
||||
if (response.isSuccessful) {
|
||||
val newETag =
|
||||
response.headers["ETag"]
|
||||
?.takeIf { it.isNotEmpty() } ?: throw SyncYomiException("Missing ETag")
|
||||
syncPreferences
|
||||
.edit()
|
||||
.putString("last_sync_etag", newETag)
|
||||
.apply()
|
||||
logger.debug { "SyncYomi sync completed" }
|
||||
} else if (response.code == HttpStatus.PRECONDITION_FAILED.code) {
|
||||
// other clients updated remote data, will try next time
|
||||
logger.debug { "SyncYomi sync failed with 412" }
|
||||
} else {
|
||||
val responseBody = response.body.string()
|
||||
logger.error { "SyncError: $responseBody" }
|
||||
}
|
||||
}
|
||||
|
||||
fun mergeSyncData(
|
||||
localSyncData: SyncData,
|
||||
remoteSyncData: SyncData,
|
||||
): SyncData {
|
||||
val mergedCategoriesList =
|
||||
mergeCategoriesLists(localSyncData.backup?.backupCategories, remoteSyncData.backup?.backupCategories)
|
||||
|
||||
val mergedMangaList =
|
||||
mergeMangaLists(
|
||||
localSyncData.backup?.backupManga,
|
||||
remoteSyncData.backup?.backupManga,
|
||||
localSyncData.backup?.backupCategories ?: emptyList(),
|
||||
remoteSyncData.backup?.backupCategories ?: emptyList(),
|
||||
mergedCategoriesList,
|
||||
)
|
||||
|
||||
val mergedSourcesList =
|
||||
mergeSourcesLists(localSyncData.backup?.backupSources, remoteSyncData.backup?.backupSources)
|
||||
|
||||
// Create the merged Backup object
|
||||
val mergedBackup =
|
||||
Backup(
|
||||
backupManga = mergedMangaList,
|
||||
backupCategories = mergedCategoriesList,
|
||||
backupSources = mergedSourcesList,
|
||||
meta = emptyMap(),
|
||||
serverSettings = null,
|
||||
)
|
||||
|
||||
// Create the merged SData object
|
||||
return SyncData(
|
||||
backup = mergedBackup,
|
||||
)
|
||||
}
|
||||
|
||||
private fun mergeMangaLists(
|
||||
localMangaList: List<BackupManga>?,
|
||||
remoteMangaList: List<BackupManga>?,
|
||||
localCategories: List<BackupCategory>,
|
||||
remoteCategories: List<BackupCategory>,
|
||||
mergedCategories: List<BackupCategory>,
|
||||
): List<BackupManga> {
|
||||
val localMangaListSafe = localMangaList.orEmpty()
|
||||
val remoteMangaListSafe = remoteMangaList.orEmpty()
|
||||
|
||||
logger.debug { "Starting merge. Local list size: ${localMangaListSafe.size}, Remote list size: ${remoteMangaListSafe.size}" }
|
||||
|
||||
fun mangaCompositeKey(manga: BackupManga): String =
|
||||
"${manga.source}|${manga.url}|${manga.title.lowercase().trim()}|${manga.author?.lowercase()?.trim()}"
|
||||
|
||||
// Create maps using composite keys
|
||||
val localMangaMap = localMangaListSafe.associateBy { mangaCompositeKey(it) }
|
||||
val remoteMangaMap = remoteMangaListSafe.associateBy { mangaCompositeKey(it) }
|
||||
|
||||
val localCategoriesMapByOrder = localCategories.associateBy { it.order }
|
||||
val remoteCategoriesMapByOrder = remoteCategories.associateBy { it.order }
|
||||
val mergedCategoriesMapByName = mergedCategories.associateBy { it.name }
|
||||
|
||||
fun updateCategories(
|
||||
theManga: BackupManga,
|
||||
theMap: Map<Int, BackupCategory>,
|
||||
): BackupManga =
|
||||
theManga.copy(
|
||||
categories =
|
||||
theManga.categories.mapNotNull {
|
||||
theMap[it]?.let { category ->
|
||||
mergedCategoriesMapByName[category.name]?.order
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
val lastSyncTime = syncPreferences.getLong("last_sync_timestamp", 0).milliseconds.inWholeSeconds
|
||||
|
||||
val mergedList =
|
||||
(localMangaMap.keys + remoteMangaMap.keys).distinct().mapNotNull { compositeKey ->
|
||||
val local = localMangaMap[compositeKey]
|
||||
val remote = remoteMangaMap[compositeKey]
|
||||
|
||||
// New version comparison logic
|
||||
when {
|
||||
local != null && remote == null -> {
|
||||
if (lastSyncTime == 0L || local.lastModifiedAt > lastSyncTime) {
|
||||
updateCategories(local, localCategoriesMapByOrder)
|
||||
} else {
|
||||
logger.debug { "Dropping local manga deleted on remote: ${local.title}." }
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
local == null && remote != null -> {
|
||||
if (lastSyncTime == 0L || remote.lastModifiedAt > lastSyncTime) {
|
||||
updateCategories(remote, remoteCategoriesMapByOrder)
|
||||
} else {
|
||||
logger.debug { "Dropping deleted remote manga: ${remote.title}." }
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
local != null && remote != null -> {
|
||||
// Compare versions to decide which manga to keep
|
||||
if (local.version >= remote.version) {
|
||||
logger.debug { "Keeping local version of ${local.title} with merged chapters." }
|
||||
updateCategories(
|
||||
local.copy(
|
||||
chapters =
|
||||
mergeChapters(
|
||||
local.chapters,
|
||||
remote.chapters,
|
||||
lastSyncTime,
|
||||
serverConfig.syncDataChapters.value,
|
||||
),
|
||||
),
|
||||
localCategoriesMapByOrder,
|
||||
)
|
||||
} else {
|
||||
logger.debug { "Keeping remote version of ${remote.title} with merged chapters." }
|
||||
updateCategories(
|
||||
remote.copy(
|
||||
chapters =
|
||||
mergeChapters(
|
||||
local.chapters,
|
||||
remote.chapters,
|
||||
lastSyncTime,
|
||||
serverConfig.syncDataChapters.value,
|
||||
),
|
||||
),
|
||||
remoteCategoriesMapByOrder,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
else -> {
|
||||
null
|
||||
} // No manga found for key
|
||||
}
|
||||
}
|
||||
|
||||
// Counting favorites and non-favorites
|
||||
val (favorites, nonFavorites) = mergedList.partition { it.favorite }
|
||||
|
||||
logger.debug {
|
||||
"Merge completed. Total merged manga: ${mergedList.size}, Favorites: ${favorites.size}, Non-Favorites: ${nonFavorites.size}"
|
||||
}
|
||||
|
||||
return mergedList
|
||||
}
|
||||
|
||||
private fun mergeChapters(
|
||||
localChapters: List<BackupChapter>,
|
||||
remoteChapters: List<BackupChapter>,
|
||||
lastSyncTime: Long,
|
||||
syncingChapters: Boolean,
|
||||
): List<BackupChapter> {
|
||||
if (!syncingChapters) {
|
||||
return remoteChapters // If not syncing chapters, keep remote untouched
|
||||
}
|
||||
|
||||
fun chapterCompositeKey(chapter: BackupChapter): String = "${chapter.url}|${chapter.name}|${chapter.chapterNumber}"
|
||||
|
||||
val localChapterMap = localChapters.associateBy { chapterCompositeKey(it) }
|
||||
val remoteChapterMap = remoteChapters.associateBy { chapterCompositeKey(it) }
|
||||
|
||||
logger.debug { "Starting chapter merge. Local chapters: ${localChapters.size}, Remote chapters: ${remoteChapters.size}" }
|
||||
|
||||
// Merge both chapter maps based on version numbers
|
||||
val mergedChapters =
|
||||
(localChapterMap.keys + remoteChapterMap.keys).distinct().mapNotNull { compositeKey ->
|
||||
val localChapter = localChapterMap[compositeKey]
|
||||
val remoteChapter = remoteChapterMap[compositeKey]
|
||||
|
||||
logger.debug {
|
||||
"Processing chapter key: $compositeKey. Local chapter: ${localChapter != null}, Remote chapter: ${remoteChapter != null}"
|
||||
}
|
||||
|
||||
when {
|
||||
localChapter != null && remoteChapter == null -> {
|
||||
if (lastSyncTime == 0L || localChapter.lastModifiedAt > lastSyncTime) {
|
||||
logger.debug { "Keeping local chapter: ${localChapter.name}." }
|
||||
localChapter
|
||||
} else {
|
||||
logger.debug { "Dropping local chapter deleted on remote: ${localChapter.name}." }
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
localChapter == null && remoteChapter != null -> {
|
||||
if (lastSyncTime == 0L || remoteChapter.lastModifiedAt > lastSyncTime) {
|
||||
logger.debug { "Taking remote chapter: ${remoteChapter.name}." }
|
||||
remoteChapter
|
||||
} else {
|
||||
logger.debug { "Dropping deleted remote chapter: ${remoteChapter.name}." }
|
||||
null
|
||||
}
|
||||
}
|
||||
|
||||
localChapter != null && remoteChapter != null -> {
|
||||
// Use version number to decide which chapter to keep
|
||||
val chosenChapter =
|
||||
if (localChapter.version >= remoteChapter.version) {
|
||||
// If there mare more chapter on remote, local sourceOrder will need to be updated to maintain correct source order.
|
||||
if (localChapters.size < remoteChapters.size) {
|
||||
localChapter.copy(sourceOrder = remoteChapter.sourceOrder)
|
||||
} else {
|
||||
localChapter
|
||||
}
|
||||
} else {
|
||||
remoteChapter
|
||||
}
|
||||
logger.debug {
|
||||
"Merging chapter: ${chosenChapter.name}. Chosen version from: ${if (localChapter.version >= remoteChapter.version) "Local" else "Remote"}, Local version: ${localChapter.version}, Remote version: ${remoteChapter.version}."
|
||||
}
|
||||
chosenChapter
|
||||
}
|
||||
|
||||
else -> {
|
||||
logger.debug { "No chapter found for composite key: $compositeKey. Skipping." }
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug { "Chapter merge completed. Total merged chapters: ${mergedChapters.size}" }
|
||||
|
||||
return mergedChapters
|
||||
}
|
||||
|
||||
private fun mergeCategoriesLists(
|
||||
localCategoriesList: List<BackupCategory>?,
|
||||
remoteCategoriesList: List<BackupCategory>?,
|
||||
): List<BackupCategory> {
|
||||
if (localCategoriesList == null) return remoteCategoriesList ?: emptyList()
|
||||
if (remoteCategoriesList == null) return localCategoriesList
|
||||
val result = mutableListOf<BackupCategory>()
|
||||
val processedLocals = mutableSetOf<BackupCategory>()
|
||||
|
||||
val localMapByUid = localCategoriesList.filter { it.uid != 0L }.associateBy { it.uid }
|
||||
val localMapByName = localCategoriesList.associateBy { it.name }
|
||||
|
||||
val lastSyncTime = syncPreferences.getLong("last_sync_timestamp", 0)
|
||||
|
||||
remoteCategoriesList.forEach { remote ->
|
||||
var localMatch: BackupCategory? = null
|
||||
|
||||
// 1. Try match by UID
|
||||
if (remote.uid != 0L) {
|
||||
localMatch = localMapByUid[remote.uid]
|
||||
}
|
||||
|
||||
// 2. Try match by Name (fallback)
|
||||
if (localMatch == null) {
|
||||
localMatch = localMapByName[remote.name]
|
||||
}
|
||||
|
||||
if (localMatch != null) {
|
||||
processedLocals.add(localMatch)
|
||||
// Conflict resolution
|
||||
if (localMatch.version >= remote.version) {
|
||||
logger.debug { "Keeping local category: ${localMatch.name} (UID: ${localMatch.uid})" }
|
||||
result.add(localMatch)
|
||||
} else {
|
||||
logger.debug { "Keeping remote category: ${remote.name} (UID: ${remote.uid})" }
|
||||
// Preserve Local UID if Remote was 0
|
||||
if (remote.uid == 0L) {
|
||||
remote.uid = localMatch.uid
|
||||
}
|
||||
result.add(remote)
|
||||
}
|
||||
} else {
|
||||
val remoteModifiedTimeMillis = remote.lastModifiedAt.seconds.inWholeMilliseconds
|
||||
if (lastSyncTime == 0L || remoteModifiedTimeMillis > lastSyncTime) {
|
||||
logger.debug { "Adding new remote category: ${remote.name} (UID: ${remote.uid})" }
|
||||
result.add(remote)
|
||||
} else {
|
||||
logger.debug { "Dropping deleted remote category: ${remote.name} (UID: ${remote.uid})" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Add remaining Local Categories
|
||||
localCategoriesList.forEach { local ->
|
||||
if (local !in processedLocals) {
|
||||
val localModifiedTimeMillis = local.lastModifiedAt.seconds.inWholeMilliseconds
|
||||
if (lastSyncTime == 0L || localModifiedTimeMillis > lastSyncTime) {
|
||||
logger.debug { "Keeping local only category: ${local.name} (UID: ${local.uid})" }
|
||||
result.add(local)
|
||||
} else {
|
||||
logger.debug { "Dropping local category deleted on remote: ${local.name} (UID: ${local.uid})" }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return result.sortedBy { it.order }
|
||||
}
|
||||
|
||||
private fun mergeSourcesLists(
|
||||
localSources: List<BackupSource>?,
|
||||
remoteSources: List<BackupSource>?,
|
||||
): List<BackupSource> {
|
||||
// Create maps using sourceId as key
|
||||
val localSourceMap = localSources?.associateBy { it.sourceId } ?: emptyMap()
|
||||
val remoteSourceMap = remoteSources?.associateBy { it.sourceId } ?: emptyMap()
|
||||
|
||||
logger.debug { "Starting source merge. Local sources: ${localSources?.size}, Remote sources: ${remoteSources?.size}" }
|
||||
|
||||
// Merge both source maps
|
||||
val mergedSources =
|
||||
(localSourceMap.keys + remoteSourceMap.keys).distinct().mapNotNull { sourceId ->
|
||||
val localSource = localSourceMap[sourceId]
|
||||
val remoteSource = remoteSourceMap[sourceId]
|
||||
|
||||
logger.debug {
|
||||
"Processing source ID: $sourceId. Local source: ${localSource != null}, Remote source: ${remoteSource != null}"
|
||||
}
|
||||
|
||||
when {
|
||||
localSource != null && remoteSource == null -> {
|
||||
logger.debug { "Using local source: ${localSource.name}." }
|
||||
localSource
|
||||
}
|
||||
|
||||
remoteSource != null && localSource == null -> {
|
||||
logger.debug { "Using remote source: ${remoteSource.name}." }
|
||||
remoteSource
|
||||
}
|
||||
|
||||
else -> {
|
||||
logger.debug { "Remote and local have the same source ID: $sourceId. Keeping local." }
|
||||
localSource
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
logger.debug { "Source merge completed. Total merged sources: ${mergedSources.size}" }
|
||||
|
||||
return mergedSources
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user