Skip to content

Commit

Permalink
Add CancellationToken to TextReader.ReadXAsync. (dotnet#20824)
Browse files Browse the repository at this point in the history
  • Loading branch information
bgrainger committed Jan 20, 2022
1 parent a5158df commit 4f72069
Show file tree
Hide file tree
Showing 9 changed files with 178 additions and 18 deletions.
11 changes: 11 additions & 0 deletions src/libraries/System.Console/src/System/IO/SyncTextReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace System.IO
Expand Down Expand Up @@ -95,11 +96,21 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled<string?>(cancellationToken) : new ValueTask<string?>(ReadLine());
}

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
return cancellationToken.IsCancellationRequested ? Task.FromCanceled<string>(cancellationToken) : Task.FromResult(ReadToEnd());
}

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
68 changes: 68 additions & 0 deletions src/libraries/System.IO/tests/StreamReader/StreamReaderTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,53 @@ public async Task ReadToEndAsync()
Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
var result = await sw.ReadToEndAsync(default);

Assert.Equal(5000, result.Length);
}

[Fact]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var sw = new StreamReader(GetLargeStream());
using var cts = new CancellationTokenSource();
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await sw.ReadToEndAsync(cts.Token));
}

[Fact]
public async Task ReadToEndAsync_WithCancellation()
{
var path = Path.GetTempFileName();
try
{
// create large (~100MB) file
using (var writer = new StreamWriter(path))
{
for (var i = 0; i < 1_000_000; i++)
writer.WriteLine("A very large file used for testing StreamReader cancellation. 0123456789012345678901234567890123456789.");
}

using var reader = File.OpenText(path);
using var cts = new CancellationTokenSource(TimeSpan.FromMilliseconds(50));
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await reader.ReadToEndAsync(cts.Token));
}
finally
{
try
{
File.Delete(path);
}
catch (Exception)
{
}
}
}

[Fact]
public void GetBaseStream()
{
Expand Down Expand Up @@ -301,6 +348,27 @@ public void VanillaReadLines2()
Assert.Equal(valueString.Substring(1, valueString.IndexOf('\r') - 1), data);
}

[Fact]
public async Task VanillaReadLineAsync()
{
var baseInfo = GetCharArrayStream();
var sr = baseInfo.Item2;

string valueString = new string(baseInfo.Item1);

var data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(0, valueString.IndexOf('\r')), data);

data = await sr.ReadLineAsync(default);
Assert.Equal(valueString.Substring(valueString.IndexOf('\r') + 1, 3), data);

data = await sr.ReadLineAsync();
Assert.Equal(valueString.Substring(valueString.IndexOf('\n') + 1, 2), data);

data = await sr.ReadLineAsync(default);
Assert.Equal((valueString.Substring(valueString.LastIndexOf('\n') + 1)), data);
}

[Fact]
public async Task ContinuousNewLinesAndTabsAsync()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,23 @@ public static void ReadLine()
}
}

[Fact]
public static async Task ReadLineAsync()
{
string str1 = "Hello\0\t\v \\ World";
string str2 = str1 + Environment.NewLine + str1;

using (StringReader sr = new StringReader(str1))
{
Assert.Equal(str1, await sr.ReadLineAsync());
}
using (StringReader sr = new StringReader(str2))
{
Assert.Equal(str1, await sr.ReadLineAsync(default));
Assert.Equal(str1, await sr.ReadLineAsync(default));
}
}

[Fact]
public static void ReadPseudoRandomString()
{
Expand Down Expand Up @@ -155,6 +172,14 @@ public static void ReadToEndPseudoRandom() {
Assert.Equal(str1, sr.ReadToEnd());
}

[Fact]
public static async Task ReadToEndAsyncString()
{
string str1 = "Hello\0\t\v \\ World";
StringReader sr = new StringReader(str1);
Assert.Equal(str1, await sr.ReadToEndAsync(default));
}

[Fact]
public static void Closed_DisposedExceptions()
{
Expand Down Expand Up @@ -278,6 +303,8 @@ public async Task Precanceled_ThrowsException()

await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadBlockAsync(Memory<char>.Empty, new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadLineAsync(new CancellationToken(true)).AsTask());
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => reader.ReadToEndAsync(new CancellationToken(true)));
}

