Downloader Rewrite (#437)

* Downloader rewrite
- Rewrite downloader to use coroutines instead of a thread
- Remove unused Page functions
- Add page progress
- Add ProgressResponseBody
- Add support for canceling a download in the middle of downloading
- Fix clear download queue

* Minor fix

* Minor improvements
- notifyAllClients now launches in another thread and only sends new data every second
- Better handling of download queue checker in step()
- Minor improvements and fixes

* Reorder downloads

* Download in parallel by source

* Remove TODO
This commit is contained in:
Mitchell Syer
2022-11-07 20:09:26 -05:00
committed by GitHub
parent 119b9db6b4
commit 2195c3df76
12 changed files with 274 additions and 116 deletions

View File

@@ -116,13 +116,13 @@ fun Call.asObservableSuccess(): Observable<Response> {
@Suppress("UNUSED_PARAMETER") @Suppress("UNUSED_PARAMETER")
fun OkHttpClient.newCallWithProgress(request: Request, listener: ProgressListener): Call { fun OkHttpClient.newCallWithProgress(request: Request, listener: ProgressListener): Call {
val progressClient = newBuilder() val progressClient = newBuilder()
// .cache(null) .cache(null)
// .addNetworkInterceptor { chain -> .addNetworkInterceptor { chain ->
// val originalResponse = chain.proceed(chain.request()) val originalResponse = chain.proceed(chain.request())
// originalResponse.newBuilder() originalResponse.newBuilder()
// .body(ProgressResponseBody(originalResponse.body!!, listener)) .body(ProgressResponseBody(originalResponse.body!!, listener))
// .build() .build()
// } }
.build() .build()
return progressClient.newCall(request) return progressClient.newCall(request)

View File

@@ -0,0 +1,44 @@
package eu.kanade.tachiyomi.network
import okhttp3.MediaType
import okhttp3.ResponseBody
import okio.Buffer
import okio.BufferedSource
import okio.ForwardingSource
import okio.Source
import okio.buffer
import java.io.IOException
class ProgressResponseBody(private val responseBody: ResponseBody, private val progressListener: ProgressListener) : ResponseBody() {
private val bufferedSource: BufferedSource by lazy {
source(responseBody.source()).buffer()
}
override fun contentType(): MediaType? {
return responseBody.contentType()
}
override fun contentLength(): Long {
return responseBody.contentLength()
}
override fun source(): BufferedSource {
return bufferedSource
}
private fun source(source: Source): Source {
return object : ForwardingSource(source) {
var totalBytesRead = 0L
@Throws(IOException::class)
override fun read(sink: Buffer, byteCount: Long): Long {
val bytesRead = super.read(sink, byteCount)
// read() returns the number of bytes read, or -1 if this source is exhausted.
totalBytesRead += if (bytesRead != -1L) bytesRead else 0
progressListener.update(totalBytesRead, responseBody.contentLength(), bytesRead == -1L)
return bytesRead
}
}
}
}

View File

@@ -1,6 +1,5 @@
package eu.kanade.tachiyomi.source.local.loader package eu.kanade.tachiyomi.source.local.loader
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.util.storage.EpubFile import eu.kanade.tachiyomi.util.storage.EpubFile
import java.io.File import java.io.File
@@ -24,7 +23,6 @@ class EpubPageLoader(file: File) : PageLoader {
val streamFn = { epub.getInputStream(epub.getEntry(path)!!) } val streamFn = { epub.getInputStream(epub.getEntry(path)!!) }
ReaderPage(i).apply { ReaderPage(i).apply {
stream = streamFn stream = streamFn
status = Page.READY
} }
} }
} }

View File

@@ -2,7 +2,6 @@ package eu.kanade.tachiyomi.source.local.loader
import com.github.junrar.Archive import com.github.junrar.Archive
import com.github.junrar.rarfile.FileHeader import com.github.junrar.rarfile.FileHeader
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder
import suwayomi.tachidesk.manga.impl.util.storage.ImageUtil import suwayomi.tachidesk.manga.impl.util.storage.ImageUtil
import java.io.ByteArrayInputStream import java.io.ByteArrayInputStream
@@ -46,7 +45,6 @@ class RarPageLoader(file: File) : PageLoader {
ReaderPage(i).apply { ReaderPage(i).apply {
stream = streamFn stream = streamFn
status = Page.READY
} }
} }
} }
@@ -58,7 +56,6 @@ class RarPageLoader(file: File) : PageLoader {
ReaderPage(i).apply { ReaderPage(i).apply {
stream = streamFn stream = streamFn
status = Page.READY
} }
} }
} }

