From 2f9700e28dd8940f5e4bcb156fdf29709841f057 Mon Sep 17 00:00:00 2001 From: JeremyStarTM Date: Mon, 30 Dec 2024 00:43:06 +0100 Subject: [PATCH] Add thread and coroutine log threading handler --- .../threading/CoroutineThreadingHandler.kt | 125 +++++++++++++++++ .../threading/NativeThreadingHandler.kt | 129 ++++++++++++++++++ .../de/staropensource/engine/testapp/Main.kt | 2 + 3 files changed, 256 insertions(+) create mode 100644 base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/CoroutineThreadingHandler.kt create mode 100644 base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/NativeThreadingHandler.kt diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/CoroutineThreadingHandler.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/CoroutineThreadingHandler.kt new file mode 100644 index 0000000..b81ec22 --- /dev/null +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/CoroutineThreadingHandler.kt @@ -0,0 +1,125 @@ +/* + * STAROPENSOURCE ENGINE SOURCE FILE + * Copyright (c) 2024 The StarOpenSource Engine Authors + * Licensed under the GNU General Public License v3. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package de.staropensource.engine.base.implementation.logging.threading + +import de.staropensource.engine.base.Engine.Companion.logger +import de.staropensource.engine.base.EngineConfiguration +import de.staropensource.engine.base.implementable.logging.ThreadingHandler +import de.staropensource.engine.base.logging.Processor +import de.staropensource.engine.base.type.logging.Call +import kotlinx.coroutines.* +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.system.measureTimeMillis + +/** + * Uses a coroutine for processing log calls. + * + * @since v1-alpha10 + */ +@Suppress("Unused") +class CoroutineThreadingHandler : ThreadingHandler { + /** + * Determines whether the + * coroutine should be active. + * + * @since v1-alpha10 + */ + var active: Boolean = false + private set + + /** + * Contains the coroutine job. + * + * @since v1-alpha10 + */ + private var coroutine: Job? = null + + /** + * A [Mutex] protecting the [queue]. + * + * @since v1-alpha10 + */ + private val queueMutex: Mutex = Mutex(false) + + /** + * Contains all queued log [Call]s. + * + * @since v1-alpha10 + */ + private val queue: LinkedHashSet = linkedSetOf() + + + @OptIn(DelicateCoroutinesApi::class) + override fun start() { + active = true + GlobalScope.launch { + var time: ULong + var waitTime: ULong + while (active) { + time = measureTimeMillis { flushSuspend() }.toULong() + + if (EngineConfiguration.logThreadingPollDelay > 0u) + if (time > EngineConfiguration.logThreadingPollDelay) { + logger.warn("Logging coroutine is unable to keep up! Processing for last message batch took ${time}ms, which is longer than the configured logThreadingPollDelay of ${EngineConfiguration.logThreadingPollDelay}ms") + waitTime = 0u + } else + waitTime = EngineConfiguration.logThreadingPollDelay.minus(time) + else + waitTime = 0u + + if (waitTime > 0u) + delay(waitTime.toLong()) + } + } + } + + override fun stop() { + active = false + + while (coroutine!!.isActive) + Thread.onSpinWait() + } + + override fun queue(call: Call) { + runBlocking { + queueMutex.withLock { + queue.add(call) + } + } + } + + override fun flush() { + runBlocking { flushSuspend() } + } + + private suspend fun flushSuspend() { + val queue: LinkedHashSet + + queueMutex.withLock { + @Suppress("UNCHECKED_CAST") + queue = this@CoroutineThreadingHandler.queue.clone() as LinkedHashSet + this@CoroutineThreadingHandler.queue.clear() + } + + for (call: Call in queue) + Processor.process(call) + } +} diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/NativeThreadingHandler.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/NativeThreadingHandler.kt new file mode 100644 index 0000000..4faa50e --- /dev/null +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/logging/threading/NativeThreadingHandler.kt @@ -0,0 +1,129 @@ +/* + * STAROPENSOURCE ENGINE SOURCE FILE + * Copyright (c) 2024 The StarOpenSource Engine Authors + * Licensed under the GNU General Public License v3. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +package de.staropensource.engine.base.implementation.logging.threading + +import de.staropensource.engine.base.Engine.Companion.logger +import de.staropensource.engine.base.EngineConfiguration +import de.staropensource.engine.base.annotation.NonKotlinContact +import de.staropensource.engine.base.implementable.logging.ThreadingHandler +import de.staropensource.engine.base.logging.Processor +import de.staropensource.engine.base.type.logging.Call +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock +import kotlin.system.measureTimeMillis + +/** + * Uses a thread for processing log calls. + * + * @since v1-alpha10 + */ +@NonKotlinContact +@Suppress("Unused") +class NativeThreadingHandler : ThreadingHandler { + /** + * Determines whether the logging + * thread should be active. + * + * @since v1-alpha10 + */ + var active: Boolean = false + private set + + /** + * Contains the logging thread. + * + * @since v1-alpha10 + */ + private var loggingThread: Thread? = null + + /** + * A [Mutex] protecting the [queue]. + * + * @since v1-alpha10 + */ + private val queueMutex: Mutex = Mutex(false) + + /** + * Contains all queued log [Call]s. + * + * @since v1-alpha10 + */ + private val queue: LinkedHashSet = linkedSetOf() + + + override fun start() { + active = true + loggingThread = Thread + .ofPlatform() + .daemon() + .name("Logging thread") + .uncaughtExceptionHandler { thread, throwable -> logger.crash("Logging thread terminated unexpectedly", throwable) } + .start { + var time: ULong + var waitTime: ULong + while (active) { + time = measureTimeMillis { flush() }.toULong() + + if (EngineConfiguration.logThreadingPollDelay > 0u) + if (time > EngineConfiguration.logThreadingPollDelay) { + logger.warn("Logging thread is unable to keep up! Processing for last message batch took ${time}ms, which is longer than the configured logThreadingPollDelay of ${EngineConfiguration.logThreadingPollDelay}ms") + waitTime = 0u + } else + waitTime = EngineConfiguration.logThreadingPollDelay.minus(time) + else + waitTime = 0u + + if (waitTime > 0u) + Thread.sleep(waitTime.toLong()) + } + } + } + + override fun stop() { + active = false + + while (loggingThread!!.isAlive) + Thread.onSpinWait() + } + + override fun queue(call: Call) { + runBlocking { + queueMutex.withLock { + queue.add(call) + } + } + } + + override fun flush() { + runBlocking { + val queue: LinkedHashSet + + queueMutex.withLock { + @Suppress("UNCHECKED_CAST") + queue = this@NativeThreadingHandler.queue.clone() as LinkedHashSet + this@NativeThreadingHandler.queue.clear() + } + + for (call: Call in queue) + Processor.process(call) + } + } +} diff --git a/testapp/src/main/kotlin/de/staropensource/engine/testapp/Main.kt b/testapp/src/main/kotlin/de/staropensource/engine/testapp/Main.kt index 61183e6..b675d92 100644 --- a/testapp/src/main/kotlin/de/staropensource/engine/testapp/Main.kt +++ b/testapp/src/main/kotlin/de/staropensource/engine/testapp/Main.kt @@ -22,6 +22,7 @@ package de.staropensource.engine.testapp import de.staropensource.engine.ansi.AnsiSubsystem import de.staropensource.engine.base.Engine import de.staropensource.engine.base.EngineConfiguration +import de.staropensource.engine.base.implementation.logging.threading.CoroutineThreadingHandler import de.staropensource.engine.base.logging.Logger import de.staropensource.engine.base.type.logging.Level @@ -55,6 +56,7 @@ class Main private constructor() { fun main(arguments: Array) { // Update engine configuration EngineConfiguration.logLevels = Level.entries.toMutableSet() + EngineConfiguration.logThreadingHandler = CoroutineThreadingHandler() // Register subsystems AnsiSubsystem.register()