private static void ValidateDisposedExceptions(StringReader sr)
Expand Down
18 changes: 18 additions & 0 deletions src/libraries/System.IO/tests/TextReader/TextReaderTests.cs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Threading;
using System.Threading.Tasks;
using Xunit;

Expand Down Expand Up @@ -54,6 +55,23 @@ public async Task ReadToEndAsync()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
var result = await tr.ReadToEndAsync(default);
Assert.Equal(5000, result.Length);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task ReadToEndAsync_WithCanceledCancellationToken()
{
using var tr = new CharArrayTextReader(TestDataProvider.LargeData);
using var cts = new CancellationTokenSource();
cts.Cancel();
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await tr.ReadToEndAsync(cts.Token));
}

[Fact]
public void TestRead()
{
Expand Down
2 changes: 1 addition & 1 deletion src/libraries/System.Private.CoreLib/src/System/IO/File.cs
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ private static async Task<string[]> InternalReadAllLinesAsync(string path, Encod
cancellationToken.ThrowIfCancellationRequested();
string? line;
List<string> lines = new List<string>();
while ((line = await sr.ReadLineAsync().ConfigureAwait(false)) != null)
while ((line = await sr.ReadLineAsync(cancellationToken).ConfigureAwait(false)) != null)
{
lines.Add(line);
cancellationToken.ThrowIfCancellationRequested();
Expand Down
32 changes: 19 additions & 13 deletions src/libraries/System.Private.CoreLib/src/System/IO/StreamReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -845,29 +845,32 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
return sb.ToString();
}

public override Task<string?> ReadLineAsync()
public override Task<string?> ReadLineAsync() =>
ReadLineAsync(default).AsTask();

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadLineAsync();
return base.ReadLineAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string?> task = ReadLineAsyncInternal();
Task<string?> task = ReadLineAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
return new ValueTask<string?>(task);
}

private async Task<string?> ReadLineAsyncInternal()
private async Task<string?> ReadLineAsyncInternal(CancellationToken cancellationToken)
{
if (_charPos == _charLen && (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) == 0)
if (_charPos == _charLen && (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) == 0)
{
return null;
}
Expand Down Expand Up @@ -903,7 +906,7 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)

_charPos = tmpCharPos = i + 1;

if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false)) > 0))
if (ch == '\r' && (tmpCharPos < tmpCharLen || (await ReadBufferAsync(cancellationToken).ConfigureAwait(false)) > 0))
{
tmpCharPos = _charPos;
if (_charBuffer[tmpCharPos] == '\n')
Expand All @@ -921,32 +924,35 @@ private int ReadBuffer(Span<char> userBuffer, out bool readToUserBuffer)
i = tmpCharLen - tmpCharPos;
sb ??= new StringBuilder(i + 80);
sb.Append(tmpCharBuffer, tmpCharPos, i);
} while (await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false) > 0);
} while (await ReadBufferAsync(cancellationToken).ConfigureAwait(false) > 0);

return sb.ToString();
}

public override Task<string> ReadToEndAsync()
public override Task<string> ReadToEndAsync() =>
ReadToEndAsync(default);

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
// If we have been inherited into a subclass, the following implementation could be incorrect
// since it does not call through to Read() which a subclass might have overridden.
// To be safe we will only use this implementation in cases where we know it is safe to do so,
// and delegate to our base class (which will call into Read) when we are not sure.
if (GetType() != typeof(StreamReader))
{
return base.ReadToEndAsync();
return base.ReadToEndAsync(cancellationToken);
}

ThrowIfDisposed();
CheckAsyncTaskInProgress();