View File

@@ -1,6 +1,5 @@
package eu.kanade.tachiyomi.source.local.loader package eu.kanade.tachiyomi.source.local.loader
import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder import eu.kanade.tachiyomi.util.lang.compareToCaseInsensitiveNaturalOrder
import suwayomi.tachidesk.manga.impl.util.storage.ImageUtil import suwayomi.tachidesk.manga.impl.util.storage.ImageUtil
import java.io.File import java.io.File
@@ -24,7 +23,6 @@ class ZipPageLoader(file: File) : PageLoader {
val streamFn = { zip.getInputStream(entry) } val streamFn = { zip.getInputStream(entry) }
ReaderPage(i).apply { ReaderPage(i).apply {
stream = streamFn stream = streamFn
status = Page.READY
} }
} }
} }

View File

@@ -2,7 +2,8 @@ package eu.kanade.tachiyomi.source.model
import android.net.Uri import android.net.Uri
import eu.kanade.tachiyomi.network.ProgressListener import eu.kanade.tachiyomi.network.ProgressListener
import rx.subjects.Subject import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
open class Page( open class Page(
val index: Int, val index: Int,
@@ -11,48 +12,17 @@ open class Page(
@Transient var uri: Uri? = null // Deprecated but can't be deleted due to extensions @Transient var uri: Uri? = null // Deprecated but can't be deleted due to extensions
) : ProgressListener { ) : ProgressListener {
val number: Int private val _progress = MutableStateFlow(0)
get() = index + 1 val progress = _progress.asStateFlow()
@Transient
@Volatile
var status: Int = 0
set(value) {
field = value
statusSubject?.onNext(value)
statusCallback?.invoke(this)
}
@Transient
@Volatile
var progress: Int = 0
set(value) {
field = value
statusCallback?.invoke(this)
}
@Transient
private var statusSubject: Subject<Int, Int>? = null
@Transient
private var statusCallback: ((Page) -> Unit)? = null
override fun update(bytesRead: Long, contentLength: Long, done: Boolean) { override fun update(bytesRead: Long, contentLength: Long, done: Boolean) {
progress = if (contentLength > 0) { _progress.value = if (contentLength > 0) {
(100 * bytesRead / contentLength).toInt() (100 * bytesRead / contentLength).toInt()
} else { } else {
-1 -1
} }
} }
fun setStatusSubject(subject: Subject<Int, Int>?) {
this.statusSubject = subject
}
fun setStatusCallback(f: ((Page) -> Unit)?) {
statusCallback = f
}
companion object { companion object {
const val QUEUE = 0 const val QUEUE = 0
const val LOAD_PAGE = 1 const val LOAD_PAGE = 1

View File

@@ -4,9 +4,7 @@ import eu.kanade.tachiyomi.source.model.Page
import rx.Observable import rx.Observable
fun HttpSource.getImageUrl(page: Page): Observable<Page> { fun HttpSource.getImageUrl(page: Page): Observable<Page> {
page.status = Page.LOAD_PAGE
return fetchImageUrl(page) return fetchImageUrl(page)
.doOnError { page.status = Page.ERROR }
.onErrorReturn { null } .onErrorReturn { null }
.doOnNext { page.imageUrl = it } .doOnNext { page.imageUrl = it }
.map { page } .map { page }

View File

@@ -106,12 +106,13 @@ object MangaAPI {
get("start", DownloadController.start) get("start", DownloadController.start)
get("stop", DownloadController.stop) get("stop", DownloadController.stop)
get("clear", DownloadController.stop) get("clear", DownloadController.clear)
} }
path("download") { path("download") {
get("{mangaId}/chapter/{chapterIndex}", DownloadController.queueChapter) get("{mangaId}/chapter/{chapterIndex}", DownloadController.queueChapter)
delete("{mangaId}/chapter/{chapterIndex}", DownloadController.unqueueChapter) delete("{mangaId}/chapter/{chapterIndex}", DownloadController.unqueueChapter)
patch("{mangaId}/chapter/{chapterIndex}/reorder/{to}", DownloadController.reorderChapter)
post("batch", DownloadController.queueChapters) post("batch", DownloadController.queueChapters)
} }

View File

@@ -46,10 +46,8 @@ object DownloadController {
description("Start the downloader") description("Start the downloader")
} }
}, },
behaviorOf = { ctx -> behaviorOf = {
DownloadManager.start() DownloadManager.start()
ctx.status(200)
}, },
withResults = { withResults = {
httpCode(HttpCode.OK) httpCode(HttpCode.OK)
@@ -65,9 +63,9 @@ object DownloadController {
} }
}, },
behaviorOf = { ctx -> behaviorOf = { ctx ->
DownloadManager.stop() ctx.future(
future { DownloadManager.stop() }
ctx.status(200) )
}, },
withResults = { withResults = {
httpCode(HttpCode.OK) httpCode(HttpCode.OK)
@@ -83,9 +81,9 @@ object DownloadController {
} }
}, },
behaviorOf = { ctx -> behaviorOf = { ctx ->
DownloadManager.clear() ctx.future(
future { DownloadManager.clear() }
ctx.status(200) )
}, },
withResults = { withResults = {
httpCode(HttpCode.OK) httpCode(HttpCode.OK)
@@ -155,4 +153,23 @@ object DownloadController {
httpCode(HttpCode.OK) httpCode(HttpCode.OK)
} }
) )
/** clear download queue */
val reorderChapter = handler(
pathParam<Int>("chapterIndex"),
pathParam<Int>("mangaId"),
pathParam<Int>("to"),
documentWith = {
withOperation {
summary("Downloader reorder chapter")
description("Reorder chapter in download queue")
}
},
behaviorOf = { _, chapterIndex, mangaId, to ->
DownloadManager.reorder(chapterIndex, mangaId, to)
},
withResults = {
httpCode(HttpCode.OK)
}
)
} }

