Add stream piping, redirecting and flushing

This commit is contained in:
JeremyStar™ 2024-12-27 23:52:56 +01:00
parent 407c601f88
commit 14cf028bfc
Signed by: JeremyStarTM
GPG key ID: E366BAEF67E4704D
8 changed files with 346 additions and 82 deletions

View file

@ -24,7 +24,11 @@ package de.staropensource.engine.base.implementable.stream
*
* @since v1-alpha10
*/
abstract class ReadStream : Stream(streamMode = StreamMode.READ) {
abstract class ReadStream : Stream(
streamMode = StreamMode.READ,
autoFlushAfter = 0UL,
) {
override fun writeByte(byte: Byte): Stream = this
override fun writeBytes(bytes: ByteArray): Stream = this
override fun flush(): Stream = this
}

View file

@ -19,8 +19,11 @@
package de.staropensource.engine.base.implementable.stream
import de.staropensource.engine.base.Engine.Companion.logger
import de.staropensource.engine.base.annotation.NonKotlinContact
import de.staropensource.engine.base.exception.io.IOAccessException
import de.staropensource.engine.base.implementation.stream.JavaReadStream
import de.staropensource.engine.base.implementation.stream.JavaWriteStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
@ -29,9 +32,14 @@ import java.io.OutputStream
* Makes streaming data easy.
*
* @param streamMode supported [StreamMode]s
* @param autoFlushAfter after which amount of bytes to automatically flush. May be ignored by some implementations. Implementations should expose this argument in their constructor
* @since v1-alpha10
*/
abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
@Suppress("Unused")
abstract class Stream(
val streamMode: StreamMode,
val autoFlushAfter: ULong = 100UL,
) : AutoCloseable {
/**
* Companion object of [Stream].
*
@ -45,26 +53,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* @see <a href="https://man.archlinux.org/man/stdin.3">stdin(3)</a>
* @since v1-alpha10
*/
val standardInput: ReadStream = object : ReadStream() {
/**
* Contains the Java [InputStream]
* provided by [System.in].
*
* @since v1-alpha10
*/
val stream: InputStream = System.`in`
// -----> Closure
val standardInput: ReadStream = object : JavaReadStream(System.`in`) {
override fun close() = Unit
override fun closeStream() = Unit
// -----> Reading
override fun remaining(): Boolean? = null
override fun available(): UInt = stream.available().toUInt()
override fun readNextByte(): Byte? = stream.read().toByte()
override fun readNBytes(n: UInt): ByteArray = stream.readNBytes(n.toInt())
override fun readRemainingBytes(): ByteArray = stream.readAllBytes()
}
/**
@ -74,29 +65,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* @see <a href="https://man.archlinux.org/man/stdout.3">stdout(3)</a>
* @since v1-alpha10
*/
val standardOutput: WriteStream = object : WriteStream() {
/**
* Contains the Java [OutputStream]
* provided by [System.out].
*
* @since v1-alpha10
*/
val stream: OutputStream = System.out
// -----> Closure
val standardOutput: WriteStream = object : JavaWriteStream(System.out) {
override fun close() = Unit
override fun closeStream() = Unit
// -----> Writing
override fun writeByte(byte: Byte): Stream {
stream.write(byte.toInt())
return this
}
override fun writeBytes(bytes: ByteArray): Stream {
stream.write(bytes)
return this
}
}
/**
@ -105,29 +76,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* @see <a href="https://man.archlinux.org/man/stderr.3">stderr(3)</a>
* @since v1-alpha10
*/
val standardError: WriteStream = object : WriteStream() {
/**
* Contains the Java [OutputStream]
* provided by [System.err].
*
* @since v1-alpha10
*/
val stream: OutputStream = System.err
// -----> Closure
val standardError: WriteStream = object : JavaWriteStream(System.err) {
override fun close() = Unit
override fun closeStream() = Unit
// -----> Writing
override fun writeByte(byte: Byte): Stream {
stream.write(byte.toInt())
return this
}
override fun writeBytes(bytes: ByteArray): Stream {
stream.write(bytes)
return this
}
}
}
@ -141,6 +92,46 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
var closed: Boolean = false
private set
/**
* Indicates whether this stream
* is currently being [watch]ed.
*
* @since v1-alpha10
*/
var watching: Boolean = false
private set
/**
* Contains a list of [Stream]s
* to redirect writes to.
*
* Redirecting is only performed
* upon invoking either [writeByte],
* [writeBytes], [writeString] or
* [write].
*
* @see pipes
* @since v1-alpha10
*/
val redirects: MutableList<Stream> = mutableListOf()
/**
* Contains a list of [Stream]s
* to pipe reads to.
*
* Piping is only performed upon
* invoking either [readNextByte],
* [readNBytes] or
* [readRemainingBytes].
*
* To watch for changes, see [watch].
*
* @see redirects
* @see watch
* @since v1-alpha10
*/
val pipes: MutableList<Stream> = mutableListOf()
/**
* Contains the Java [InputStream]
* for this [Stream] instance.
@ -199,6 +190,12 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* override fun writeByte(byte: Byte): Stream = writeBytes(byteArrayOf(byte))
* ```
*
* Implementations should make sure
* to call [internalRedirect] before
* returning from this method to
* ensure that the written [Byte]
* is piped to all registered [pipes].
*
* @param byte [Byte] to write
* @return this instance
* @throws IOAccessException on IO error
@ -211,6 +208,12 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* Writes (or rather: appends)
* the specified [Byte]s.
*
* Implementations should make sure
* to call [internalRedirect] before
* returning from this method to
* ensure that the written [Byte]s
* is piped to all registered [pipes].
*
* @param bytes [Byte]s to write
* @return this instance
* @throws IOAccessException on IO error
@ -243,6 +246,17 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
@Throws(IOAccessException::class)
fun write(value: Any): Stream = writeBytes(value.toString().toByteArray())
/**
* Flushes all currently
* buffered bytes out.
*
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
abstract fun flush(): Stream
// -----> Reading
/**
@ -278,6 +292,8 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads the next [Byte].
*
* Will not block the execution flow.
*
* @return next [Byte] or `null` if no bytes are remaining
* @throws IOAccessException on IO error
* @since v1-alpha10
@ -288,6 +304,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads the specified next [n] [Byte]s.
*
* May block the execution flow
* until [n] bytes have been read.
*
* @param n [Byte]s to read
* @return next [n] bytes or an empty array if no bytes are remaining
* @throws IOAccessException on IO error
@ -299,6 +318,10 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads all remaining [Byte]s.
*
* May block the execution flow
* until all remaining bytes
* have been read.
*
* @return remaining [Byte]s or an empty array if no bytes are remaining
* @throws IOAccessException on IO error
* @since v1-alpha10
@ -309,6 +332,8 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Discards the next [Byte].
*
* Will not block the execution flow.
*
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
@ -322,6 +347,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Discards the specified next [n] bytes.
*
* May block the execution flow
* until [n] bytes have been skipped.
*
* @param n [Byte]s to discard
* @return this instance
* @throws IOAccessException on IO error
@ -336,6 +364,10 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Discards all remaining [Byte]s.
*
* May block the execution flow
* until all remaining bytes
* have been skipped.
*
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
@ -347,19 +379,151 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
}
// -----> Redirection
// -----> Watching
/**
* Reads the next byte of this
* [Stream] and writes it
* to the specified [Stream].
* Starts a thread solely responsible
* for reading bytes continuously
* until either [stopWatching] is
* called or this [Stream] is closed.
*
* @param stream stream to write to
* This method will not block the
* flow of execution.
*
* Using this method is very useful
* if wanting to only pipe the bytes
* of this [Stream] to other streams.
*
* @param threshold amount of bytes to be read before terminating or `0` to watch indefinitely
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun writeNextByteTo(stream: Stream): Stream {
fun watch(): Stream {
if (watching)
return this
watching = true
Thread
.ofVirtual()
.name("Stream watch thread for Stream ${javaClass.name}#${hashCode()}")
.uncaughtExceptionHandler { thread, ueh -> logger.crash("Unable to watch Stream ${javaClass.name}#${hashCode()}", throwable = ueh.cause, fatal = true) }
.start {
while (!closed && watching)
skipNextByte()
}
return this
}
/**
* Stops the running watching thread.
*
* @return this instance
* @since v1-alpha10
*/
fun stopWatching(): Stream {
watching = false
return this
}
// -----> Piping
/**
* Redirects the specified byte
* to all registered [redirects].
*
* Make sure to call this method
* before returning from
* [writeByte] and [writeBytes].
*
* @param byte [Byte] to redirect
* @return supplied [Byte]
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
protected fun internalRedirect(byte: Byte): Byte {
pipes.forEach { stream -> stream.writeByte(byte) }
return byte
}
/**
* Redirects the specified bytes
* to all registered [redirects].
*
* Make sure to call this method
* before returning from
* [writeByte] and [writeBytes].
*
* @param bytes [ByteArray] to redirect
* @return supplied [ByteArray]
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
protected fun internalRedirect(bytes: ByteArray): ByteArray {
pipes.forEach { stream -> stream.writeBytes(bytes) }
return bytes
}
/**
* Pipes the specified byte.
*
* Make sure to call this method
* before returning from
* [readBytes], [readNBytes]
* and [readRemainingBytes].
*
* @param byte [Byte] to pipe
* @return supplied [Byte]
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
protected fun internalPipe(byte: Byte): Byte {
pipes.forEach { stream -> stream.writeByte(byte) }
return byte
}
/**
* Pipes the specified bytes
* to all registered [pipes].
*
* Make sure to call this method
* before returning from
* [readBytes], [readNBytes]
* and [readRemainingBytes].
*
* @param bytes [ByteArray] to pipe
* @return supplied [ByteArray]
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
protected fun internalPipe(bytes: ByteArray): ByteArray {
pipes.forEach { stream -> stream.writeBytes(bytes) }
return bytes
}
/**
* Reads the next byte of this
* [Stream] and pipes it into
* the specified [Stream].
*
* Will not block the execution flow.
*
* Will not pipe `null` if
* [readNextByte] returns it.
*
* @param stream [Stream] to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun pipeNextByteTo(stream: Stream): Stream {
val byte: Byte? = readNextByte()
if (byte != null)
@ -370,37 +534,110 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads the next [n] byte of this
* [Stream] and writes it
* to the specified [Stream].
* [Stream] and pipes it into
* the specified [Stream].
*
* @param stream stream to write to
* May block the execution flow
* until [n] bytes have been piped.
*
* @param stream [Stream] to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun writeNBytesTo(stream: Stream, n: UInt): Stream {
fun pipeNBytesTo(stream: Stream, n: UInt): Stream {
stream.writeBytes(readNBytes(n))
return this
}
/**
* Reads all bytes of this
* [Stream] and writes
* them to the specified
* [Stream].
* [Stream] and pipes them
* into the specified [Stream].
*
* @param stream stream to write to
* May block the execution flow
* until all remaining bytes
* have been piped.
*
* @param stream [Stream] to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun writeRemainingBytesTo(stream: Stream): Stream {
fun pipeRemainingBytesTo(stream: Stream): Stream {
stream.writeBytes(readRemainingBytes())
return this
}
/**
* Reads the next byte of this
* [Stream] and pipes it into
* the specified [Stream]s.
*
* Will not block the execution flow.
*
* Will not pipe `null` if
* [readNextByte] returns it.
*
* @param streams array of [Stream]s to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun pipeNextByteTo(streams: Array<Stream>): Stream {
val byte: Byte? = readNextByte()
if (byte != null)
streams.forEach { stream -> stream.writeByte(byte) }
return this
}
/**
* Reads the next [n] byte of this
* [Stream] and pipes it into
* the specified [Stream]s.
*
* May block the execution flow
* until [n] bytes have been piped.
*
* @param streams array of [Stream]s to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun pipeNBytesTo(streams: Array<Stream>, n: UInt): Stream {
val bytes: ByteArray = readNBytes(n)
streams.forEach { stream -> stream.writeBytes(bytes) }
return this
}
/**
* Reads all bytes of this
* [Stream] and pipes them
* into the specified [Stream]s.
*
* May block the execution flow
* until all remaining bytes
* have been piped.
*
* @param streams array of [Stream]s to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
fun pipeRemainingBytesTo(streams: Array<Stream>): Stream {
val bytes: ByteArray = readRemainingBytes()
streams.forEach { stream -> stream.writeBytes(bytes) }
return this
}
// -----> Java interoperability
/**

View file

@ -22,9 +22,15 @@ package de.staropensource.engine.base.implementable.stream
/**
* A write-only [Stream].
*
* @param autoFlushAfter after which amount of bytes to automatically flush. May be ignored by some implementations. Implementations should expose this argument in their constructor
* @since v1-alpha10
*/
abstract class WriteStream : Stream(streamMode = StreamMode.WRITE) {
abstract class WriteStream(
autoFlushAfter: ULong = 100UL,
) : Stream(
streamMode = StreamMode.WRITE,
autoFlushAfter = autoFlushAfter,
) {
override fun remaining(): Boolean = false
override fun available(): UInt = 0u
override fun readNextByte(): Byte? = 0x00

View file

@ -72,6 +72,8 @@ open class ByteStream(val bytesRead: ByteArray) : Stream(streamMode = StreamMode
return this
}
override fun flush(): Stream = this
// -----> Reading
override fun remaining(): Boolean? {

View file

@ -29,7 +29,9 @@ import de.staropensource.engine.base.implementable.stream.WriteStream
*
* @since v1-alpha10
*/
open class ByteWriteStream : WriteStream() {
open class ByteWriteStream : WriteStream(
autoFlushAfter = 0UL,
) {
/**
* Contains the bytes written
* to this stream.
@ -60,4 +62,6 @@ open class ByteWriteStream : WriteStream() {
bytesWritten = bytesWritten.plus(bytes)
return this
}
override fun flush(): Stream = this
}

View file

@ -71,6 +71,8 @@ class FileAccessStream internal constructor(val file: FileAccess) : Stream(strea
return this
}
override fun flush(): Stream = this
// -----> Reading
override fun remaining(): Boolean? = null

View file

@ -36,7 +36,9 @@ import de.staropensource.engine.base.utility.FileAccess
* @see FileAccess.toStream
* @since v1-alpha10
*/
class LoggerStream internal constructor(val logger: Logger, val level: Level) : WriteStream() {
class LoggerStream internal constructor(val logger: Logger, val level: Level) : WriteStream(
autoFlushAfter = 0UL,
) {
/**
* Contains the current line.
*
@ -58,15 +60,21 @@ class LoggerStream internal constructor(val logger: Logger, val level: Level) :
return this
}
override fun flush(): Stream {
checkAndPrint(skipCheck = true)
return this
}
/**
* Verifies whether [line]
* contains a newline and
* if so, prints it.
*
* @param skipCheck if to skip the newline check
* @since v1-alpha10
*/
private fun checkAndPrint() {
if (line.contains("\n")) {
private fun checkAndPrint(skipCheck: Boolean = false) {
if (skipCheck || line.contains("\n")) {
var lineFinal: String = line.toString()
// Remove last newline

View file

@ -48,6 +48,7 @@ class NullStream private constructor() : Stream(streamMode = StreamMode.READ_WRI
override fun writeByte(byte: Byte): Stream = this
override fun writeBytes(bytes: ByteArray): Stream = this
override fun flush(): Stream = this
override fun remaining(): Boolean = false
override fun available(): UInt = 0u