Skip to content

Commit

Permalink
More WriteGather fixes (dotnet#109826)
Browse files Browse the repository at this point in the history
* don't run these tests in parallel, as each test cases uses more than 4 GB ram and disk!

* fix the test: handle incomplete reads that should happen when we hit the max buffer limit

* incomplete write fix:

- pin the buffers only once
- when re-trying, do that only for the actual reminder

* Use native memory to get OOM a soon as we run out of memory (hoping to avoid the process getting killed on Linux when OOM happens)

* For macOS preadv and pwritev can fail with EINVAL when the total length of all vectors overflows a 32-bit integer.

* add an assert that is going to warn us if vector.Count is ever more than Int32.MaxValue

---------

Co-authored-by: Michał Petryka <35800402+MichalPetryka@users.noreply.github.com>
  • Loading branch information
adamsitnik and MichalPetryka authored Dec 12, 2024
1 parent 7f2f2b9 commit 4951e38
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 68 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,38 +168,30 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly

var handles = new MemoryHandle[buffersCount];
Span<Interop.Sys.IOVector> vectors = buffersCount <= IovStackThreshold ?
stackalloc Interop.Sys.IOVector[IovStackThreshold] :
stackalloc Interop.Sys.IOVector[IovStackThreshold].Slice(0, buffersCount) :
new Interop.Sys.IOVector[buffersCount];

try
{
int buffersOffset = 0, firstBufferOffset = 0;
while (true)
long totalBytesToWrite = 0;
for (int i = 0; i < buffersCount; i++)
{
long totalBytesToWrite = 0;

for (int i = buffersOffset; i < buffersCount; i++)
{
ReadOnlyMemory<byte> buffer = buffers[i];
totalBytesToWrite += buffer.Length;

MemoryHandle memoryHandle = buffer.Pin();
vectors[i] = new Interop.Sys.IOVector { Base = firstBufferOffset + (byte*)memoryHandle.Pointer, Count = (UIntPtr)(buffer.Length - firstBufferOffset) };
handles[i] = memoryHandle;

firstBufferOffset = 0;
}
ReadOnlyMemory<byte> buffer = buffers[i];
totalBytesToWrite += buffer.Length;

if (totalBytesToWrite == 0)
{
break;
}
MemoryHandle memoryHandle = buffer.Pin();
vectors[i] = new Interop.Sys.IOVector { Base = (byte*)memoryHandle.Pointer, Count = (UIntPtr)buffer.Length };
handles[i] = memoryHandle;
}

int buffersOffset = 0;
while (totalBytesToWrite > 0)
{
long bytesWritten;
Span<Interop.Sys.IOVector> left = vectors.Slice(buffersOffset);
fixed (Interop.Sys.IOVector* pinnedVectors = &MemoryMarshal.GetReference(left))
{
bytesWritten = Interop.Sys.PWriteV(handle, pinnedVectors, buffersCount - buffersOffset, fileOffset);
bytesWritten = Interop.Sys.PWriteV(handle, pinnedVectors, left.Length, fileOffset);
}

FileStreamHelpers.CheckFileCall(bytesWritten, handle.Path);
Expand All @@ -211,22 +203,29 @@ internal static unsafe void WriteGatherAtOffset(SafeFileHandle handle, IReadOnly
// The write completed successfully but for fewer bytes than requested.
// We need to perform next write where the previous one has finished.
fileOffset += bytesWritten;
totalBytesToWrite -= bytesWritten;
// We need to try again for the remainder.
for (int i = 0; i < buffersCount; i++)
while (buffersOffset < buffersCount && bytesWritten > 0)
{
int n = buffers[i].Length;
int n = (int)vectors[buffersOffset].Count;
if (n <= bytesWritten)
{
buffersOffset++;
bytesWritten -= n;
if (bytesWritten == 0)
{
break;
}
buffersOffset++;
}
else
{
firstBufferOffset = (int)(bytesWritten - n);
// A partial read: the vector needs to point to the new offset.
// But that offset needs to be relative to the previous attempt.
// Example: we have a single buffer with 30 bytes and the first read returned 10.
// The next read should try to read the remaining 20 bytes, but in case it also reads just 10,
// the third attempt should read last 10 bytes (not 20 again).
Interop.Sys.IOVector current = vectors[buffersOffset];
vectors[buffersOffset] = new Interop.Sys.IOVector
{
Base = current.Base + (int)(bytesWritten),
Count = current.Count - (UIntPtr)(bytesWritten)
};
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

using System.Buffers;
using System.Collections.Generic;
using System.Linq;
using System.Security.Cryptography;
Expand All @@ -13,6 +14,7 @@
namespace System.IO.Tests
{
[SkipOnPlatform(TestPlatforms.Browser, "async file IO is not supported on browser")]
[Collection(nameof(DisableParallelization))] // don't run in parallel, as some of these tests use a LOT of resources
public class RandomAccess_WriteGatherAsync : RandomAccess_Base<ValueTask>
{
protected override ValueTask MethodUnderTest(SafeFileHandle handle, byte[] bytes, long fileOffset)
Expand Down Expand Up @@ -151,21 +153,6 @@ public async Task NoInt32OverflowForLargeInputs(bool asyncFile, bool asyncMethod
const int BufferSize = int.MaxValue / 1000;
const long FileSize = (long)BufferCount * BufferSize;
string filePath = GetTestFilePath();
ReadOnlyMemory<byte> writeBuffer = RandomNumberGenerator.GetBytes(BufferSize);
List<ReadOnlyMemory<byte>> writeBuffers = Enumerable.Repeat(writeBuffer, BufferCount).ToList();
List<Memory<byte>> readBuffers = new List<Memory<byte>>(BufferCount);

try
{
for (int i = 0; i < BufferCount; i++)
{
readBuffers.Add(new byte[BufferSize]);
}
}
catch (OutOfMemoryException)
{
throw new SkipTestException("Not enough memory.");
}

FileOptions options = asyncFile ? FileOptions.Asynchronous : FileOptions.None; // we need to test both code paths
options |= FileOptions.DeleteOnClose;
Expand All @@ -180,29 +167,86 @@ public async Task NoInt32OverflowForLargeInputs(bool asyncFile, bool asyncMethod
throw new SkipTestException("Not enough disk space.");
}

long fileOffset = 0, bytesRead = 0;
try
using (sfh)
{
if (asyncMethod)
ReadOnlyMemory<byte> writeBuffer = RandomNumberGenerator.GetBytes(BufferSize);
List<ReadOnlyMemory<byte>> writeBuffers = Enumerable.Repeat(writeBuffer, BufferCount).ToList();

List<NativeMemoryManager> memoryManagers = new List<NativeMemoryManager>(BufferCount);
List<Memory<byte>> readBuffers = new List<Memory<byte>>(BufferCount);

try
{
await RandomAccess.WriteAsync(sfh, writeBuffers, fileOffset);
bytesRead = await RandomAccess.ReadAsync(sfh, readBuffers, fileOffset);
try
{
for (int i = 0; i < BufferCount; i++)
{
// We are using native memory here to get OOM as soon as possible.
NativeMemoryManager nativeMemoryManager = new(BufferSize);
memoryManagers.Add(nativeMemoryManager);
readBuffers.Add(nativeMemoryManager.Memory);
}
}
catch (OutOfMemoryException)
{
throw new SkipTestException("Not enough memory.");
}

await Verify(asyncMethod, FileSize, sfh, writeBuffer, writeBuffers, readBuffers);
}
else
finally
{
RandomAccess.Write(sfh, writeBuffers, fileOffset);
bytesRead = RandomAccess.Read(sfh, readBuffers, fileOffset);
foreach (IDisposable memoryManager in memoryManagers)
{
memoryManager.Dispose();
}
}
}
finally
{
sfh.Dispose(); // delete the file ASAP to avoid running out of resources in CI
}

Assert.Equal(FileSize, bytesRead);
for (int i = 0; i < BufferCount; i++)
static async Task Verify(bool asyncMethod, long FileSize, SafeFileHandle sfh, ReadOnlyMemory<byte> writeBuffer, List<ReadOnlyMemory<byte>> writeBuffers, List<Memory<byte>> readBuffers)
{
Assert.Equal(writeBuffer, readBuffers[i]);
if (asyncMethod)
{
await RandomAccess.WriteAsync(sfh, writeBuffers, 0);
}
else
{
RandomAccess.Write(sfh, writeBuffers, 0);
}

Assert.Equal(FileSize, RandomAccess.GetLength(sfh));

long fileOffset = 0;
while (fileOffset < FileSize)
{
long bytesRead = asyncMethod
? await RandomAccess.ReadAsync(sfh, readBuffers, fileOffset)
: RandomAccess.Read(sfh, readBuffers, fileOffset);

Assert.InRange(bytesRead, 0, FileSize);

while (bytesRead > 0)
{
Memory<byte> readBuffer = readBuffers[0];
if (bytesRead >= readBuffer.Length)
{
AssertExtensions.SequenceEqual(writeBuffer.Span, readBuffer.Span);

bytesRead -= readBuffer.Length;
fileOffset += readBuffer.Length;

readBuffers.RemoveAt(0);
}
else
{
// A read has finished somewhere in the middle of one of the read buffers.
// Example: buffer had 30 bytes and only 10 were read.
// We don't read the missing part, but try to read the whole buffer again.
// It's not optimal from performance perspective, but it keeps the test logic simple.
break;
}
}
}
}
}

Expand Down
47 changes: 38 additions & 9 deletions src/native/libs/System.Native/pal_io.c
Original file line number Diff line number Diff line change
Expand Up @@ -1950,19 +1950,48 @@ int32_t SystemNative_PWrite(intptr_t fd, void* buffer, int32_t bufferSize, int64
}

#if (HAVE_PREADV || HAVE_PWRITEV) && !defined(TARGET_WASM)
static int GetAllowedVectorCount(int32_t vectorCount)
static int GetAllowedVectorCount(IOVector* vectors, int32_t vectorCount)
{
#if defined(IOV_MAX)
const int IovMax = IOV_MAX;
#else
// In theory all the platforms that we support define IOV_MAX,
// but we want to be extra safe and provde a fallback
// in case it turns out to not be true.
// 16 is low, but supported on every platform.
const int IovMax = 16;
#endif

int allowedCount = (int)vectorCount;

#if defined(IOV_MAX)
if (IOV_MAX < allowedCount)
// We need to respect the limit of items that can be passed in iov.
// In case of writes, the managed code is responsible for handling incomplete writes.
// In case of reads, we simply returns the number of bytes read and it's up to the users.
if (IovMax < allowedCount)
{
// We need to respect the limit of items that can be passed in iov.
// In case of writes, the managed code is reponsible for handling incomplete writes.
// In case of reads, we simply returns the number of bytes read and it's up to the users.
allowedCount = IOV_MAX;
allowedCount = IovMax;
}

#if defined(TARGET_APPLE)
// For macOS preadv and pwritev can fail with EINVAL when the total length
// of all vectors overflows a 32-bit integer.
size_t totalLength = 0;
for (int i = 0; i < allowedCount; i++)
{
assert(INT_MAX >= vectors[i].Count);

totalLength += vectors[i].Count;

if (totalLength > INT_MAX)
{
allowedCount = i;
break;
}
}
#else
(void)vectors;
#endif

return allowedCount;
}
#endif // (HAVE_PREADV || HAVE_PWRITEV) && !defined(TARGET_WASM)
Expand All @@ -1975,7 +2004,7 @@ int64_t SystemNative_PReadV(intptr_t fd, IOVector* vectors, int32_t vectorCount,
int64_t count = 0;
int fileDescriptor = ToFileDescriptor(fd);
#if HAVE_PREADV && !defined(TARGET_WASM) // preadv is buggy on WASM
int allowedVectorCount = GetAllowedVectorCount(vectorCount);
int allowedVectorCount = GetAllowedVectorCount(vectors, vectorCount);
while ((count = preadv(fileDescriptor, (struct iovec*)vectors, allowedVectorCount, (off_t)fileOffset)) < 0 && errno == EINTR);
#else
int64_t current;
Expand Down Expand Up @@ -2016,7 +2045,7 @@ int64_t SystemNative_PWriteV(intptr_t fd, IOVector* vectors, int32_t vectorCount
int64_t count = 0;
int fileDescriptor = ToFileDescriptor(fd);
#if HAVE_PWRITEV && !defined(TARGET_WASM) // pwritev is buggy on WASM
int allowedVectorCount = GetAllowedVectorCount(vectorCount);
int allowedVectorCount = GetAllowedVectorCount(vectors, vectorCount);
while ((count = pwritev(fileDescriptor, (struct iovec*)vectors, allowedVectorCount, (off_t)fileOffset)) < 0 && errno == EINTR);
#else
int64_t current;
Expand Down

0 comments on commit 4951e38

Please sign in to comment.