diff --git a/core/apple/src/AppleCore.kt b/core/apple/src/AppleCore.kt index 2eaf59cc7..7083d6ec7 100644 --- a/core/apple/src/AppleCore.kt +++ b/core/apple/src/AppleCore.kt @@ -8,6 +8,7 @@ package kotlinx.io import kotlinx.cinterop.* +import kotlinx.io.unsafe.UnsafeBufferOperations import platform.Foundation.NSInputStream import platform.Foundation.NSOutputStream import platform.Foundation.NSStreamStatusClosed @@ -31,29 +32,28 @@ private open class OutputStreamSink( if (out.streamStatus == NSStreamStatusNotOpen) out.open() } + @OptIn(UnsafeIoApi::class) override fun write(source: Buffer, byteCount: Long) { if (out.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") checkOffsetAndCount(source.size, 0, byteCount) var remaining = byteCount + var bytesWritten = 0L while (remaining > 0) { - val head = source.head!! - val toCopy = minOf(remaining, head.limit - head.pos).toInt() - val bytesWritten = head.data.usePinned { - val bytes = it.addressOf(head.pos).reinterpret() - out.write(bytes, toCopy.convert()).toLong() + UnsafeBufferOperations.readFromHead(source) { data, pos, limit -> + val toCopy = minOf(remaining, limit - pos).toInt() + bytesWritten = data.usePinned { + val bytes = it.addressOf(pos).reinterpret() + out.write(bytes, toCopy.convert()).toLong() + } + 0 } if (bytesWritten < 0L) throw IOException(out.streamError?.localizedDescription ?: "Unknown error") if (bytesWritten == 0L) throw IOException("NSOutputStream reached capacity") - head.pos += bytesWritten.toInt() + source.skip(bytesWritten) remaining -= bytesWritten - source.sizeMut -= bytesWritten - - if (head.pos == head.limit) { - source.recycleHead() - } } } @@ -83,29 +83,26 @@ private open class NSInputStreamSource( if (input.streamStatus == NSStreamStatusNotOpen) input.open() } + @OptIn(UnsafeIoApi::class) override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { if (input.streamStatus == NSStreamStatusClosed) throw IOException("Stream Closed") if (byteCount == 0L) return 0L 
checkByteCount(byteCount) - val tail = sink.writableSegment(1) - val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit) - val bytesRead = tail.data.usePinned { - val bytes = it.addressOf(tail.limit).reinterpret() - input.read(bytes, maxToCopy.convert()).toLong() + var bytesRead = 0L + UnsafeBufferOperations.writeToTail(sink, 1) { data, pos, limit -> + val maxToCopy = minOf(byteCount, limit - pos) + val read = data.usePinned { ba -> + val bytes = ba.addressOf(pos).reinterpret() + input.read(bytes, maxToCopy.convert()).toLong() + } + bytesRead = read + maxOf(read.toInt(), 0) } if (bytesRead < 0L) throw IOException(input.streamError?.localizedDescription ?: "Unknown error") - if (bytesRead == 0L) { - if (tail.pos == tail.limit) { - // We allocated a tail segment, but didn't end up needing it. Recycle! - sink.recycleTail() - } - return -1 - } - tail.limit += bytesRead.toInt() - sink.sizeMut += bytesRead + if (bytesRead == 0L) return -1 return bytesRead } diff --git a/core/apple/src/BuffersApple.kt b/core/apple/src/BuffersApple.kt index 292b9e262..98e2c0eab 100644 --- a/core/apple/src/BuffersApple.kt +++ b/core/apple/src/BuffersApple.kt @@ -8,51 +8,48 @@ package kotlinx.io import kotlinx.cinterop.* +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlinx.io.unsafe.withData import platform.Foundation.NSData import platform.Foundation.create import platform.Foundation.data import platform.darwin.NSUIntegerMax import platform.posix.* -@OptIn(ExperimentalForeignApi::class) +@OptIn(ExperimentalForeignApi::class, UnsafeIoApi::class) internal fun Buffer.write(source: CPointer, maxLength: Int) { require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } var currentOffset = 0 while (currentOffset < maxLength) { - val tail = writableSegment(1) - - val toCopy = minOf(maxLength - currentOffset, Segment.SIZE - tail.limit) - tail.data.usePinned { - memcpy(it.addressOf(tail.limit), source + currentOffset, toCopy.convert()) + 
UnsafeBufferOperations.writeToTail(this, 1) { data, pos, limit -> + val toCopy = minOf(maxLength - currentOffset, limit - pos) + data.usePinned { + memcpy(it.addressOf(pos), source + currentOffset, toCopy.convert()) + } + currentOffset += toCopy + toCopy } - - currentOffset += toCopy - tail.limit += toCopy } - this.sizeMut += maxLength } +@OptIn(UnsafeIoApi::class) internal fun Buffer.readAtMostTo(sink: CPointer, maxLength: Int): Int { require(maxLength >= 0) { "maxLength ($maxLength) must not be negative" } - val s = head ?: return 0 - val toCopy = minOf(maxLength, s.limit - s.pos) - s.data.usePinned { - memcpy(sink, it.addressOf(s.pos), toCopy.convert()) - } - - s.pos += toCopy - this.sizeMut -= toCopy.toLong() - - if (s.pos == s.limit) { - recycleHead() + var toCopy = 0 + UnsafeBufferOperations.readFromHead(this) { data, pos, limit -> + toCopy = minOf(maxLength, limit - pos) + data.usePinned { + memcpy(sink, it.addressOf(pos), toCopy.convert()) + } + toCopy } return toCopy } -@OptIn(BetaInteropApi::class) +@OptIn(BetaInteropApi::class, UnsafeIoApi::class) internal fun Buffer.snapshotAsNSData(): NSData { if (size == 0L) return NSData.data() @@ -60,17 +57,21 @@ internal fun Buffer.snapshotAsNSData(): NSData { val bytes = malloc(size.convert())?.reinterpret() ?: throw Error("malloc failed: ${strerror(errno)?.toKString()}") - var curr = head - var index = 0 - do { - check(curr != null) { "Current segment is null" } - val pos = curr.pos - val length = curr.limit - pos - curr.data.usePinned { - memcpy(bytes + index, it.addressOf(pos), length.convert()) + + UnsafeBufferOperations.iterate(this) { ctx, head -> + var curr: Segment? 
= head + var index = 0 + while (curr != null) { + val segment: Segment = curr + ctx.withData(segment) { data, pos, limit -> + val length = limit - pos + data.usePinned { + memcpy(bytes + index, it.addressOf(pos), length.convert()) + } + index += length + } + curr = ctx.next(segment) } - curr = curr.next - index += length - } while (curr != null) + } return NSData.create(bytesNoCopy = bytes, length = size.convert()) } diff --git a/core/common/src/Buffer.kt b/core/common/src/Buffer.kt index aea7e4263..5d59c7d7a 100644 --- a/core/common/src/Buffer.kt +++ b/core/common/src/Buffer.kt @@ -291,6 +291,9 @@ public class Buffer : Source, Sink { if (position < 0 || position >= size) { throw IndexOutOfBoundsException("position ($position) is not within the range [0..size($size))") } + if (position == 0L) { + return head!!.getUnchecked(0) + } seek(position) { s, offset -> return s!!.data[(s.pos + position - offset).toInt()] } diff --git a/core/common/src/Buffers.kt b/core/common/src/Buffers.kt index f5b986b60..c25b0a2fc 100644 --- a/core/common/src/Buffers.kt +++ b/core/common/src/Buffers.kt @@ -7,24 +7,28 @@ package kotlinx.io import kotlinx.io.bytestring.ByteString import kotlinx.io.bytestring.buildByteString +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlinx.io.unsafe.withData /** * Creates a byte string containing a copy of all the data from this buffer. * * This call doesn't consume data from the buffer, but instead copies it. */ +@OptIn(UnsafeIoApi::class) public fun Buffer.snapshot(): ByteString { if (size == 0L) return ByteString() check(size <= Int.MAX_VALUE) { "Buffer is too long ($size) to be converted into a byte string." 
} return buildByteString(size.toInt()) { - var curr = head - do { - check(curr != null) { "Current segment is null" } - append(curr.data, curr.pos, curr.limit) - curr = curr.next - } while (curr != null) + UnsafeBufferOperations.iterate(this@snapshot) { ctx, head -> + var curr = head + while (curr != null) { + ctx.withData(curr, this::append) + curr = ctx.next(curr) + } + } } } diff --git a/core/common/src/ByteStrings.kt b/core/common/src/ByteStrings.kt index 4c8587ebe..7963e0ca2 100644 --- a/core/common/src/ByteStrings.kt +++ b/core/common/src/ByteStrings.kt @@ -9,6 +9,7 @@ import kotlinx.io.bytestring.ByteString import kotlinx.io.bytestring.isEmpty import kotlinx.io.bytestring.unsafe.UnsafeByteStringApi import kotlinx.io.bytestring.unsafe.UnsafeByteStringOperations +import kotlinx.io.unsafe.UnsafeBufferOperations import kotlin.math.min /** @@ -24,7 +25,7 @@ import kotlin.math.min * * @sample kotlinx.io.samples.ByteStringSamples.writeByteString */ -@OptIn(DelicateIoApi::class) +@OptIn(DelicateIoApi::class, UnsafeByteStringApi::class, UnsafeIoApi::class) public fun Sink.write(byteString: ByteString, startIndex: Int = 0, endIndex: Int = byteString.size) { checkBounds(byteString.size, startIndex, endIndex) if (endIndex == startIndex) { @@ -33,21 +34,17 @@ public fun Sink.write(byteString: ByteString, startIndex: Int = 0, endIndex: Int writeToInternalBuffer { buffer -> var offset = startIndex - val tail = buffer.head?.prev - if (tail != null) { - val bytesToWrite = min(tail.data.size - tail.limit, endIndex - offset) - byteString.copyInto(tail.data, tail.limit, offset, offset + bytesToWrite) - offset += bytesToWrite - tail.limit += bytesToWrite - buffer.sizeMut += bytesToWrite - } - while (offset < endIndex) { - val bytesToWrite = min(endIndex - offset, Segment.SIZE) - val seg = buffer.writableSegment(bytesToWrite) - byteString.copyInto(seg.data, seg.limit, offset, offset + bytesToWrite) - seg.limit += bytesToWrite - buffer.sizeMut += bytesToWrite - offset += 
bytesToWrite + + UnsafeByteStringOperations.withByteArrayUnsafe(byteString) { data -> + while (offset < endIndex) { + var written = 0 + UnsafeBufferOperations.writeToTail(buffer, 1) { segData, pos, limit -> + written = min(endIndex - offset, limit - pos) + data.copyInto(segData, pos, offset, offset + written) + written + } + offset += written + } } } } diff --git a/core/common/src/Sinks.kt b/core/common/src/Sinks.kt index 9c60950f8..e94741916 100644 --- a/core/common/src/Sinks.kt +++ b/core/common/src/Sinks.kt @@ -8,6 +8,7 @@ package kotlinx.io import kotlin.contracts.ExperimentalContracts import kotlin.contracts.InvocationKind.EXACTLY_ONCE import kotlin.contracts.contract +import kotlinx.io.unsafe.UnsafeBufferOperations private val HEX_DIGIT_BYTES = ByteArray(16) { ((if (it < 10) '0'.code else ('a'.code - 10)) + it).toByte() @@ -63,7 +64,7 @@ public fun Sink.writeLongLe(long: Long) { * * @sample kotlinx.io.samples.KotlinxIoCoreCommonSamples.writeDecimalLong */ -@OptIn(DelicateIoApi::class) +@OptIn(DelicateIoApi::class, UnsafeIoApi::class) public fun Sink.writeDecimalLong(long: Long) { var v = long if (v == 0L) { @@ -116,20 +117,17 @@ public fun Sink.writeDecimalLong(long: Long) { } writeToInternalBuffer { buffer -> - val tail = buffer.writableSegment(width) - val data = tail.data - var pos = tail.limit + width // We write backwards from right to left. 
- while (v != 0L) { - val digit = (v % 10).toInt() - data[--pos] = HEX_DIGIT_BYTES[digit] - v /= 10 + UnsafeBufferOperations.writeToTail(buffer, width) { ctx, segment -> + for (pos in width - 1 downTo if (negative) 1 else 0) { + val digit = (v % 10).toByte() + ctx.setUnchecked(segment, pos, HEX_DIGIT_BYTES[digit.toInt()]) + v /= 10 + } + if (negative) { + ctx.setUnchecked(segment, 0, '-'.code.toByte()) + } + width } - if (negative) { - data[--pos] = '-'.code.toByte() - } - - tail.limit += width - buffer.sizeMut += width.toLong() } } @@ -144,7 +142,7 @@ public fun Sink.writeDecimalLong(long: Long) { * * @sample kotlinx.io.samples.KotlinxIoCoreCommonSamples.writeHexLong */ -@OptIn(DelicateIoApi::class) +@OptIn(DelicateIoApi::class, UnsafeIoApi::class) public fun Sink.writeHexadecimalUnsignedLong(long: Long) { var v = long if (v == 0L) { @@ -156,17 +154,13 @@ public fun Sink.writeHexadecimalUnsignedLong(long: Long) { val width = hexNumberLength(v) writeToInternalBuffer { buffer -> - val tail = buffer.writableSegment(width) - val data = tail.data - var pos = tail.limit + width - 1 - val start = tail.limit - while (pos >= start) { - data[pos] = HEX_DIGIT_BYTES[(v and 0xF).toInt()] - v = v ushr 4 - pos-- + UnsafeBufferOperations.writeToTail(buffer, width) { ctx, segment -> + for (pos in width - 1 downTo 0) { + ctx.setUnchecked(segment, pos, HEX_DIGIT_BYTES[v.toInt().and(0xF)]) + v = v ushr 4 + } + width } - tail.limit += width - buffer.sizeMut += width.toLong() } } diff --git a/core/common/src/Sources.kt b/core/common/src/Sources.kt index d4583c0b5..fa5b0dd8d 100644 --- a/core/common/src/Sources.kt +++ b/core/common/src/Sources.kt @@ -62,20 +62,22 @@ internal const val OVERFLOW_DIGIT_START = Long.MIN_VALUE % 10L + 1 public fun Source.readDecimalLong(): Long { require(1L) - var currIdx = 0L var negative = false var value = 0L - var seen = 0 var overflowDigit = OVERFLOW_DIGIT_START - when (val b = buffer[currIdx++]) { + + when (val b = buffer[0]) { '-'.code.toByte() -> { 
negative = true overflowDigit-- + require(2) + if (buffer[1] !in '0'.code .. '9'.code) { + throw NumberFormatException("Expected a digit but was 0x${buffer[1].toHexString()}") + } } in '0'.code..'9'.code -> { value = ('0'.code - b).toLong() - seen = 1 } else -> { @@ -83,35 +85,39 @@ public fun Source.readDecimalLong(): Long { } } - while (request(currIdx + 1L)) { - val b = buffer[currIdx++] - if (b in '0'.code..'9'.code) { - val digit = '0'.code - b - - // Detect when the digit would cause an overflow. - if (value < OVERFLOW_ZONE || value == OVERFLOW_ZONE && digit < overflowDigit) { - with(Buffer()) { - writeDecimalLong(value) - writeByte(b) - - if (!negative) readByte() // Skip negative sign. - throw NumberFormatException("Number too large: ${readString()}") + var bufferOffset = 1L + while (request(bufferOffset + 1)) { + val finished = buffer.seek(bufferOffset) { seg, offset -> + seg!! + var currIdx = (bufferOffset - offset).toInt() + val size = seg.size + while (currIdx < size) { + val b = seg.getUnchecked(currIdx) + if (b in '0'.code..'9'.code) { + val digit = '0'.code - b + + // Detect when the digit would cause an overflow. + if (value < OVERFLOW_ZONE || value == OVERFLOW_ZONE && digit < overflowDigit) { + with(Buffer()) { + writeDecimalLong(value) + writeByte(b) + + if (!negative) readByte() // Skip negative sign. 
+ throw NumberFormatException("Number too large: ${readString()}") + } + } + value = value * 10L + digit + currIdx++ + bufferOffset++ + } else { + return@seek true } } - value = value * 10L + digit - seen++ - } else { - break + false } + if (finished) break } - - if (seen < 1) { - require(2) - val expected = if (negative) "Expected a digit" else "Expected a digit or '-'" - throw NumberFormatException("$expected but was 0x${buffer[1].toHexString()}") - } - - skip(currIdx.toLong() - 1) + skip(bufferOffset) return if (negative) value else -value } @@ -142,22 +148,30 @@ public fun Source.readHexadecimalUnsignedLong(): Long { var bytesRead = 1L while (request(bytesRead + 1L)) { - val b = buffer[bytesRead] - val bDigit = when (b) { - in '0'.code..'9'.code -> b - '0'.code - in 'a'.code..'f'.code -> b - 'a'.code + 10 - in 'A'.code..'F'.code -> b - 'A'.code + 10 - else -> break - } - if (result and -0x1000000000000000L != 0L) { - with(Buffer()) { - writeHexadecimalUnsignedLong(result) - writeByte(b) - throw NumberFormatException("Number too large: " + readString()) + val stop = buffer.seek(bytesRead) { seg, offset -> + seg!! 
+ val startIndex = (bytesRead - offset).toInt() + for (localOffset in startIndex until seg.size) { + val b = seg.getUnchecked(localOffset) + val bDigit = when (b) { + in '0'.code..'9'.code -> b - '0'.code + in 'a'.code..'f'.code -> b - 'a'.code + 10 + in 'A'.code..'F'.code -> b - 'A'.code + 10 + else -> return@seek true + } + if (result and -0x1000000000000000L != 0L) { + with(Buffer()) { + writeHexadecimalUnsignedLong(result) + writeByte(b) + throw NumberFormatException("Number too large: " + readString()) + } + } + result = result.shl(4) + bDigit + bytesRead++ } + false } - result = result.shl(4) + bDigit - bytesRead++ + if (stop) break } skip(bytesRead) return result diff --git a/core/common/src/Utf8.kt b/core/common/src/Utf8.kt index 31da1471e..55d0d703a 100644 --- a/core/common/src/Utf8.kt +++ b/core/common/src/Utf8.kt @@ -70,6 +70,9 @@ package kotlinx.io import kotlinx.io.internal.* +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlinx.io.unsafe.withData +import kotlin.math.min /** * Returns the number of bytes used to encode the slice of `string` as UTF-8 when using [Sink.writeString]. @@ -457,6 +460,7 @@ private fun Buffer.commonReadUtf8CodePoint(): Int { } } +@OptIn(UnsafeIoApi::class) private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt: (Int) -> Char) { // Transcode a UTF-16 chars to UTF-8 bytes. var i = beginIndex @@ -465,45 +469,49 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt when { c < 0x80 -> { - val tail = writableSegment(1) - val data = tail.data - val segmentOffset = tail.limit - i - val runLimit = minOf(endIndex, Segment.SIZE - segmentOffset) - - // Emit a 7-bit character with 1 byte. - data[segmentOffset + i++] = c.toByte() // 0xxxxxxx - - // Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance - // improvement over independent calls to writeByte(). 
- while (i < runLimit) { - c = charAt(i).code - if (c >= 0x80) break - data[segmentOffset + i++] = c.toByte() // 0xxxxxxx + UnsafeBufferOperations.writeToTail(this, 1) { ctx, segment -> + val segmentOffset = -i + val runLimit = minOf(endIndex, i + segment.remainingCapacity) + + // Emit a 7-bit character with 1 byte. + ctx.setUnchecked(segment, segmentOffset + i++, c.toByte()) // 0xxxxxxx + + // Fast-path contiguous runs of ASCII characters. This is ugly, but yields a ~4x performance + // improvement over independent calls to writeByte(). + while (i < runLimit) { + c = charAt(i).code + if (c >= 0x80) break + ctx.setUnchecked(segment, segmentOffset + i++, c.toByte()) // 0xxxxxxx + } + + i + segmentOffset // Equivalent to i - (previous i). } - - val runSize = i + segmentOffset - tail.limit // Equivalent to i - (previous i). - tail.limit += runSize - sizeMut += runSize.toLong() } c < 0x800 -> { // Emit a 11-bit character with 2 bytes. - val tail = writableSegment(2) - tail.data[tail.limit] = (c shr 6 or 0xc0).toByte() // 110xxxxx - tail.data[tail.limit + 1] = (c and 0x3f or 0x80).toByte() // 10xxxxxx - tail.limit += 2 - sizeMut += 2L + UnsafeBufferOperations.writeToTail(this, 2) { ctx, segment -> + ctx.setUnchecked( + segment, 0, + (c shr 6 or 0xc0).toByte(), // 110xxxxx + (c and 0x3f or 0x80).toByte() // 10xxxxxx + ) + 2 + } i++ } c < 0xd800 || c > 0xdfff -> { // Emit a 16-bit character with 3 bytes. 
- val tail = writableSegment(3) - tail.data[tail.limit] = (c shr 12 or 0xe0).toByte() // 1110xxxx - tail.data[tail.limit + 1] = (c shr 6 and 0x3f or 0x80).toByte() // 10xxxxxx - tail.data[tail.limit + 2] = (c and 0x3f or 0x80).toByte() // 10xxxxxx - tail.limit += 3 - sizeMut += 3L + UnsafeBufferOperations.writeToTail(this, 3) { ctx, segment -> + ctx.setUnchecked( + segment, 0, + (c shr 12 or 0xe0).toByte(), // 1110xxxx + (c shr 6 and 0x3f or 0x80).toByte(), // 10xxxxxx + (c and 0x3f or 0x80).toByte() // 10xxxxxx + ) + 3 + } i++ } @@ -522,13 +530,15 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt val codePoint = 0x010000 + (c and 0x03ff shl 10 or (low and 0x03ff)) // Emit a 21-bit character with 4 bytes. - val tail = writableSegment(4) - tail.data[tail.limit] = (codePoint shr 18 or 0xf0).toByte() // 11110xxx - tail.data[tail.limit + 1] = (codePoint shr 12 and 0x3f or 0x80).toByte() // 10xxxxxx - tail.data[tail.limit + 2] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxyyyy - tail.data[tail.limit + 3] = (codePoint and 0x3f or 0x80).toByte() // 10yyyyyy - tail.limit += 4 - sizeMut += 4L + UnsafeBufferOperations.writeToTail(this, 4) { ctx, segment -> + ctx.setUnchecked(segment, 0, + (codePoint shr 18 or 0xf0).toByte(), // 11110xxx + (codePoint shr 12 and 0x3f or 0x80).toByte(), // 10xxxxxx + (codePoint shr 6 and 0x3f or 0x80).toByte(), // 10xxyyyy + (codePoint and 0x3f or 0x80).toByte() // 10yyyyyy + ) + 4 + } i += 2 } } @@ -536,6 +546,7 @@ private inline fun Buffer.commonWriteUtf8(beginIndex: Int, endIndex: Int, charAt } } +@OptIn(UnsafeIoApi::class) private fun Buffer.commonWriteUtf8CodePoint(codePoint: Int) { when { codePoint < 0 || codePoint > 0x10ffff -> { @@ -551,11 +562,11 @@ private fun Buffer.commonWriteUtf8CodePoint(codePoint: Int) { codePoint < 0x800 -> { // Emit a 11-bit code point with 2 bytes. 
- val tail = writableSegment(2) - tail.data[tail.limit] = (codePoint shr 6 or 0xc0).toByte() // 110xxxxx - tail.data[tail.limit + 1] = (codePoint and 0x3f or 0x80).toByte() // 10xxxxxx - tail.limit += 2 - sizeMut += 2L + UnsafeBufferOperations.writeToTail(this, 2) { ctx, segment -> + ctx.setUnchecked(segment, 0, (codePoint shr 6 or 0xc0).toByte()) // 110xxxxx + ctx.setUnchecked(segment, 1, (codePoint and 0x3f or 0x80).toByte()) // 10xxxxxx + 2 + } } codePoint in 0xd800..0xdfff -> { @@ -565,27 +576,28 @@ private fun Buffer.commonWriteUtf8CodePoint(codePoint: Int) { codePoint < 0x10000 -> { // Emit a 16-bit code point with 3 bytes. - val tail = writableSegment(3) - tail.data[tail.limit] = (codePoint shr 12 or 0xe0).toByte() // 1110xxxx - tail.data[tail.limit + 1] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxxxxx - tail.data[tail.limit + 2] = (codePoint and 0x3f or 0x80).toByte() // 10xxxxxx - tail.limit += 3 - sizeMut += 3L + UnsafeBufferOperations.writeToTail(this, 3) { ctx, segment -> + ctx.setUnchecked(segment, 0, (codePoint shr 12 or 0xe0).toByte()) // 1110xxxx + ctx.setUnchecked(segment, 1, (codePoint shr 6 and 0x3f or 0x80).toByte()) // 10xxxxxx + ctx.setUnchecked(segment, 2, (codePoint and 0x3f or 0x80).toByte()) // 10xxxxxx + 3 + } } else -> { // [0x10000, 0x10ffff] // Emit a 21-bit code point with 4 bytes. 
- val tail = writableSegment(4) - tail.data[tail.limit] = (codePoint shr 18 or 0xf0).toByte() // 11110xxx - tail.data[tail.limit + 1] = (codePoint shr 12 and 0x3f or 0x80).toByte() // 10xxxxxx - tail.data[tail.limit + 2] = (codePoint shr 6 and 0x3f or 0x80).toByte() // 10xxyyyy - tail.data[tail.limit + 3] = (codePoint and 0x3f or 0x80).toByte() // 10yyyyyy - tail.limit += 4 - sizeMut += 4L + UnsafeBufferOperations.writeToTail(this, 4) { ctx, segment -> + ctx.setUnchecked(segment,0, (codePoint shr 18 or 0xf0).toByte()) // 11110xxx + ctx.setUnchecked(segment,1, (codePoint shr 12 and 0x3f or 0x80).toByte()) // 10xxxxxx + ctx.setUnchecked(segment,2, (codePoint shr 6 and 0x3f or 0x80).toByte()) // 10xxyyyy + ctx.setUnchecked(segment,3, (codePoint and 0x3f or 0x80).toByte()) // 10yyyyyy + 4 + } } } } +@OptIn(UnsafeIoApi::class) private fun Buffer.commonReadUtf8(byteCount: Long): String { require(byteCount >= 0 && byteCount <= Int.MAX_VALUE) { "byteCount ($byteCount) is not within the range [0..${Int.MAX_VALUE})" @@ -593,20 +605,18 @@ private fun Buffer.commonReadUtf8(byteCount: Long): String { require(byteCount) if (byteCount == 0L) return "" - val s = head!! - if (s.pos + byteCount > s.limit) { - // If the string spans multiple segments, delegate to readBytes(). - - return readByteArray(byteCount.toInt()).commonToUtf8String() - } - - val result = s.data.commonToUtf8String(s.pos, s.pos + byteCount.toInt()) - s.pos += byteCount.toInt() - sizeMut -= byteCount - - if (s.pos == s.limit) { - recycleHead() + UnsafeBufferOperations.iterate(this) { ctx, head -> + head!! + if (head.size >= byteCount) { + var result = "" + ctx.withData(head) { data, pos, limit -> + result = data.commonToUtf8String(pos, min(limit, pos + byteCount.toInt())) + skip(byteCount) + return result + } + } } - return result + // If the string spans multiple segments, delegate to readBytes(). 
+ return readByteArray(byteCount.toInt()).commonToUtf8String() } diff --git a/core/common/src/unsafe/UnsafeBufferOperations.kt b/core/common/src/unsafe/UnsafeBufferOperations.kt index 08943ed52..1f0ce7f34 100644 --- a/core/common/src/unsafe/UnsafeBufferOperations.kt +++ b/core/common/src/unsafe/UnsafeBufferOperations.kt @@ -254,12 +254,11 @@ public object UnsafeBufferOperations { public inline fun writeToTail( buffer: Buffer, minimumCapacity: Int, - writeAction: (SegmentWriteContext, Segment) -> Int + writeAction: (context: SegmentWriteContext, tail: Segment) -> Int ): Int { contract { callsInPlace(writeAction, EXACTLY_ONCE) } - val tail = buffer.writableSegment(minimumCapacity) val bytesWritten = writeAction(SegmentWriteContextImpl, tail) @@ -304,7 +303,10 @@ public object UnsafeBufferOperations { * @sample kotlinx.io.samples.unsafe.UnsafeReadWriteSamplesJvm.messageDigest * @sample kotlinx.io.samples.unsafe.UnsafeBufferOperationsSamples.crc32Unsafe */ - public inline fun iterate(buffer: Buffer, iterationAction: (BufferIterationContext, Segment?) -> Unit) { + public inline fun iterate( + buffer: Buffer, + iterationAction: (context: BufferIterationContext, head: Segment?) 
-> Unit + ) { contract { callsInPlace(iterationAction, EXACTLY_ONCE) } @@ -335,7 +337,7 @@ public object UnsafeBufferOperations { */ public inline fun iterate( buffer: Buffer, offset: Long, - iterationAction: (BufferIterationContext, Segment?, Long) -> Unit + iterationAction: (context: BufferIterationContext, segment: Segment?, startOfTheSegmentOffset: Long) -> Unit ) { contract { callsInPlace(iterationAction, EXACTLY_ONCE) @@ -393,7 +395,10 @@ public interface SegmentReadContext { @UnsafeIoApi @JvmSynthetic @OptIn(ExperimentalContracts::class) -public inline fun SegmentReadContext.withData(segment: Segment, readAction: (ByteArray, Int, Int) -> Unit) { +public inline fun SegmentReadContext.withData( + segment: Segment, + readAction: (bytes: ByteArray, startIndexInclusive: Int, endIndexExclusive: Int) -> Unit +) { contract { callsInPlace(readAction, EXACTLY_ONCE) } diff --git a/core/jvm/src/BuffersJvm.kt b/core/jvm/src/BuffersJvm.kt index cb5625f31..6b488a53d 100644 --- a/core/jvm/src/BuffersJvm.kt +++ b/core/jvm/src/BuffersJvm.kt @@ -20,6 +20,8 @@ */ package kotlinx.io +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlinx.io.unsafe.withData import java.io.EOFException import java.io.IOException import java.io.InputStream @@ -57,23 +59,25 @@ public fun Buffer.write(input: InputStream, byteCount: Long): Buffer { return this } +@OptIn(UnsafeIoApi::class) private fun Buffer.write(input: InputStream, byteCount: Long, forever: Boolean) { var remainingByteCount = byteCount - while (remainingByteCount > 0L || forever) { - val tail = writableSegment(1) - val maxToCopy = minOf(remainingByteCount, Segment.SIZE - tail.limit).toInt() - val bytesRead = input.read(tail.data, tail.limit, maxToCopy) - if (bytesRead == -1) { - if (tail.pos == tail.limit) { - // We allocated a tail segment, but didn't end up needing it. Recycle! 
- recycleTail() + var exhausted = false + while (!exhausted && (remainingByteCount > 0L || forever)) { + UnsafeBufferOperations.writeToTail(this, 1) { data, pos, limit -> + val maxToCopy = minOf(remainingByteCount, limit - pos).toInt() + val bytesRead = input.read(data, pos, maxToCopy) + if (bytesRead == -1) { + if (!forever) { + throw EOFException("Stream exhausted before $byteCount bytes were read.") + } + exhausted = true + 0 + } else { + remainingByteCount -= bytesRead + bytesRead } - if (forever) return - throw EOFException("Stream exhausted before $byteCount bytes were read.") } - tail.limit += bytesRead - sizeMut += bytesRead.toLong() - remainingByteCount -= bytesRead.toLong() } } @@ -87,22 +91,17 @@ private fun Buffer.write(input: InputStream, byteCount: Long, forever: Boolean) * * @sample kotlinx.io.samples.KotlinxIoSamplesJvm.bufferTransferToStream */ +@OptIn(UnsafeIoApi::class) public fun Buffer.readTo(out: OutputStream, byteCount: Long = size) { checkOffsetAndCount(size, 0, byteCount) var remainingByteCount = byteCount - var s = head while (remainingByteCount > 0L) { - val toCopy = minOf(remainingByteCount, s!!.limit - s.pos).toInt() - out.write(s.data, s.pos, toCopy) - - s.pos += toCopy - sizeMut -= toCopy.toLong() - remainingByteCount -= toCopy.toLong() - - if (s.pos == s.limit) { - recycleHead() - s = head + UnsafeBufferOperations.readFromHead(this) { data, pos, limit -> + val toCopy = minOf(remainingByteCount, limit - pos).toInt() + out.write(data, pos, toCopy) + remainingByteCount -= toCopy + toCopy } } } @@ -120,6 +119,7 @@ public fun Buffer.readTo(out: OutputStream, byteCount: Long = size) { * * @sample kotlinx.io.samples.KotlinxIoSamplesJvm.copyBufferToOutputStream */ +@OptIn(UnsafeIoApi::class) public fun Buffer.copyTo( out: OutputStream, startIndex: Long = 0L, @@ -128,24 +128,20 @@ public fun Buffer.copyTo( checkBounds(size, startIndex, endIndex) if (startIndex == endIndex) return - var currentOffset = startIndex var remainingByteCount = 
endIndex - startIndex - // Skip segments that we aren't copying from. - var s = head - while (currentOffset >= s!!.limit - s.pos) { - currentOffset -= (s.limit - s.pos).toLong() - s = s.next - } - - // Copy from one segment at a time. - while (remainingByteCount > 0L) { - val pos = (s!!.pos + currentOffset).toInt() - val toCopy = minOf(s.limit - pos, remainingByteCount).toInt() - out.write(s.data, pos, toCopy) - remainingByteCount -= toCopy.toLong() - currentOffset = 0L - s = s.next + UnsafeBufferOperations.iterate(this, startIndex) { ctx, seg, segOffset -> + var curr = seg!! + var currentOffset = (startIndex - segOffset).toInt() + while (remainingByteCount > 0) { + ctx.withData(curr) { data, pos, limit -> + val toCopy = minOf(limit - pos - currentOffset, remainingByteCount).toInt() + out.write(data, pos + currentOffset, toCopy) + remainingByteCount -= toCopy + } + curr = ctx.next(curr) ?: break + currentOffset = 0 + } } } @@ -157,17 +153,14 @@ public fun Buffer.copyTo( * * @sample kotlinx.io.samples.KotlinxIoSamplesJvm.readWriteByteBuffer */ +@OptIn(UnsafeIoApi::class) public fun Buffer.readAtMostTo(sink: ByteBuffer): Int { - val s = head ?: return -1 - - val toCopy = minOf(sink.remaining(), s.limit - s.pos) - sink.put(s.data, s.pos, toCopy) - - s.pos += toCopy - sizeMut -= toCopy.toLong() - - if (s.pos == s.limit) { - recycleHead() + if (exhausted()) return -1 + var toCopy = 0 + UnsafeBufferOperations.readFromHead(this) { data, pos, limit -> + toCopy = minOf(sink.remaining(), limit - pos) + sink.put(data, pos, toCopy) + toCopy } return toCopy @@ -178,20 +171,20 @@ public fun Buffer.readAtMostTo(sink: ByteBuffer): Int { * * @sample kotlinx.io.samples.KotlinxIoSamplesJvm.transferBufferFromByteBuffer */ +@OptIn(UnsafeIoApi::class) public fun Buffer.transferFrom(source: ByteBuffer): Buffer { val byteCount = source.remaining() var remaining = byteCount - while (remaining > 0) { - val tail = writableSegment(1) - val toCopy = minOf(remaining, Segment.SIZE - tail.limit) 
- source.get(tail.data, tail.limit, toCopy) - - remaining -= toCopy - tail.limit += toCopy + while (remaining > 0) { + UnsafeBufferOperations.writeToTail(this, 1) { data, pos, limit -> + val toCopy = minOf(remaining, limit - pos) + source.get(data, pos, toCopy) + remaining -= toCopy + toCopy + } } - sizeMut += byteCount.toLong() return this } diff --git a/core/jvm/src/JvmCore.kt b/core/jvm/src/JvmCore.kt index 67acedf73..e714f7740 100644 --- a/core/jvm/src/JvmCore.kt +++ b/core/jvm/src/JvmCore.kt @@ -21,6 +21,7 @@ package kotlinx.io +import kotlinx.io.unsafe.UnsafeBufferOperations import java.io.IOException import java.io.InputStream import java.io.OutputStream @@ -38,21 +39,17 @@ private open class OutputStreamSink( private val out: OutputStream, ) : RawSink { + @OptIn(UnsafeIoApi::class) override fun write(source: Buffer, byteCount: Long) { checkOffsetAndCount(source.size, 0, byteCount) var remaining = byteCount while (remaining > 0) { // kotlinx.io TODO: detect Interruption. - val head = source.head!! - val toCopy = minOf(remaining, head.limit - head.pos).toInt() - out.write(head.data, head.pos, toCopy) - - head.pos += toCopy - remaining -= toCopy - source.sizeMut -= toCopy - - if (head.pos == head.limit) { - source.recycleHead() + UnsafeBufferOperations.readFromHead(source) { data, pos, limit -> + val toCopy = minOf(remaining, limit - pos).toInt() + out.write(data, pos, toCopy) + remaining -= toCopy + toCopy } } } @@ -77,23 +74,22 @@ private open class InputStreamSource( private val input: InputStream, ) : RawSource { + @OptIn(UnsafeIoApi::class) override fun readAtMostTo(sink: Buffer, byteCount: Long): Long { if (byteCount == 0L) return 0L checkByteCount(byteCount) try { - val tail = sink.writableSegment(1) - val maxToCopy = minOf(byteCount, Segment.SIZE - tail.limit).toInt() - val bytesRead = input.read(tail.data, tail.limit, maxToCopy) - if (bytesRead == -1) { - if (tail.pos == tail.limit) { - // We allocated a tail segment, but didn't end up needing it. 
Recycle! - sink.recycleTail() + var readTotal = 0L + UnsafeBufferOperations.writeToTail(sink, 1) { data, pos, limit -> + val maxToCopy = minOf(byteCount, limit - pos).toInt() + readTotal = input.read(data, pos, maxToCopy).toLong() + if (readTotal == -1L) { + 0 + } else { + readTotal.toInt() } - return -1 } - tail.limit += bytesRead - sink.sizeMut += bytesRead - return bytesRead.toLong() + return readTotal } catch (e: AssertionError) { if (e.isAndroidGetsocknameError) throw IOException(e) throw e diff --git a/core/jvm/src/SourcesJvm.kt b/core/jvm/src/SourcesJvm.kt index e34946058..12c73c626 100644 --- a/core/jvm/src/SourcesJvm.kt +++ b/core/jvm/src/SourcesJvm.kt @@ -20,12 +20,14 @@ */ package kotlinx.io +import kotlinx.io.unsafe.UnsafeBufferOperations import java.io.EOFException import java.io.InputStream import java.nio.ByteBuffer import java.nio.channels.ReadableByteChannel import java.nio.charset.Charset +@OptIn(UnsafeIoApi::class) private fun Buffer.readStringImpl(byteCount: Long, charset: Charset): String { require(byteCount >= 0 && byteCount <= Int.MAX_VALUE) { "byteCount ($byteCount) is not within the range [0..${Int.MAX_VALUE})" @@ -35,21 +37,21 @@ private fun Buffer.readStringImpl(byteCount: Long, charset: Charset): String { } if (byteCount == 0L) return "" - val s = head!! - if (s.pos + byteCount > s.limit) { - // If the string spans multiple segments, delegate to readBytes(). - return String(readByteArray(byteCount.toInt()), charset) + var result: String? = null + UnsafeBufferOperations.readFromHead(this) { data, pos, limit -> + val len = limit - pos + if (len >= byteCount) { + result = String(data, pos, byteCount.toInt(), charset) + byteCount.toInt() + } else { + 0 + } } - - val result = String(s.data, s.pos, byteCount.toInt(), charset) - s.pos += byteCount.toInt() - sizeMut -= byteCount - - if (s.pos == s.limit) { - recycleHead() + return if (result == null) { + String(readByteArray(byteCount.toInt()), charset) + } else { + result!! 
} - - return result } /** diff --git a/core/nodeFilesystemShared/src/files/PathsNodeJs.kt b/core/nodeFilesystemShared/src/files/PathsNodeJs.kt index efced6b9f..a612c352a 100644 --- a/core/nodeFilesystemShared/src/files/PathsNodeJs.kt +++ b/core/nodeFilesystemShared/src/files/PathsNodeJs.kt @@ -8,6 +8,7 @@ package kotlinx.io.files import kotlinx.io.* import kotlinx.io.node.buffer import kotlinx.io.node.fs +import kotlinx.io.unsafe.UnsafeBufferOperations import kotlinx.io.node.path as nodeJsPath public actual class Path internal constructor( @@ -143,6 +144,7 @@ internal class FileSink(path: Path, append: Boolean) : RawSink { return fd } + @OptIn(UnsafeIoApi::class) override fun write(source: Buffer, byteCount: Long) { check(!closed) { "Sink is closed." } if (byteCount == 0L) { @@ -151,20 +153,20 @@ internal class FileSink(path: Path, append: Boolean) : RawSink { var remainingBytes = minOf(byteCount, source.size) while (remainingBytes > 0) { - val head = source.head!! - val segmentBytes = head.limit - head.pos - val buf = buffer.Buffer.allocUnsafe(segmentBytes) - val data = head.data - val pos = head.pos - for (offset in 0 until segmentBytes) { - buf.writeInt8(data[pos + offset], offset) - } - withCaughtException { - fs.writeFileSync(fd, buf) - }?.also { - throw IOException("Write failed", it) + var segmentBytes = 0 + UnsafeBufferOperations.readFromHead(source) { headData, headPos, headLimit -> + segmentBytes = headLimit - headPos + val buf = buffer.Buffer.allocUnsafe(segmentBytes) + for (offset in 0 until segmentBytes) { + buf.writeInt8(headData[headPos + offset], offset) + } + withCaughtException { + fs.writeFileSync(fd, buf) + }?.also { + throw IOException("Write failed", it) + } + segmentBytes } - source.skip(segmentBytes.toLong()) remainingBytes -= segmentBytes } } diff --git a/core/wasmWasi/src/-WasmUtils.kt b/core/wasmWasi/src/-WasmUtils.kt index 3aa455151..3a644b525 100644 --- a/core/wasmWasi/src/-WasmUtils.kt +++ b/core/wasmWasi/src/-WasmUtils.kt @@ -6,6 +6,8 
@@ package kotlinx.io +import kotlinx.io.unsafe.UnsafeBufferOperations +import kotlin.math.min import kotlin.wasm.unsafe.MemoryAllocator import kotlin.wasm.unsafe.Pointer import kotlin.wasm.unsafe.UnsafeWasmMemoryApi @@ -34,47 +36,39 @@ internal fun Pointer.storeBytes(bytes: ByteArray) { } } -@OptIn(UnsafeWasmMemoryApi::class) +@OptIn(UnsafeWasmMemoryApi::class, UnsafeIoApi::class) internal fun Buffer.readToLinearMemory(pointer: Pointer, bytes: Int) { checkBounds(size, 0L, bytes.toLong()) - var current: Segment? = head var remaining = bytes var currentPtr = pointer - do { - current!! - val data = current.data - val pos = current.pos - val limit = current.limit - val read = minOf(remaining, limit - pos) - for (offset in 0 until read) { - currentPtr.storeByte(offset, data[pos + offset]) + while (remaining > 0 && !exhausted()) { + UnsafeBufferOperations.readFromHead(this) { ctx, seg -> + val read = minOf(remaining, seg.size) + for (offset in 0.. 0) - check(remaining == 0) - skip(bytes.toLong()) + } } - +@OptIn(UnsafeIoApi::class, UnsafeWasmMemoryApi::class) internal fun Buffer.writeFromLinearMemory(pointer: Pointer, bytes: Int) { var remaining = bytes var currentPtr = pointer while (remaining > 0) { - val segment = writableSegment(1) - val limit = segment.limit - val data = segment.data - val toWrite = minOf(data.size - limit, remaining) + UnsafeBufferOperations.writeToTail(this, 1) { ctx, seg -> - for (offset in 0 until toWrite) { - data[limit + offset] = currentPtr.loadByte(offset) + val cap = min(seg.remainingCapacity, remaining) + for (offset in 0..