Task<string> task = ReadToEndAsyncInternal();
Task<string> task = ReadToEndAsyncInternal(cancellationToken);
_asyncReadTask = task;

return task;
}

private async Task<string> ReadToEndAsyncInternal()
private async Task<string> ReadToEndAsyncInternal(CancellationToken cancellationToken)
{
// Call ReadBuffer, then pull data out of charBuffer.
StringBuilder sb = new StringBuilder(_charLen - _charPos);
Expand All @@ -955,7 +961,7 @@ private async Task<string> ReadToEndAsyncInternal()
int tmpCharPos = _charPos;
sb.Append(_charBuffer, tmpCharPos, _charLen - tmpCharPos);
_charPos = _charLen; // We consumed these characters
await ReadBufferAsync(CancellationToken.None).ConfigureAwait(false);
await ReadBufferAsync(cancellationToken).ConfigureAwait(false);
} while (_charLen > 0);

return sb.ToString();
Expand Down
10 changes: 10 additions & 0 deletions src/libraries/System.Private.CoreLib/src/System/IO/StringReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,21 @@ public override string ReadToEnd()
return Task.FromResult(ReadLine());
}

public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested
? ValueTask.FromCanceled<string?>(cancellationToken)
: new ValueTask<string?>(ReadLine());

public override Task<string> ReadToEndAsync()
{
return Task.FromResult(ReadToEnd());
}

public override Task<string> ReadToEndAsync(CancellationToken cancellationToken) =>
cancellationToken.IsCancellationRequested
? Task.FromCanceled<string>(cancellationToken)
: Task.FromResult(ReadToEnd());

public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
if (buffer == null)
Expand Down
22 changes: 18 additions & 4 deletions src/libraries/System.Private.CoreLib/src/System/IO/TextReader.cs
Original file line number Diff line number Diff line change
Expand Up @@ -203,18 +203,26 @@ public virtual int ReadBlock(Span<char> buffer)
}

#region Task based Async APIs
public virtual Task<string?> ReadLineAsync() =>
public virtual Task<string?> ReadLineAsync() => ReadLineCoreAsync(default);

public virtual ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) =>
new ValueTask<string?>(ReadLineCoreAsync(cancellationToken));

private Task<string?> ReadLineCoreAsync(CancellationToken cancellationToken) =>
Task<string?>.Factory.StartNew(static state => ((TextReader)state!).ReadLine(), this,
CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);
cancellationToken, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default);

public virtual async Task<string> ReadToEndAsync()
public virtual Task<string> ReadToEndAsync() =>
ReadToEndAsync(default);

public virtual async Task<string> ReadToEndAsync(CancellationToken cancellationToken)
{
var sb = new StringBuilder(4096);
char[] chars = ArrayPool<char>.Shared.Rent(4096);
try
{
int len;
while ((len = await ReadAsyncInternal(chars, default).ConfigureAwait(false)) != 0)
while ((len = await ReadAsyncInternal(chars, cancellationToken).ConfigureAwait(false)) != 0)
{
sb.Append(chars, 0, len);
}
Expand Down Expand Up @@ -368,9 +376,15 @@ protected override void Dispose(bool disposing)
[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string?> ReadLineAsync() => Task.FromResult(ReadLine());

[MethodImpl(MethodImplOptions.Synchronized)]
public override ValueTask<string?> ReadLineAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? ValueTask.FromCanceled<string?>(cancellationToken) : new ValueTask<string?>(ReadLine());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string> ReadToEndAsync() => Task.FromResult(ReadToEnd());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<string> ReadToEndAsync(CancellationToken cancellationToken) => cancellationToken.IsCancellationRequested ? Task.FromCanceled<string>(cancellationToken) : Task.FromResult(ReadToEnd());

[MethodImpl(MethodImplOptions.Synchronized)]
public override Task<int> ReadBlockAsync(char[] buffer, int index, int count)
{
Expand Down
Loading

0 comments on commit 4f72069

Please sign in to comment.