mirror of
https://github.com/Suwayomi/Suwayomi-Server.git
synced 2026-07-04 03:14:40 -05:00
Fix/ha scheduler rescheduling ha tasks (#613)
* Correctly set the "firstExecutionTime" of a "HATask" In case an initial delay is used for "Timer::scheduleAtFixedRate" (e.g. when rescheduling) then the "firstExecutionTime" of the "HATask" was incorrect, since it considered the first execution to be based on the actual interval. This caused - calculations for execution times (e.g. "timeToNextExecution", "nextExecutionTime") to be incorrect - the ordering of the "scheduledTasks" queue to be incorrect * Add logging * Do not modify queue during forEach loop Caused a "ConcurrentModificationException" and broke the system suspension detection due to the unhandled exception canceling the task * Log all uncaught exceptions In case an exception is uncaught/unhandled, it only gets logged in the console, but is not included in the log file. E.g. the "HAScheduler::scheduleHibernateCheckerTask" task caused an unhandled "ConcurrentModificationException" which caused the task to get dropped. In the log files this error could not be seen and thus, analysing the issue of the suspension detection to stop working was not possible via the logs * Schedule "HATask" immediately when its last execution was missed The missed execution was never triggered * Calculate the "HATask" "last execution time" correctly When scheduling a task for the first time, the "first execution time" is in the future. This time is used for by all functions calculating times for this task (e.g. next/last execution time). In case the first execution didn't happen yet and the current time, would have been an "execution time" based on the interval, the "hibernation detection" would trigger for this task, since it would think that the last execution was missed, due to the "last execution" being in the future. To prevent this, it has to be made sure, that the "last execution time" is in the past.
This commit is contained in:
@@ -61,6 +61,11 @@ val systemTrayInstance by lazy { systemTray() }
|
|||||||
val androidCompat by lazy { AndroidCompat() }
|
val androidCompat by lazy { AndroidCompat() }
|
||||||
|
|
||||||
fun applicationSetup() {
|
fun applicationSetup() {
|
||||||
|
Thread.setDefaultUncaughtExceptionHandler {
|
||||||
|
_, throwable ->
|
||||||
|
KotlinLogging.logger { }.error(throwable) { "unhandled exception" }
|
||||||
|
}
|
||||||
|
|
||||||
// register Tachidesk's config which is dubbed "ServerConfig"
|
// register Tachidesk's config which is dubbed "ServerConfig"
|
||||||
GlobalConfigManager.registerModule(
|
GlobalConfigManager.registerModule(
|
||||||
ServerConfig.register { GlobalConfigManager.config }
|
ServerConfig.register { GlobalConfigManager.config }
|
||||||
|
|||||||
@@ -10,6 +10,7 @@ import it.sauronsoftware.cron4j.Task
|
|||||||
import it.sauronsoftware.cron4j.TaskExecutionContext
|
import it.sauronsoftware.cron4j.TaskExecutionContext
|
||||||
import mu.KotlinLogging
|
import mu.KotlinLogging
|
||||||
import java.time.ZonedDateTime
|
import java.time.ZonedDateTime
|
||||||
|
import java.util.Date
|
||||||
import java.util.PriorityQueue
|
import java.util.PriorityQueue
|
||||||
import java.util.Timer
|
import java.util.Timer
|
||||||
import java.util.TimerTask
|
import java.util.TimerTask
|
||||||
@@ -30,6 +31,10 @@ abstract class BaseHATask(val id: String, val execute: () -> Unit, val name: Str
|
|||||||
override fun compareTo(other: BaseHATask): Int {
|
override fun compareTo(other: BaseHATask): Int {
|
||||||
return getTimeToNextExecution().compareTo(other.getTimeToNextExecution())
|
return getTimeToNextExecution().compareTo(other.getTimeToNextExecution())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun toString(): String {
|
||||||
|
return "Task \"$name\" ($id) lastExecution= ${Date(getLastExecutionTime())} nextExecution= ${Date(getNextExecutionTime())}"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class HACronTask(id: String, val cronExpr: String, execute: () -> Unit, name: String?) : BaseHATask(id, execute, name) {
|
class HACronTask(id: String, val cronExpr: String, execute: () -> Unit, name: String?) : BaseHATask(id, execute, name) {
|
||||||
@@ -46,10 +51,14 @@ class HACronTask(id: String, val cronExpr: String, execute: () -> Unit, name: St
|
|||||||
override fun getTimeToNextExecution(): Long {
|
override fun getTimeToNextExecution(): Long {
|
||||||
return executionTime.timeToNextExecution(ZonedDateTime.now()).get().toMillis()
|
return executionTime.timeToNextExecution(ZonedDateTime.now()).get().toMillis()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun toString(): String {
|
||||||
|
return "${super.toString()} interval= $cronExpr"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class HATask(id: String, val interval: Long, execute: () -> Unit, val timerTask: TimerTask, name: String?) : BaseHATask(id, execute, name) {
|
class HATask(id: String, val interval: Long, execute: () -> Unit, val timerTask: TimerTask, name: String?, val initialDelay: Long = interval) : BaseHATask(id, execute, name) {
|
||||||
private val firstExecutionTime = System.currentTimeMillis() + interval
|
private val firstExecutionTime = System.currentTimeMillis() + initialDelay
|
||||||
|
|
||||||
private fun getElapsedTimeOfCurrentInterval(): Long {
|
private fun getElapsedTimeOfCurrentInterval(): Long {
|
||||||
val timeSinceFirstExecution = System.currentTimeMillis() - firstExecutionTime
|
val timeSinceFirstExecution = System.currentTimeMillis() - firstExecutionTime
|
||||||
@@ -57,7 +66,13 @@ class HATask(id: String, val interval: Long, execute: () -> Unit, val timerTask:
|
|||||||
}
|
}
|
||||||
|
|
||||||
override fun getLastExecutionTime(): Long {
|
override fun getLastExecutionTime(): Long {
|
||||||
return System.currentTimeMillis() - getElapsedTimeOfCurrentInterval()
|
var lastExecutionTime = System.currentTimeMillis() - getElapsedTimeOfCurrentInterval()
|
||||||
|
|
||||||
|
while (lastExecutionTime > System.currentTimeMillis()) {
|
||||||
|
lastExecutionTime -= interval
|
||||||
|
}
|
||||||
|
|
||||||
|
return lastExecutionTime
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun getNextExecutionTime(): Long {
|
override fun getNextExecutionTime(): Long {
|
||||||
@@ -67,6 +82,10 @@ class HATask(id: String, val interval: Long, execute: () -> Unit, val timerTask:
|
|||||||
override fun getTimeToNextExecution(): Long {
|
override fun getTimeToNextExecution(): Long {
|
||||||
return interval - getElapsedTimeOfCurrentInterval()
|
return interval - getElapsedTimeOfCurrentInterval()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override fun toString(): String {
|
||||||
|
return "${super.toString()} interval= $interval, initialDelay= $initialDelay"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -101,18 +120,17 @@ object HAScheduler {
|
|||||||
val systemWasInHibernation = elapsedTime > interval.inWholeMilliseconds + HIBERNATION_THRESHOLD
|
val systemWasInHibernation = elapsedTime > interval.inWholeMilliseconds + HIBERNATION_THRESHOLD
|
||||||
if (systemWasInHibernation) {
|
if (systemWasInHibernation) {
|
||||||
logger.debug { "System hibernation detected, task was delayed by ${elapsedTime - interval.inWholeMilliseconds}ms" }
|
logger.debug { "System hibernation detected, task was delayed by ${elapsedTime - interval.inWholeMilliseconds}ms" }
|
||||||
scheduledTasks.forEach {
|
scheduledTasks.toList().forEach {
|
||||||
val wasLastExecutionMissed = currentTime - it.getLastExecutionTime() - elapsedTime < 0
|
val wasLastExecutionMissed = currentTime - it.getLastExecutionTime() - elapsedTime < 0
|
||||||
if (wasLastExecutionMissed) {
|
if (wasLastExecutionMissed) {
|
||||||
logger.debug { "Task \"${it.name ?: it.id}\" missed its execution, executing now..." }
|
logger.debug { "$it missed its execution, executing now..." }
|
||||||
|
|
||||||
when (it) {
|
when (it) {
|
||||||
is HATask -> reschedule(it.id, it.interval)
|
is HATask -> reschedule(it.id, it.interval)
|
||||||
is HACronTask -> {
|
is HACronTask -> rescheduleCron(it.id, it.cronExpr)
|
||||||
rescheduleCron(it.id, it.cronExpr)
|
|
||||||
it.execute()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
it.execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
// queue is ordered by next execution time, thus, loop can be exited early
|
// queue is ordered by next execution time, thus, loop can be exited early
|
||||||
@@ -154,10 +172,13 @@ object HAScheduler {
|
|||||||
|
|
||||||
fun schedule(execute: () -> Unit, interval: Long, delay: Long, name: String?): String {
|
fun schedule(execute: () -> Unit, interval: Long, delay: Long, name: String?): String {
|
||||||
val taskId = UUID.randomUUID().toString()
|
val taskId = UUID.randomUUID().toString()
|
||||||
val task = createTimerTask(interval, execute)
|
val timerTask = createTimerTask(interval, execute)
|
||||||
|
|
||||||
scheduledTasks.add(HATask(taskId, interval, execute, task, name))
|
val task = HATask(taskId, interval, execute, timerTask, name, delay)
|
||||||
timer.scheduleAtFixedRate(task, delay, interval)
|
scheduledTasks.add(task)
|
||||||
|
timer.scheduleAtFixedRate(timerTask, delay, interval)
|
||||||
|
|
||||||
|
logger.debug { "schedule: $task" }
|
||||||
|
|
||||||
return taskId
|
return taskId
|
||||||
}
|
}
|
||||||
@@ -167,6 +188,8 @@ object HAScheduler {
|
|||||||
task.timerTask.cancel()
|
task.timerTask.cancel()
|
||||||
scheduledTasks.remove(task)
|
scheduledTasks.remove(task)
|
||||||
|
|
||||||
|
logger.debug { "deschedule: $task" }
|
||||||
|
|
||||||
return task
|
return task
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -179,8 +202,11 @@ object HAScheduler {
|
|||||||
val intervalDifference = interval - task.interval
|
val intervalDifference = interval - task.interval
|
||||||
val remainingTimeTillNextExecution = (timeToNextExecution + intervalDifference).coerceAtLeast(0)
|
val remainingTimeTillNextExecution = (timeToNextExecution + intervalDifference).coerceAtLeast(0)
|
||||||
|
|
||||||
scheduledTasks.add(HATask(taskId, interval, task.execute, timerTask, task.name))
|
val updatedTask = HATask(taskId, interval, task.execute, timerTask, task.name, initialDelay = remainingTimeTillNextExecution)
|
||||||
|
scheduledTasks.add(updatedTask)
|
||||||
timer.scheduleAtFixedRate(timerTask, remainingTimeTillNextExecution, interval)
|
timer.scheduleAtFixedRate(timerTask, remainingTimeTillNextExecution, interval)
|
||||||
|
|
||||||
|
logger.debug { "reschedule: new= $updatedTask, old= $task" }
|
||||||
}
|
}
|
||||||
|
|
||||||
fun scheduleCron(execute: () -> Unit, cronExpr: String, name: String?): String {
|
fun scheduleCron(execute: () -> Unit, cronExpr: String, name: String?): String {
|
||||||
@@ -197,22 +223,31 @@ object HAScheduler {
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
scheduledTasks.add(HACronTask(taskId, cronExpr, execute, name))
|
val task = HACronTask(taskId, cronExpr, execute, name)
|
||||||
|
scheduledTasks.add(task)
|
||||||
|
|
||||||
|
logger.debug { "scheduleCron: $task" }
|
||||||
|
|
||||||
return taskId
|
return taskId
|
||||||
}
|
}
|
||||||
|
|
||||||
fun descheduleCron(taskId: String) {
|
fun descheduleCron(taskId: String) {
|
||||||
scheduler.deschedule(taskId)
|
scheduler.deschedule(taskId)
|
||||||
scheduledTasks.removeIf { it.id == taskId }
|
val task = scheduledTasks.find { it.id == taskId } ?: return
|
||||||
|
scheduledTasks.remove(task)
|
||||||
|
|
||||||
|
logger.debug { "descheduleCron: $task" }
|
||||||
}
|
}
|
||||||
|
|
||||||
fun rescheduleCron(taskId: String, cronExpr: String) {
|
fun rescheduleCron(taskId: String, cronExpr: String) {
|
||||||
val task = scheduledTasks.find { it.id == taskId } ?: return
|
val task = scheduledTasks.find { it.id == taskId } ?: return
|
||||||
|
|
||||||
|
val updatedTask = HACronTask(taskId, cronExpr, task.execute, task.name)
|
||||||
scheduledTasks.remove(task)
|
scheduledTasks.remove(task)
|
||||||
scheduledTasks.add(HACronTask(taskId, cronExpr, task.execute, task.name))
|
scheduledTasks.add(updatedTask)
|
||||||
|
|
||||||
scheduler.reschedule(taskId, cronExpr)
|
scheduler.reschedule(taskId, cronExpr)
|
||||||
|
|
||||||
|
logger.debug { "rescheduleCron: new= $updatedTask, old= $task" }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user