diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt
index 28b9f35..c70be96 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/ReadStream.kt
@@ -24,7 +24,11 @@ package de.staropensource.engine.base.implementable.stream
*
* @since v1-alpha10
*/
-abstract class ReadStream : Stream(streamMode = StreamMode.READ) {
+abstract class ReadStream : Stream(
+ streamMode = StreamMode.READ,
+ autoFlushAfter = 0UL,
+) {
override fun writeByte(byte: Byte): Stream = this
override fun writeBytes(bytes: ByteArray): Stream = this
+ override fun flush(): Stream = this
}
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt
index 26bbcab..923aae4 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/Stream.kt
@@ -19,8 +19,11 @@
package de.staropensource.engine.base.implementable.stream
+import de.staropensource.engine.base.Engine.Companion.logger
import de.staropensource.engine.base.annotation.NonKotlinContact
import de.staropensource.engine.base.exception.io.IOAccessException
+import de.staropensource.engine.base.implementation.stream.JavaReadStream
+import de.staropensource.engine.base.implementation.stream.JavaWriteStream
import java.io.IOException
import java.io.InputStream
import java.io.OutputStream
@@ -29,9 +32,14 @@ import java.io.OutputStream
* Makes streaming data easy.
*
* @param streamMode supported [StreamMode]s
+ * @param autoFlushAfter after which amount of bytes to automatically flush. May be ignored by some implementations. Implementations should expose this argument in their constructor
* @since v1-alpha10
*/
-abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
+@Suppress("Unused")
+abstract class Stream(
+ val streamMode: StreamMode,
+ val autoFlushAfter: ULong = 100UL,
+) : AutoCloseable {
/**
* Companion object of [Stream].
*
@@ -45,26 +53,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* @see stdin(3)
* @since v1-alpha10
*/
- val standardInput: ReadStream = object : ReadStream() {
- /**
- * Contains the Java [InputStream]
- * provided by [System.in].
- *
- * @since v1-alpha10
- */
- val stream: InputStream = System.`in`
-
- // -----> Closure
+ val standardInput: ReadStream = object : JavaReadStream(System.`in`) {
override fun close() = Unit
override fun closeStream() = Unit
-
-
- // -----> Reading
- override fun remaining(): Boolean? = null
- override fun available(): UInt = stream.available().toUInt()
- override fun readNextByte(): Byte? = stream.read().toByte()
- override fun readNBytes(n: UInt): ByteArray = stream.readNBytes(n.toInt())
- override fun readRemainingBytes(): ByteArray = stream.readAllBytes()
}
/**
@@ -74,29 +65,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* @see stdout(3)
* @since v1-alpha10
*/
- val standardOutput: WriteStream = object : WriteStream() {
- /**
- * Contains the Java [OutputStream]
- * provided by [System.out].
- *
- * @since v1-alpha10
- */
- val stream: OutputStream = System.out
-
- // -----> Closure
+ val standardOutput: WriteStream = object : JavaWriteStream(System.out) {
override fun close() = Unit
override fun closeStream() = Unit
-
-
- // -----> Writing
- override fun writeByte(byte: Byte): Stream {
- stream.write(byte.toInt())
- return this
- }
- override fun writeBytes(bytes: ByteArray): Stream {
- stream.write(bytes)
- return this
- }
}
/**
@@ -105,29 +76,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* @see stderr(3)
* @since v1-alpha10
*/
- val standardError: WriteStream = object : WriteStream() {
- /**
- * Contains the Java [OutputStream]
- * provided by [System.err].
- *
- * @since v1-alpha10
- */
- val stream: OutputStream = System.err
-
- // -----> Closure
+ val standardError: WriteStream = object : JavaWriteStream(System.err) {
override fun close() = Unit
override fun closeStream() = Unit
-
-
- // -----> Writing
- override fun writeByte(byte: Byte): Stream {
- stream.write(byte.toInt())
- return this
- }
- override fun writeBytes(bytes: ByteArray): Stream {
- stream.write(bytes)
- return this
- }
}
}
@@ -141,6 +92,46 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
var closed: Boolean = false
private set
+ /**
+ * Indicates whether this stream
+ * is currently being [watch]ed.
+ *
+ * @since v1-alpha10
+ */
+ var watching: Boolean = false
+ private set
+
+ /**
+ * Contains a list of [Stream]s
+ * to redirect writes to.
+ *
+ * Redirecting is only performed
+ * upon invoking either [writeByte],
+ * [writeBytes], [writeString] or
+ * [write].
+ *
+ * @see pipes
+ * @since v1-alpha10
+ */
+ val redirects: MutableList = mutableListOf()
+
+ /**
+ * Contains a list of [Stream]s
+ * to pipe reads to.
+ *
+ * Piping is only performed upon
+ * invoking either [readNextByte],
+ * [readNBytes] or
+ * [readRemainingBytes].
+ *
+ * To watch for changes, see [watch].
+ *
+ * @see redirects
+ * @see watch
+ * @since v1-alpha10
+ */
+ val pipes: MutableList = mutableListOf()
+
/**
* Contains the Java [InputStream]
* for this [Stream] instance.
@@ -199,6 +190,12 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* override fun writeByte(byte: Byte): Stream = writeBytes(byteArrayOf(byte))
* ```
*
+ * Implementations should make sure
+ * to call [internalRedirect] before
+ * returning from this method to
+ * ensure that the written [Byte]
+ * is piped to all registered [pipes].
+ *
* @param byte [Byte] to write
* @return this instance
* @throws IOAccessException on IO error
@@ -211,6 +208,12 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
* Writes (or rather: appends)
* the specified [Byte]s.
*
+ * Implementations should make sure
+ * to call [internalRedirect] before
+ * returning from this method to
+ * ensure that the written [Byte]s
+ * is piped to all registered [pipes].
+ *
* @param bytes [Byte]s to write
* @return this instance
* @throws IOAccessException on IO error
@@ -243,6 +246,17 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
@Throws(IOAccessException::class)
fun write(value: Any): Stream = writeBytes(value.toString().toByteArray())
+ /**
+ * Flushes all currently
+ * buffered bytes out.
+ *
+ * @return this instance
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ abstract fun flush(): Stream
+
// -----> Reading
/**
@@ -278,6 +292,8 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads the next [Byte].
*
+ * Will not block the execution flow.
+ *
* @return next [Byte] or `null` if no bytes are remaining
* @throws IOAccessException on IO error
* @since v1-alpha10
@@ -288,6 +304,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads the specified next [n] [Byte]s.
*
+ * May block the execution flow
+ * until [n] bytes have been read.
+ *
* @param n [Byte]s to read
* @return next [n] bytes or an empty array if no bytes are remaining
* @throws IOAccessException on IO error
@@ -299,6 +318,10 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads all remaining [Byte]s.
*
+ * May block the execution flow
+ * until all remaining bytes
+ * have been read.
+ *
* @return remaining [Byte]s or an empty array if no bytes are remaining
* @throws IOAccessException on IO error
* @since v1-alpha10
@@ -309,6 +332,8 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Discards the next [Byte].
*
+ * Will not block the execution flow.
+ *
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
@@ -322,6 +347,9 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Discards the specified next [n] bytes.
*
+ * May block the execution flow
+ * until [n] bytes have been skipped.
+ *
* @param n [Byte]s to discard
* @return this instance
* @throws IOAccessException on IO error
@@ -336,6 +364,10 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Discards all remaining [Byte]s.
*
+ * May block the execution flow
+ * until all remaining bytes
+ * have been skipped.
+ *
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
@@ -347,19 +379,151 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
}
- // -----> Redirection
+ // -----> Watching
/**
- * Reads the next byte of this
- * [Stream] and writes it
- * to the specified [Stream].
+ * Starts a thread solely responsible
+ * for reading bytes continuously
+ * until either [stopWatching] is
+ * called or this [Stream] is closed.
*
- * @param stream stream to write to
+ * This method will not block the
+ * flow of execution.
+ *
+ * Using this method is very useful
+ * if wanting to only pipe the bytes
+ * of this [Stream] to other streams.
+ *
+ * @param threshold amount of bytes to be read before terminating or `0` to watch indefinitely
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
- fun writeNextByteTo(stream: Stream): Stream {
+ fun watch(): Stream {
+ if (watching)
+ return this
+
+ watching = true
+
+ Thread
+ .ofVirtual()
+ .name("Stream watch thread for Stream ${javaClass.name}#${hashCode()}")
+ .uncaughtExceptionHandler { thread, ueh -> logger.crash("Unable to watch Stream ${javaClass.name}#${hashCode()}", throwable = ueh.cause, fatal = true) }
+ .start {
+ while (!closed && watching)
+ skipNextByte()
+ }
+
+ return this
+ }
+
+ /**
+ * Stops the running watching thread.
+ *
+ * @return this instance
+ * @since v1-alpha10
+ */
+ fun stopWatching(): Stream {
+ watching = false
+ return this
+ }
+
+
+ // -----> Piping
+ /**
+ * Redirects the specified byte
+ * to all registered [redirects].
+ *
+ * Make sure to call this method
+ * before returning from
+ * [writeByte] and [writeBytes].
+ *
+ * @param byte [Byte] to redirect
+ * @return supplied [Byte]
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ protected fun internalRedirect(byte: Byte): Byte {
+ pipes.forEach { stream -> stream.writeByte(byte) }
+ return byte
+ }
+
+ /**
+ * Redirects the specified bytes
+ * to all registered [redirects].
+ *
+ * Make sure to call this method
+ * before returning from
+ * [writeByte] and [writeBytes].
+ *
+ * @param bytes [ByteArray] to redirect
+ * @return supplied [ByteArray]
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ protected fun internalRedirect(bytes: ByteArray): ByteArray {
+ pipes.forEach { stream -> stream.writeBytes(bytes) }
+ return bytes
+ }
+
+ /**
+ * Pipes the specified byte.
+ *
+ * Make sure to call this method
+ * before returning from
+ * [readBytes], [readNBytes]
+ * and [readRemainingBytes].
+ *
+ * @param byte [Byte] to pipe
+ * @return supplied [Byte]
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ protected fun internalPipe(byte: Byte): Byte {
+ pipes.forEach { stream -> stream.writeByte(byte) }
+ return byte
+ }
+
+ /**
+ * Pipes the specified bytes
+ * to all registered [pipes].
+ *
+ * Make sure to call this method
+ * before returning from
+ * [readBytes], [readNBytes]
+ * and [readRemainingBytes].
+ *
+ * @param bytes [ByteArray] to pipe
+ * @return supplied [ByteArray]
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ protected fun internalPipe(bytes: ByteArray): ByteArray {
+ pipes.forEach { stream -> stream.writeBytes(bytes) }
+ return bytes
+ }
+
+ /**
+ * Reads the next byte of this
+ * [Stream] and pipes it into
+ * the specified [Stream].
+ *
+ * Will not block the execution flow.
+ *
+ * Will not pipe `null` if
+ * [readNextByte] returns it.
+ *
+ * @param stream [Stream] to write to
+ * @return this instance
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ fun pipeNextByteTo(stream: Stream): Stream {
val byte: Byte? = readNextByte()
if (byte != null)
@@ -370,37 +534,110 @@ abstract class Stream(val streamMode: StreamMode) : AutoCloseable {
/**
* Reads the next [n] byte of this
- * [Stream] and writes it
- * to the specified [Stream].
+ * [Stream] and pipes it into
+ * the specified [Stream].
*
- * @param stream stream to write to
+ * May block the execution flow
+ * until [n] bytes have been piped.
+ *
+ * @param stream [Stream] to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
- fun writeNBytesTo(stream: Stream, n: UInt): Stream {
+ fun pipeNBytesTo(stream: Stream, n: UInt): Stream {
stream.writeBytes(readNBytes(n))
return this
}
/**
* Reads all bytes of this
- * [Stream] and writes
- * them to the specified
- * [Stream].
+ * [Stream] and pipes them
+ * into the specified [Stream].
*
- * @param stream stream to write to
+ * May block the execution flow
+ * until all remaining bytes
+ * have been piped.
+ *
+ * @param stream [Stream] to write to
* @return this instance
* @throws IOAccessException on IO error
* @since v1-alpha10
*/
@Throws(IOAccessException::class)
- fun writeRemainingBytesTo(stream: Stream): Stream {
+ fun pipeRemainingBytesTo(stream: Stream): Stream {
stream.writeBytes(readRemainingBytes())
return this
}
+ /**
+ * Reads the next byte of this
+ * [Stream] and pipes it into
+ * the specified [Stream]s.
+ *
+ * Will not block the execution flow.
+ *
+ * Will not pipe `null` if
+ * [readNextByte] returns it.
+ *
+ * @param streams array of [Stream]s to write to
+ * @return this instance
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ fun pipeNextByteTo(streams: Array): Stream {
+ val byte: Byte? = readNextByte()
+
+ if (byte != null)
+ streams.forEach { stream -> stream.writeByte(byte) }
+
+ return this
+ }
+
+ /**
+ * Reads the next [n] byte of this
+ * [Stream] and pipes it into
+ * the specified [Stream]s.
+ *
+ * May block the execution flow
+ * until [n] bytes have been piped.
+ *
+ * @param streams array of [Stream]s to write to
+ * @return this instance
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ fun pipeNBytesTo(streams: Array, n: UInt): Stream {
+ val bytes: ByteArray = readNBytes(n)
+ streams.forEach { stream -> stream.writeBytes(bytes) }
+ return this
+ }
+
+ /**
+ * Reads all bytes of this
+ * [Stream] and pipes them
+ * into the specified [Stream]s.
+ *
+ * May block the execution flow
+ * until all remaining bytes
+ * have been piped.
+ *
+ * @param streams array of [Stream]s to write to
+ * @return this instance
+ * @throws IOAccessException on IO error
+ * @since v1-alpha10
+ */
+ @Throws(IOAccessException::class)
+ fun pipeRemainingBytesTo(streams: Array): Stream {
+ val bytes: ByteArray = readRemainingBytes()
+
+ streams.forEach { stream -> stream.writeBytes(bytes) }
+ return this
+ }
+
// -----> Java interoperability
/**
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt
index 4547724..1a0106b 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementable/stream/WriteStream.kt
@@ -22,9 +22,15 @@ package de.staropensource.engine.base.implementable.stream
/**
* A write-only [Stream].
*
+ * @param autoFlushAfter after which amount of bytes to automatically flush. May be ignored by some implementations. Implementations should expose this argument in their constructor
* @since v1-alpha10
*/
-abstract class WriteStream : Stream(streamMode = StreamMode.WRITE) {
+abstract class WriteStream(
+ autoFlushAfter: ULong = 100UL,
+) : Stream(
+ streamMode = StreamMode.WRITE,
+ autoFlushAfter = autoFlushAfter,
+) {
override fun remaining(): Boolean = false
override fun available(): UInt = 0u
override fun readNextByte(): Byte? = 0x00
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt
index 7f577ad..e0551d7 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteStream.kt
@@ -72,6 +72,8 @@ open class ByteStream(val bytesRead: ByteArray) : Stream(streamMode = StreamMode
return this
}
+ override fun flush(): Stream = this
+
// -----> Reading
override fun remaining(): Boolean? {
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt
index 104776f..6f68d3f 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/ByteWriteStream.kt
@@ -29,7 +29,9 @@ import de.staropensource.engine.base.implementable.stream.WriteStream
*
* @since v1-alpha10
*/
-open class ByteWriteStream : WriteStream() {
+open class ByteWriteStream : WriteStream(
+ autoFlushAfter = 0UL,
+) {
/**
* Contains the bytes written
* to this stream.
@@ -60,4 +62,6 @@ open class ByteWriteStream : WriteStream() {
bytesWritten = bytesWritten.plus(bytes)
return this
}
+
+ override fun flush(): Stream = this
}
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt
index c9db190..82f771b 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/FileAccessStream.kt
@@ -71,6 +71,8 @@ class FileAccessStream internal constructor(val file: FileAccess) : Stream(strea
return this
}
+ override fun flush(): Stream = this
+
// -----> Reading
override fun remaining(): Boolean? = null
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt
index 505335b..ade4e00 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/LoggerStream.kt
@@ -36,7 +36,9 @@ import de.staropensource.engine.base.utility.FileAccess
* @see FileAccess.toStream
* @since v1-alpha10
*/
-class LoggerStream internal constructor(val logger: Logger, val level: Level) : WriteStream() {
+class LoggerStream internal constructor(val logger: Logger, val level: Level) : WriteStream(
+ autoFlushAfter = 0UL,
+) {
/**
* Contains the current line.
*
@@ -58,15 +60,21 @@ class LoggerStream internal constructor(val logger: Logger, val level: Level) :
return this
}
+ override fun flush(): Stream {
+ checkAndPrint(skipCheck = true)
+ return this
+ }
+
/**
* Verifies whether [line]
* contains a newline and
* if so, prints it.
*
+ * @param skipCheck if to skip the newline check
* @since v1-alpha10
*/
- private fun checkAndPrint() {
- if (line.contains("\n")) {
+ private fun checkAndPrint(skipCheck: Boolean = false) {
+ if (skipCheck || line.contains("\n")) {
var lineFinal: String = line.toString()
// Remove last newline
diff --git a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt
index a15c41c..b42f699 100644
--- a/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt
+++ b/base/src/main/kotlin/de/staropensource/engine/base/implementation/stream/NullStream.kt
@@ -48,6 +48,7 @@ class NullStream private constructor() : Stream(streamMode = StreamMode.READ_WRI
override fun writeByte(byte: Byte): Stream = this
override fun writeBytes(bytes: ByteArray): Stream = this
+ override fun flush(): Stream = this
override fun remaining(): Boolean = false
override fun available(): UInt = 0u