Add thread and coroutine log threading handler
All checks were successful
PRs & Pushes / test (push) Successful in 2m32s
PRs & Pushes / build-jars (push) Successful in 3m19s
PRs & Pushes / build-apidoc (push) Successful in 3m20s

This commit is contained in:
JeremyStar™ 2024-12-30 00:43:06 +01:00
parent c20933f835
commit 2f9700e28d
Signed by: JeremyStarTM
GPG key ID: E366BAEF67E4704D
3 changed files with 256 additions and 0 deletions

View file

@ -0,0 +1,125 @@
/*
* STAROPENSOURCE ENGINE SOURCE FILE
* Copyright (c) 2024 The StarOpenSource Engine Authors
* Licensed under the GNU General Public License v3.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.staropensource.engine.base.implementation.logging.threading
import de.staropensource.engine.base.Engine.Companion.logger
import de.staropensource.engine.base.EngineConfiguration
import de.staropensource.engine.base.implementable.logging.ThreadingHandler
import de.staropensource.engine.base.logging.Processor
import de.staropensource.engine.base.type.logging.Call
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.system.measureTimeMillis
/**
* Uses a coroutine for processing log calls.
*
* @since v1-alpha10
*/
@Suppress("Unused")
class CoroutineThreadingHandler : ThreadingHandler {
/**
* Determines whether the
* coroutine should be active.
*
* @since v1-alpha10
*/
var active: Boolean = false
private set
/**
* Contains the coroutine job.
*
* @since v1-alpha10
*/
private var coroutine: Job? = null
/**
* A [Mutex] protecting the [queue].
*
* @since v1-alpha10
*/
private val queueMutex: Mutex = Mutex(false)
/**
* Contains all queued log [Call]s.
*
* @since v1-alpha10
*/
private val queue: LinkedHashSet<Call> = linkedSetOf()
@OptIn(DelicateCoroutinesApi::class)
override fun start() {
active = true
GlobalScope.launch {
var time: ULong
var waitTime: ULong
while (active) {
time = measureTimeMillis { flushSuspend() }.toULong()
if (EngineConfiguration.logThreadingPollDelay > 0u)
if (time > EngineConfiguration.logThreadingPollDelay) {
logger.warn("Logging coroutine is unable to keep up! Processing for last message batch took ${time}ms, which is longer than the configured logThreadingPollDelay of ${EngineConfiguration.logThreadingPollDelay}ms")
waitTime = 0u
} else
waitTime = EngineConfiguration.logThreadingPollDelay.minus(time)
else
waitTime = 0u
if (waitTime > 0u)
delay(waitTime.toLong())
}
}
}
override fun stop() {
active = false
while (coroutine!!.isActive)
Thread.onSpinWait()
}
override fun queue(call: Call) {
runBlocking {
queueMutex.withLock {
queue.add(call)
}
}
}
override fun flush() {
runBlocking { flushSuspend() }
}
private suspend fun flushSuspend() {
val queue: LinkedHashSet<Call>
queueMutex.withLock {
@Suppress("UNCHECKED_CAST")
queue = this@CoroutineThreadingHandler.queue.clone() as LinkedHashSet<Call>
this@CoroutineThreadingHandler.queue.clear()
}
for (call: Call in queue)
Processor.process(call)
}
}

View file

@ -0,0 +1,129 @@
/*
* STAROPENSOURCE ENGINE SOURCE FILE
* Copyright (c) 2024 The StarOpenSource Engine Authors
* Licensed under the GNU General Public License v3.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/
package de.staropensource.engine.base.implementation.logging.threading
import de.staropensource.engine.base.Engine.Companion.logger
import de.staropensource.engine.base.EngineConfiguration
import de.staropensource.engine.base.annotation.NonKotlinContact
import de.staropensource.engine.base.implementable.logging.ThreadingHandler
import de.staropensource.engine.base.logging.Processor
import de.staropensource.engine.base.type.logging.Call
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlin.system.measureTimeMillis
/**
* Uses a thread for processing log calls.
*
* @since v1-alpha10
*/
@NonKotlinContact
@Suppress("Unused")
class NativeThreadingHandler : ThreadingHandler {
/**
* Determines whether the logging
* thread should be active.
*
* @since v1-alpha10
*/
var active: Boolean = false
private set
/**
* Contains the logging thread.
*
* @since v1-alpha10
*/
private var loggingThread: Thread? = null
/**
* A [Mutex] protecting the [queue].
*
* @since v1-alpha10
*/
private val queueMutex: Mutex = Mutex(false)
/**
* Contains all queued log [Call]s.
*
* @since v1-alpha10
*/
private val queue: LinkedHashSet<Call> = linkedSetOf()
override fun start() {
active = true
loggingThread = Thread
.ofPlatform()
.daemon()
.name("Logging thread")
.uncaughtExceptionHandler { thread, throwable -> logger.crash("Logging thread terminated unexpectedly", throwable) }
.start {
var time: ULong
var waitTime: ULong
while (active) {
time = measureTimeMillis { flush() }.toULong()
if (EngineConfiguration.logThreadingPollDelay > 0u)
if (time > EngineConfiguration.logThreadingPollDelay) {
logger.warn("Logging thread is unable to keep up! Processing for last message batch took ${time}ms, which is longer than the configured logThreadingPollDelay of ${EngineConfiguration.logThreadingPollDelay}ms")
waitTime = 0u
} else
waitTime = EngineConfiguration.logThreadingPollDelay.minus(time)
else
waitTime = 0u
if (waitTime > 0u)
Thread.sleep(waitTime.toLong())
}
}
}
override fun stop() {
active = false
while (loggingThread!!.isAlive)
Thread.onSpinWait()
}
override fun queue(call: Call) {
runBlocking {
queueMutex.withLock {
queue.add(call)
}
}
}
override fun flush() {
runBlocking {
val queue: LinkedHashSet<Call>
queueMutex.withLock {
@Suppress("UNCHECKED_CAST")
queue = this@NativeThreadingHandler.queue.clone() as LinkedHashSet<Call>
this@NativeThreadingHandler.queue.clear()
}
for (call: Call in queue)
Processor.process(call)
}
}
}

View file

@ -22,6 +22,7 @@ package de.staropensource.engine.testapp
import de.staropensource.engine.ansi.AnsiSubsystem
import de.staropensource.engine.base.Engine
import de.staropensource.engine.base.EngineConfiguration
import de.staropensource.engine.base.implementation.logging.threading.CoroutineThreadingHandler
import de.staropensource.engine.base.logging.Logger
import de.staropensource.engine.base.type.logging.Level
@ -55,6 +56,7 @@ class Main private constructor() {
fun main(arguments: Array<String>) {
// Update engine configuration
EngineConfiguration.logLevels = Level.entries.toMutableSet()
EngineConfiguration.logThreadingHandler = CoroutineThreadingHandler()
// Register subsystems
AnsiSubsystem.register()