From 14cf028bfcb78451bf9be7035f366e62c6f11ec4 Mon Sep 17 00:00:00 2001 From: JeremyStarTM Date: Fri, 27 Dec 2024 23:52:56 +0100 Subject: [PATCH] Add stream piping, redirecting and flushing --- .../base/implementable/stream/ReadStream.kt | 6 +- .../base/implementable/stream/Stream.kt | 389 ++++++++++++++---- .../base/implementable/stream/WriteStream.kt | 8 +- .../base/implementation/stream/ByteStream.kt | 2 + .../implementation/stream/ByteWriteStream.kt | 6 +- .../implementation/stream/FileAccessStream.kt | 2 + .../implementation/stream/LoggerStream.kt | 14 +- .../base/implementation/stream/NullStream.kt | 1 + 8 files changed, 346 insertions(+), 82 deletions(-) diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt index 28b9f35..c70be96 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt @@ -24,7 +24,11 @@ package de.staropensource.engine.base.implementable.stream * * @since v1-alpha10 */ -abstract class ReadStream : Stream(streamMode = StreamMode.READ) { +abstract class ReadStream : Stream( + streamMode = StreamMode.READ, + autoFlushAfter = 0UL, +) { override fun writeByte(byte: Byte): Stream = this override fun writeBytes(bytes: ByteArray): Stream = this + override fun flush(): Stream = this } diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt index 26bbcab..923aae4 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt @@ -19,8 +19,11 @@ package de.staropensource.engine.base.implementable.stream +import de.staropensource.engine.base.Engine.Companion.logger import de.staropensource.engine.base.annotation.NonKotlinContact import de.staropensource.engine.base.exception.io.IOAccessException +import de.staropensource.engine.base.implementation.stream.JavaReadStream +import de.staropensource.engine.base.implementation.stream.JavaWriteStream import java.io.IOException import java.io.InputStream import java.io.OutputStream @@ -29,9 +32,14 @@ import java.io.OutputStream * Makes streaming data easy. * * @param streamMode supported [StreamMode]s + * @param autoFlushAfter after which amount of bytes to automatically flush. May be ignored by some implementations. Implementations should expose this argument in their constructor * @since v1-alpha10 */ -abstract class Stream(val streamMode: StreamMode) : AutoCloseable { +@Suppress("Unused") +abstract class Stream( + val streamMode: StreamMode, + val autoFlushAfter: ULong = 100UL, +) : AutoCloseable { /** * Companion object of [Stream]. * @@ -45,26 +53,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { * @see stdin(3) * @since v1-alpha10 */ - val standardInput: ReadStream = object : ReadStream() { - /** - * Contains the Java [InputStream] - * provided by [System.in]. - * - * @since v1-alpha10 - */ - val stream: InputStream = System.`in` - - // -----> Closure + val standardInput: ReadStream = object : JavaReadStream(System.`in`) { override fun close() = Unit override fun closeStream() = Unit - - - // -----> Reading - override fun remaining(): Boolean? = null - override fun available(): UInt = stream.available().toUInt() - override fun readNextByte(): Byte? = stream.read().toByte() - override fun readNBytes(n: UInt): ByteArray = stream.readNBytes(n.toInt()) - override fun readRemainingBytes(): ByteArray = stream.readAllBytes() } /** @@ -74,29 +65,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { * @see stdout(3) * @since v1-alpha10 */ - val standardOutput: WriteStream = object : WriteStream() { - /** - * Contains the Java [OutputStream] - * provided by [System.out]. - * - * @since v1-alpha10 - */ - val stream: OutputStream = System.out - - // -----> Closure + val standardOutput: WriteStream = object : JavaWriteStream(System.out) { override fun close() = Unit override fun closeStream() = Unit - - - // -----> Writing - override fun writeByte(byte: Byte): Stream { - stream.write(byte.toInt()) - return this - } - override fun writeBytes(bytes: ByteArray): Stream { - stream.write(bytes) - return this - } } /** @@ -105,29 +76,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { * @see stderr(3) * @since v1-alpha10 */ - val standardError: WriteStream = object : WriteStream() { - /** - * Contains the Java [OutputStream] - * provided by [System.err]. - * - * @since v1-alpha10 - */ - val stream: OutputStream = System.err - - // -----> Closure + val standardError: WriteStream = object : JavaWriteStream(System.err) { override fun close() = Unit override fun closeStream() = Unit - - - // -----> Writing - override fun writeByte(byte: Byte): Stream { - stream.write(byte.toInt()) - return this - } - override fun writeBytes(bytes: ByteArray): Stream { - stream.write(bytes) - return this - } } } @@ -141,6 +92,46 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { var closed: Boolean = false private set + /** + * Indicates whether this stream + * is currently being [watch]ed. + * + * @since v1-alpha10 + */ + var watching: Boolean = false + private set + + /** + * Contains a list of [Stream]s + * to redirect writes to. + * + * Redirecting is only performed + * upon invoking either [writeByte], + * [writeBytes], [writeString] or + * [write]. + * + * @see pipes + * @since v1-alpha10 + */ + val redirects: MutableList = mutableListOf() + + /** + * Contains a list of [Stream]s + * to pipe reads to. + * + * Piping is only performed upon + * invoking either [readNextByte], + * [readNBytes] or + * [readRemainingBytes]. + * + * To watch for changes, see [watch]. + * + * @see redirects + * @see watch + * @since v1-alpha10 + */ + val pipes: MutableList = mutableListOf() + /** * Contains the Java [InputStream] * for this [Stream] instance. @@ -199,6 +190,12 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { * override fun writeByte(byte: Byte): Stream = writeBytes(byteArrayOf(byte)) * ``` * + * Implementations should make sure + * to call [internalRedirect] before + * returning from this method to + * ensure that the written [Byte] + * is piped to all registered [pipes]. + * * @param byte [Byte] to write * @return this instance * @throws IOAccessException on IO error @@ -211,6 +208,12 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { * Writes (or rather: appends) * the specified [Byte]s. * + * Implementations should make sure + * to call [internalRedirect] before + * returning from this method to + * ensure that the written [Byte]s + * is piped to all registered [pipes]. + * * @param bytes [Byte]s to write * @return this instance * @throws IOAccessException on IO error @@ -243,6 +246,17 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { @Throws(IOAccessException::class) fun write(value: Any): Stream = writeBytes(value.toString().toByteArray()) + /** + * Flushes all currently + * buffered bytes out. + * + * @return this instance + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + abstract fun flush(): Stream + // -----> Reading /** @@ -278,6 +292,8 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Reads the next [Byte]. * + * Will not block the execution flow. + * * @return next [Byte] or `null` if no bytes are remaining * @throws IOAccessException on IO error * @since v1-alpha10 @@ -288,6 +304,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Reads the specified next [n] [Byte]s. * + * May block the execution flow + * until [n] bytes have been read. + * * @param n [Byte]s to read * @return next [n] bytes or an empty array if no bytes are remaining * @throws IOAccessException on IO error @@ -299,6 +318,10 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Reads all remaining [Byte]s. * + * May block the execution flow + * until all remaining bytes + * have been read. + * * @return remaining [Byte]s or an empty array if no bytes are remaining * @throws IOAccessException on IO error * @since v1-alpha10 @@ -309,6 +332,8 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Discards the next [Byte]. * + * Will not block the execution flow. + * * @return this instance * @throws IOAccessException on IO error * @since v1-alpha10 @@ -322,6 +347,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Discards the specified next [n] bytes. * + * May block the execution flow + * until [n] bytes have been skipped. + * * @param n [Byte]s to discard * @return this instance * @throws IOAccessException on IO error @@ -336,6 +364,10 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Discards all remaining [Byte]s. * + * May block the execution flow + * until all remaining bytes + * have been skipped. + * * @return this instance * @throws IOAccessException on IO error * @since v1-alpha10 @@ -347,19 +379,151 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { } - // -----> Redirection + // -----> Watching /** - * Reads the next byte of this - * [Stream] and writes it - * to the specified [Stream]. + * Starts a thread solely responsible + * for reading bytes continuously + * until either [stopWatching] is + * called or this [Stream] is closed. * - * @param stream stream to write to + * This method will not block the + * flow of execution. + * + * Using this method is very useful + * if wanting to only pipe the bytes + * of this [Stream] to other streams. + * + * @param threshold amount of bytes to be read before terminating or `0` to watch indefinitely * @return this instance * @throws IOAccessException on IO error * @since v1-alpha10 */ @Throws(IOAccessException::class) - fun writeNextByteTo(stream: Stream): Stream { + fun watch(): Stream { + if (watching) + return this + + watching = true + + Thread + .ofVirtual() + .name("Stream watch thread for Stream ${javaClass.name}#${hashCode()}") + .uncaughtExceptionHandler { thread, ueh -> logger.crash("Unable to watch Stream ${javaClass.name}#${hashCode()}", throwable = ueh.cause, fatal = true) } + .start { + while (!closed && watching) + skipNextByte() + } + + return this + } + + /** + * Stops the running watching thread. + * + * @return this instance + * @since v1-alpha10 + */ + fun stopWatching(): Stream { + watching = false + return this + } + + + // -----> Piping + /** + * Redirects the specified byte + * to all registered [redirects]. + * + * Make sure to call this method + * before returning from + * [writeByte] and [writeBytes]. + * + * @param byte [Byte] to redirect + * @return supplied [Byte] + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + protected fun internalRedirect(byte: Byte): Byte { + pipes.forEach { stream -> stream.writeByte(byte) } + return byte + } + + /** + * Redirects the specified bytes + * to all registered [redirects]. + * + * Make sure to call this method + * before returning from + * [writeByte] and [writeBytes]. + * + * @param bytes [ByteArray] to redirect + * @return supplied [ByteArray] + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + protected fun internalRedirect(bytes: ByteArray): ByteArray { + pipes.forEach { stream -> stream.writeBytes(bytes) } + return bytes + } + + /** + * Pipes the specified byte. + * + * Make sure to call this method + * before returning from + * [readBytes], [readNBytes] + * and [readRemainingBytes]. + * + * @param byte [Byte] to pipe + * @return supplied [Byte] + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + protected fun internalPipe(byte: Byte): Byte { + pipes.forEach { stream -> stream.writeByte(byte) } + return byte + } + + /** + * Pipes the specified bytes + * to all registered [pipes]. + * + * Make sure to call this method + * before returning from + * [readBytes], [readNBytes] + * and [readRemainingBytes]. + * + * @param bytes [ByteArray] to pipe + * @return supplied [ByteArray] + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + protected fun internalPipe(bytes: ByteArray): ByteArray { + pipes.forEach { stream -> stream.writeBytes(bytes) } + return bytes + } + + /** + * Reads the next byte of this + * [Stream] and pipes it into + * the specified [Stream]. + * + * Will not block the execution flow. + * + * Will not pipe `null` if + * [readNextByte] returns it. + * + * @param stream [Stream] to write to + * @return this instance + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + fun pipeNextByteTo(stream: Stream): Stream { val byte: Byte? = readNextByte() if (byte != null) @@ -370,37 +534,110 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable { /** * Reads the next [n] byte of this - * [Stream] and writes it - * to the specified [Stream]. + * [Stream] and pipes it into + * the specified [Stream]. * - * @param stream stream to write to + * May block the execution flow + * until [n] bytes have been piped. + * + * @param stream [Stream] to write to * @return this instance * @throws IOAccessException on IO error * @since v1-alpha10 */ @Throws(IOAccessException::class) - fun writeNBytesTo(stream: Stream, n: UInt): Stream { + fun pipeNBytesTo(stream: Stream, n: UInt): Stream { stream.writeBytes(readNBytes(n)) return this } /** * Reads all bytes of this - * [Stream] and writes - * them to the specified - * [Stream]. + * [Stream] and pipes them + * into the specified [Stream]. * - * @param stream stream to write to + * May block the execution flow + * until all remaining bytes + * have been piped. + * + * @param stream [Stream] to write to * @return this instance * @throws IOAccessException on IO error * @since v1-alpha10 */ @Throws(IOAccessException::class) - fun writeRemainingBytesTo(stream: Stream): Stream { + fun pipeRemainingBytesTo(stream: Stream): Stream { stream.writeBytes(readRemainingBytes()) return this } + /** + * Reads the next byte of this + * [Stream] and pipes it into + * the specified [Stream]s. + * + * Will not block the execution flow. + * + * Will not pipe `null` if + * [readNextByte] returns it. + * + * @param streams array of [Stream]s to write to + * @return this instance + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + fun pipeNextByteTo(streams: Array): Stream { + val byte: Byte? = readNextByte() + + if (byte != null) + streams.forEach { stream -> stream.writeByte(byte) } + + return this + } + + /** + * Reads the next [n] byte of this + * [Stream] and pipes it into + * the specified [Stream]s. + * + * May block the execution flow + * until [n] bytes have been piped. + * + * @param streams array of [Stream]s to write to + * @return this instance + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + fun pipeNBytesTo(streams: Array, n: UInt): Stream { + val bytes: ByteArray = readNBytes(n) + streams.forEach { stream -> stream.writeBytes(bytes) } + return this + } + + /** + * Reads all bytes of this + * [Stream] and pipes them + * into the specified [Stream]s. + * + * May block the execution flow + * until all remaining bytes + * have been piped. + * + * @param streams array of [Stream]s to write to + * @return this instance + * @throws IOAccessException on IO error + * @since v1-alpha10 + */ + @Throws(IOAccessException::class) + fun pipeRemainingBytesTo(streams: Array): Stream { + val bytes: ByteArray = readRemainingBytes() + + streams.forEach { stream -> stream.writeBytes(bytes) } + return this + } + // -----> Java interoperability /** diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt index 4547724..1a0106b 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt @@ -22,9 +22,15 @@ package de.staropensource.engine.base.implementable.stream /** * A write-only [Stream]. * + * @param autoFlushAfter after which amount of bytes to automatically flush. May be ignored by some implementations. Implementations should expose this argument in their constructor * @since v1-alpha10 */ -abstract class WriteStream : Stream(streamMode = StreamMode.WRITE) { +abstract class WriteStream( + autoFlushAfter: ULong = 100UL, +) : Stream( + streamMode = StreamMode.WRITE, + autoFlushAfter = autoFlushAfter, +) { override fun remaining(): Boolean = false override fun available(): UInt = 0u override fun readNextByte(): Byte? = 0x00 diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt index 7f577ad..e0551d7 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt @@ -72,6 +72,8 @@ open class ByteStream(val bytesRead: ByteArray) : Stream(streamMode = StreamMode return this } + override fun flush(): Stream = this + // -----> Reading override fun remaining(): Boolean? { diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt index 104776f..6f68d3f 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt @@ -29,7 +29,9 @@ import de.staropensource.engine.base.implementable.stream.WriteStream * * @since v1-alpha10 */ -open class ByteWriteStream : WriteStream() { +open class ByteWriteStream : WriteStream( + autoFlushAfter = 0UL, +) { /** * Contains the bytes written * to this stream. @@ -60,4 +62,6 @@ open class ByteWriteStream : WriteStream() { bytesWritten = bytesWritten.plus(bytes) return this } + + override fun flush(): Stream = this } diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt index c9db190..82f771b 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt @@ -71,6 +71,8 @@ class FileAccessStream internal constructor(val file: FileAccess) : Stream(strea return this } + override fun flush(): Stream = this + // -----> Reading override fun remaining(): Boolean? = null diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt index 505335b..ade4e00 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt @@ -36,7 +36,9 @@ import de.staropensource.engine.base.utility.FileAccess * @see FileAccess.toStream * @since v1-alpha10 */ -class LoggerStream internal constructor(val logger: Logger, val level: Level) : WriteStream() { +class LoggerStream internal constructor(val logger: Logger, val level: Level) : WriteStream( + autoFlushAfter = 0UL, +) { /** * Contains the current line. * @@ -58,15 +60,21 @@ class LoggerStream internal constructor(val logger: Logger, val level: Level) : return this } + override fun flush(): Stream { + checkAndPrint(skipCheck = true) + return this + } + /** * Verifies whether [line] * contains a newline and * if so, prints it. * + * @param skipCheck if to skip the newline check * @since v1-alpha10 */ - private fun checkAndPrint() { - if (line.contains("\n")) { + private fun checkAndPrint(skipCheck: Boolean = false) { + if (skipCheck || line.contains("\n")) { var lineFinal: String = line.toString() // Remove last newline diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt index a15c41c..b42f699 100644 --- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt +++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt @@ -48,6 +48,7 @@ class NullStream private constructor() : Stream(streamMode = StreamMode.READ_WRI override fun writeByte(byte: Byte): Stream = this override fun writeBytes(bytes: ByteArray): Stream = this + override fun flush(): Stream = this override fun remaining(): Boolean = false override fun available(): UInt = 0u