View File

@@ -10,6 +10,7 @@ package suwayomi.tachidesk.manga.impl
import eu.kanade.tachiyomi.source.local.LocalSource import eu.kanade.tachiyomi.source.local.LocalSource
import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.model.Page
import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.source.online.HttpSource
import kotlinx.coroutines.flow.StateFlow
import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.select import org.jetbrains.exposed.sql.select
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
@@ -37,7 +38,7 @@ object Page {
return page.imageUrl!! return page.imageUrl!!
} }
suspend fun getPageImage(mangaId: Int, chapterIndex: Int, index: Int, useCache: Boolean = true): Pair<InputStream, String> { suspend fun getPageImage(mangaId: Int, chapterIndex: Int, index: Int, useCache: Boolean = true, progressFlow: ((StateFlow<Int>) -> Unit)? = null): Pair<InputStream, String> {
val mangaEntry = transaction { MangaTable.select { MangaTable.id eq mangaId }.first() } val mangaEntry = transaction { MangaTable.select { MangaTable.id eq mangaId }.first() }
val source = getCatalogueSourceOrStub(mangaEntry[MangaTable.sourceReference]) val source = getCatalogueSourceOrStub(mangaEntry[MangaTable.sourceReference])
val chapterEntry = transaction { val chapterEntry = transaction {
@@ -55,6 +56,7 @@ object Page {
pageEntry[PageTable.url], pageEntry[PageTable.url],
pageEntry[PageTable.imageUrl] pageEntry[PageTable.imageUrl]
) )
progressFlow?.invoke(tachiyomiPage.progress)
// we treat Local source differently // we treat Local source differently
if (source.id == LocalSource.ID) { if (source.id == LocalSource.ID) {

View File

@@ -9,6 +9,16 @@ package suwayomi.tachidesk.manga.impl.download
import io.javalin.websocket.WsContext import io.javalin.websocket.WsContext
import io.javalin.websocket.WsMessageContext import io.javalin.websocket.WsMessageContext
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.launch
import kotlinx.serialization.Serializable import kotlinx.serialization.Serializable
import mu.KotlinLogging import mu.KotlinLogging
import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.and
@@ -24,13 +34,17 @@ import suwayomi.tachidesk.manga.model.table.MangaTable
import suwayomi.tachidesk.manga.model.table.toDataClass import suwayomi.tachidesk.manga.model.table.toDataClass
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList import java.util.concurrent.CopyOnWriteArrayList
import kotlin.time.Duration.Companion.seconds
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
private const val MAX_SOURCES_IN_PARAllEL = 5
object DownloadManager { object DownloadManager {
private val scope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val clients = ConcurrentHashMap<String, WsContext>() private val clients = ConcurrentHashMap<String, WsContext>()
private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>() private val downloadQueue = CopyOnWriteArrayList<DownloadChapter>()
private var downloader: Downloader? = null private val downloaders = ConcurrentHashMap<Long, Downloader>()
fun addClient(ctx: WsContext) { fun addClient(ctx: WsContext) {
clients[ctx.sessionId] = ctx clients[ctx.sessionId] = ctx
@@ -61,23 +75,73 @@ object DownloadManager {
} }
} }
private fun notifyAllClients() { private val notifyFlow = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
init {
scope.launch {
notifyFlow.sample(1.seconds).collect {
val status = getStatus() val status = getStatus()
clients.forEach { clients.forEach {
it.value.send(status) it.value.send(status)
} }
} }
}
}
private fun notifyAllClients() {
scope.launch {
notifyFlow.emit(Unit)
}
}
private fun getStatus(): DownloadStatus { private fun getStatus(): DownloadStatus {
return DownloadStatus( return DownloadStatus(
if (downloader == null || if (downloadQueue.none { it.state == Downloading }) {
downloadQueue.none { it.state == Downloading }
) {
"Stopped" "Stopped"
} else { } else {
"Started" "Started"
}, },
downloadQueue downloadQueue.toList()
)
}
private val downloaderWatch = MutableSharedFlow<Unit>(extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
init {
scope.launch {
downloaderWatch.sample(1.seconds).collect {
val runningDownloaders = downloaders.values.filter { it.isActive }
logger.info { "Running: ${runningDownloaders.size}" }
if (runningDownloaders.size < MAX_SOURCES_IN_PARAllEL) {
downloadQueue.asSequence()
.map { it.manga.sourceId.toLong() }
.distinct()
.minus(
runningDownloaders.map { it.sourceId }.toSet()
)
.take(MAX_SOURCES_IN_PARAllEL - runningDownloaders.size)
.map { getDownloader(it) }
.forEach {
it.start()
}
notifyAllClients()
}
}
}
}
private fun refreshDownloaders() {
scope.launch {
downloaderWatch.emit(Unit)
}
}
private fun getDownloader(sourceId: Long) = downloaders.getOrPut(sourceId) {
Downloader(
scope = scope,
sourceId = sourceId,
downloadQueue = downloadQueue,
notifier = ::notifyAllClients,
onComplete = ::refreshDownloaders
) )
} }
@@ -99,7 +163,7 @@ object DownloadManager {
) )
fun enqueue(input: EnqueueInput) { fun enqueue(input: EnqueueInput) {
if (input.chapterIds == null) return if (input.chapterIds.isNullOrEmpty()) return
val chapters = transaction { val chapters = transaction {
(ChapterTable innerJoin MangaTable) (ChapterTable innerJoin MangaTable)
@@ -136,6 +200,9 @@ object DownloadManager {
start() start()
notifyAllClients() notifyAllClients()
} }
scope.launch {
downloaderWatch.emit(Unit)
}
} }
/** /**
@@ -163,31 +230,32 @@ object DownloadManager {
notifyAllClients() notifyAllClients()
} }
fun reorder(chapterIndex: Int, mangaId: Int, to: Int) {
require(to >= 0) { "'to' must be over or equal to 0" }
val download = downloadQueue.find { it.mangaId == mangaId && it.chapterIndex == chapterIndex }
?: return
downloadQueue -= download
downloadQueue.add(to, download)
}
fun start() { fun start() {
if (downloader != null && !downloader?.isAlive!!) { scope.launch {
// doesn't exist or is dead downloaderWatch.emit(Unit)
downloader = null }
} }
if (downloader == null) { suspend fun stop() {
downloader = Downloader(downloadQueue) { notifyAllClients() } coroutineScope {
downloader!!.start() downloaders.map { (_, downloader) ->
async {
downloader.stop()
}
}.awaitAll()
} }
notifyAllClients() notifyAllClients()
} }
fun stop() { suspend fun clear() {
downloader?.let {
synchronized(it.shouldStop) {
it.shouldStop = true
}
}
downloader = null
notifyAllClients()
}
fun clear() {
stop() stop()
downloadQueue.clear() downloadQueue.clear()
notifyAllClients() notifyAllClients()

View File

@@ -7,7 +7,18 @@ package suwayomi.tachidesk.manga.impl.download
* License, v. 2.0. If a copy of the MPL was not distributed with this * License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at https://mozilla.org/MPL/2.0/. */ * file, You can obtain one at https://mozilla.org/MPL/2.0/. */
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import mu.KotlinLogging import mu.KotlinLogging
import org.jetbrains.exposed.sql.and import org.jetbrains.exposed.sql.and
import org.jetbrains.exposed.sql.transactions.transaction import org.jetbrains.exposed.sql.transactions.transaction
@@ -24,40 +35,92 @@ import java.util.concurrent.CopyOnWriteArrayList
private val logger = KotlinLogging.logger {} private val logger = KotlinLogging.logger {}
class Downloader(private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>, val notifier: () -> Unit) : Thread() { class Downloader(
var shouldStop: Boolean = false private val scope: CoroutineScope,
val sourceId: Long,
private val downloadQueue: CopyOnWriteArrayList<DownloadChapter>,
private val notifier: () -> Unit,
private val onComplete: () -> Unit
) {
private var job: Job? = null
class StopDownloadException : Exception("Cancelled download")
class PauseDownloadException : Exception("Pause download")
class DownloadShouldStopException : Exception() private suspend fun step(download: DownloadChapter?) {
fun step() {
notifier() notifier()
synchronized(shouldStop) { currentCoroutineContext().ensureActive()
if (shouldStop) throw DownloadShouldStopException() if (download != null && download != downloadQueue.firstOrNull { it.manga.sourceId.toLong() == sourceId && it.state != Error }) {
if (download in downloadQueue) {
throw PauseDownloadException()
} else {
throw StopDownloadException()
}
} }
} }
override fun run() { val isActive
do { get() = job?.isActive == true
fun start() {
if (!isActive) {
job = scope.launch {
run()
}.also { job ->
job.invokeOnCompletion {
if (it !is CancellationException) {
onComplete()
}
}
}
}
notifier()
}
suspend fun stop() {
job?.cancelAndJoin()
}
private suspend fun run() {
while (downloadQueue.isNotEmpty() && currentCoroutineContext().isActive) {
val download = downloadQueue.firstOrNull { val download = downloadQueue.firstOrNull {
it.state == Queued || it.manga.sourceId.toLong() == sourceId &&
(it.state == Error && it.tries < 3) // 3 re-tries per download (it.state == Queued || (it.state == Error && it.tries < 3)) // 3 re-tries per download
} ?: break } ?: break
try { try {
download.state = Downloading download.state = Downloading
step() step(download)
download.chapter = runBlocking { getChapterDownloadReady(download.chapterIndex, download.mangaId) } download.chapter = getChapterDownloadReady(download.chapterIndex, download.mangaId)
step() step(download)
val pageCount = download.chapter.pageCount val pageCount = download.chapter.pageCount
for (pageNum in 0 until pageCount) { for (pageNum in 0 until pageCount) {
runBlocking { getPageImage(download.mangaId, download.chapterIndex, pageNum) }.first.close() var pageProgressJob: Job? = null
try {
getPageImage(
mangaId = download.mangaId,
chapterIndex = download.chapterIndex,
index = pageNum,
progressFlow = { flow ->
pageProgressJob = flow
.sample(100)
.distinctUntilChanged()
.onEach {
download.progress = (pageNum.toFloat() + (it.toFloat() * 0.01f)) / pageCount
step(null) // don't throw on canceled download here since we can't do anything
}
.launchIn(scope)
}
).first.close()
} finally {
// always cancel the page progress job even if it throws an exception to avoid memory leaks
pageProgressJob?.cancel()
}
// TODO: retry on error with 2,4,8 seconds of wait // TODO: retry on error with 2,4,8 seconds of wait
// TODO: download multiple pages at once, possible solution: rx observer's strategy is used in Tachiyomi download.progress = ((pageNum + 1).toFloat()) / pageCount
// TODO: fine grained download percentage step(download)
download.progress = (pageNum + 1).toFloat() / pageCount
step()
} }
download.state = Finished download.state = Finished
transaction { transaction {
@@ -65,20 +128,22 @@ class Downloader(private val downloadQueue: CopyOnWriteArrayList<DownloadChapter
it[isDownloaded] = true it[isDownloaded] = true
} }
} }
step() step(download)
downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex } downloadQueue.removeIf { it.mangaId == download.mangaId && it.chapterIndex == download.chapterIndex }
step() step(null)
} catch (e: DownloadShouldStopException) { } catch (e: CancellationException) {
logger.debug("Downloader was stopped") logger.debug("Downloader was stopped")
downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued } downloadQueue.filter { it.state == Downloading }.forEach { it.state = Queued }
} catch (e: PauseDownloadException) {
download.state = Queued
} catch (e: Exception) { } catch (e: Exception) {
logger.debug("Downloader faced an exception") logger.info("Downloader faced an exception", e)
downloadQueue.filter { it.state == Downloading }.forEach { it.state = Error; it.tries++ } download.tries++
e.printStackTrace() download.state = Error
} finally { } finally {
notifier() notifier()
} }
} while (!shouldStop) }
} }
} }