Skip to content

Commit

Permalink
The buffer iterator will now return a truncated token when the
Browse files Browse the repository at this point in the history
underlying buffer is smaller than the token being tokenized.

Add a small-buffer test to check that BufferedReader tokenization works as expected.

Also, the BufferedReader peek method now works when the `least` param
is greater than zero.
  • Loading branch information
hawkbee committed Jul 14, 2023
1 parent 1d6fe9f commit 74361d4
Showing 1 changed file with 120 additions and 24 deletions.
144 changes: 120 additions & 24 deletions lib/std/io/buffered_reader.zig
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ pub fn TokenIterator(comptime Context: type, comptime delimiter_type: mem.Delimi

/// Returns a slice of the current token, or null if tokenization is
/// complete, and advances to the next token.
/// If the underlying buffer is smaller than the token length, the
/// token will be truncated to the buffer length each time `next` is called.
pub fn next(self: *Self) ?[]const u8 {
if (self.lastpos > self.context.start) {
self.context.discard(self.lastpos - self.context.start) catch return null;
Expand All @@ -28,25 +30,41 @@ pub fn TokenIterator(comptime Context: type, comptime delimiter_type: mem.Delimi
.any, .scalar => 1,
};

var index: usize = 0;
var buffer = self.context.peek(0);
while (index < buffer.len and self.isDelimiter(buffer, index)) : (index += delimiter_len) {
if (index >= buffer.len) {
var start: usize = 0;
var end: usize = 0;
var buffer: []const u8 = undefined;
while (true) {
var index: usize = 0;
buffer = self.context.peek(0);
while (index < buffer.len and self.isDelimiter(buffer, index)) {
index += delimiter_len;
if (index >= buffer.len) {
self.context.discard(buffer.len) catch return null;
buffer = self.context.peek(0);
index = 0;
}
}

start = index;
if (start == buffer.len) {
self.context.discard(buffer.len) catch return null;
buffer = self.context.peek(0);
index = 0;
return null;
}
}

const start = index;
if (start == buffer.len) {
self.context.discard(buffer.len) catch return null;
return null;
}
// move to end of token
end = start;
while (end < buffer.len and !self.isDelimiter(buffer, end)) : (end += 1) {}

if (end < buffer.len) {
break;
}

// move to end of token
var end = start;
while (end < buffer.len and !self.isDelimiter(buffer, end)) : (end += 1) {}
// Grab one more byte
const newbuf = self.context.peek(buffer.len + 1);
if (newbuf.len == buffer.len) {
break;
}
}

self.lastpos = self.context.start + end;
return buffer[start..end];
Expand Down Expand Up @@ -81,23 +99,42 @@ pub fn BufferedReader(comptime buffer_size: usize, comptime pushback_size: usize

const Self = @This();

/// Returns the buffer slice to Application directly. Application can later
/// read the data again or forward the buffer cursor by calling discard.
/// Returns the buffered data slice directly to the application. The application can later
/// read the data again with `read`, or advance the buffered data cursor with `discard`.
///
/// If the `least` param is zero, it will return the entire buffered data slice.
/// If the returned slice is shorter than `least`, the reader has reached its end.
/// The buffer is filled automatically if it is empty or if the buffered data
/// is shorter than the requested `least` length.
///
/// If this method is used with a pushback buffer, data may be moved
/// when there is a hole in the buffer.
pub fn peek(self: *Self, least: usize) []const u8 {
if (self.start == self.end) {
self.start = 0;
self.end = 0;
const n = self.unbuffered_reader.read(self.buf[pushback_size..][0..]) catch 0;
self.end += n;
}
var remain = self.end - self.start;
if (self.start == self.end or remain < least) {
if (self.start == self.end) {
if (least > remain) {
const delta = least - remain;
if (self.start > 0 and (self.buf.len - self.end < delta)) {
std.mem.copyForwards(u8, self.buf[pushback_size..][0..remain], self.buf[pushback_size..][self.start..self.end]);
self.start = 0;
self.end = 0;
self.end = remain;
}
var nread: usize = 0;
while (nread < delta) {
const n = self.unbuffered_reader.read(self.buf[pushback_size..][self.end..]) catch 0;
if (n == 0) break;
self.end += n;
nread += n;
}
const n = self.unbuffered_reader.read(self.buf[pushback_size..][self.start..]) catch 0;
self.end += n;
remain = self.end - self.start;
}

var nsize: usize = if (least > 0) least else remain;
var nsize: usize = if (least > 0) @min(least, remain) else remain;
if (pushback_size == 0 or self.pushback == 0) {
return self.buf[pushback_size..][self.start..(self.start + nsize)];
}
Expand All @@ -111,7 +148,7 @@ pub fn BufferedReader(comptime buffer_size: usize, comptime pushback_size: usize
self.end = remain;
}
remain += self.pushback;
nsize = if (least > 0) least else remain;
nsize = if (least > 0) @min(least, remain) else remain;
return self.buf[@intCast(pushback_size - self.pushback)..][0..nsize];
}

Expand Down Expand Up @@ -325,6 +362,31 @@ test "tokenize" {
try testing.expect(lines.next() == null);
}

// Exercises the line tokenizer over a BufferedReader whose buffer size is 8:
// per the expectations below, a line longer than the buffer comes back in
// buffer-sized pieces, and an empty line (consecutive "\r\n") yields no token.
test "tokenize on small buffer" {
    const gpa = std.testing.allocator;
    const data = "GET / HTTP/1.1\r\nHost: localhost.testing.run\r\n\r\n2\r\nHe\r\n2\r\nll\r\n1\r\no\r\n0\r\n\r\n";

    const br = try gpa.create(BufferedReader(8, 0, std.io.FixedBufferStream([]const u8)));
    // Pair the release with the allocation before anything else runs.
    defer gpa.destroy(br);
    br.* = .{ .unbuffered_reader = std.io.fixedBufferStream(data) };

    // Tokens in the order the iterator must produce them.
    const expected_tokens = [_][]const u8{
        "GET / HT",
        "TP/1.1",
        "Host: lo",
        "calhost.",
        "testing.",
        "run",
        "2",
        "He",
        "2",
        "ll",
        "1",
        "o",
        "0",
    };

    var lines = br.lines();
    for (expected_tokens) |expected| {
        // Fail the test (rather than panic on `.?`) if the iterator ends early.
        const actual = lines.next() orelse return error.TestUnexpectedResult;
        // `expectEqualSlices` takes the expected slice first, then the actual one.
        try testing.expectEqualSlices(u8, expected, actual);
    }
    try testing.expect(lines.next() == null);
}

test "toWriter" {
var buf: [255]u8 = undefined;
var fbs = std.io.fixedBufferStream(&buf);
Expand Down Expand Up @@ -417,6 +479,10 @@ test "io.BufferedReader OneByte" {
/// Builds a `Reader` whose `context` field points at this instance.
fn reader(self: *Self) Reader {
    const handle: Reader = .{ .context = self };
    return handle;
}

/// Sets `curr` back to 0.
// NOTE(review): presumably `curr` is the read position, so later reads start
// again from the beginning of the data — confirm against `read`.
fn reset(self: *Self) void {
self.curr = 0;
}
};

const str = "This is a test";
Expand All @@ -427,6 +493,36 @@ test "io.BufferedReader OneByte" {
const res = try stream.readAllAlloc(testing.allocator, str.len + 1);
defer testing.allocator.free(res);
try testing.expectEqualSlices(u8, str, res);

// tokenize
{
    const data = "GET / HTTP/1.1\r\nHost: localhost.testing.run\r\n\r\n2\r\nHe\r\n2\r\nll\r\n1\r\no\r\n0\r\n\r\n";
    var onebyte = OneByteReadReader.init(data);
    var br = bufferedReaderSize(8, onebyte.reader());

    // Tokens in the order the iterator must produce them; per these
    // expectations, lines longer than the size-8 buffer arrive in
    // buffer-sized pieces.
    const expected_tokens = [_][]const u8{
        "GET / HT",
        "TP/1.1",
        "Host: lo",
        "calhost.",
        "testing.",
        "run",
        "2",
        "He",
        "2",
        "ll",
        "1",
        "o",
        "0",
    };

    var lines = br.lines();
    for (expected_tokens) |expected| {
        // Fail the test (rather than panic on `.?`) if the iterator ends early.
        const actual = lines.next() orelse return error.TestUnexpectedResult;
        // `expectEqualSlices` takes the expected slice first, then the actual one.
        try testing.expectEqualSlices(u8, expected, actual);
    }
    try testing.expect(lines.next() == null);

    // Reset the underlying reader, then check `peek` with a non-zero `least`
    // interleaved with `discard`.
    onebyte.reset();
    try testing.expectEqualSlices(u8, "GET", br.peek(3));
    try br.discard(2);
    try testing.expectEqualSlices(u8, "T /", br.peek(3));
    try br.discard(2);
    try testing.expectEqualSlices(u8, "/ H", br.peek(3));
}
}

fn smallBufferedReader(underlying_stream: anytype) BufferedReader(8, 0, @TypeOf(underlying_stream)) {
Expand Down

0 comments on commit 74361d4

Please sign in to comment.