Added new MaskedMatrixProcessor to compute batched jobs concurrently.
m4rs-mt committed Apr 12, 2023
1 parent 072d5c0 commit f944ca3
Showing 1 changed file with 60 additions and 210 deletions.
Src/ILGPU.Algorithms/MatrixOperations/MaskedMatrixProcessor.cs (270 changes: 60 additions & 210 deletions)
@@ -12,251 +12,101 @@
using ILGPU.Runtime;
using ILGPU.Util;
using System;
using System.Collections.Generic;

namespace ILGPU.Algorithms.MatrixOperations
{
/// <summary>
/// A processor for masked matrices.
/// A processor for masked matrices to efficiently operate on multiple matrix
/// instances in parallel to maximize occupancy.
/// </summary>
public class MaskedMatrixProcessor : DisposeBase
public class MaskedMatrixProcessor<T, TPredicate, TStride, TProcessor>
: ConcurrentStreamProcessor
where T : unmanaged
where TStride : struct, IStride2D
where TPredicate : struct, InlineList.IPredicate<Index2D>
where TProcessor : struct, IMaskedSparseMatrixProcessor<T>
{
#region Kernels
#region Instance

/// <summary>
/// Computes the dot product between a masked matrix and a sparse matrix.
/// The internal masked matrix multiplier which contains pre-compiled kernels.
/// </summary>
protected internal static void DotProductKernel(
Index1D index,
ArrayView1D<int, Stride1D.Dense> columnIndices,
ArrayView2D<float, Stride2D.DenseY> rows,
ArrayView2D<float, Stride2D.DenseY> cols,
ArrayView1D<float, Stride1D.Dense> dotSums,
SpecializedValue<int> rowLength)
{
int row = index;
int col = columnIndices[row];

float sum = 0.0f;
int i = 0;

// Do fast vectorized IO operations for multiples of 4
if (rowLength / 4 > 0)
{
var v4Rows = rows.Cast<float, Float4>();
var v4Cols = cols.Cast<float, Float4>();
for (int e = rowLength / 4; i < e; ++i)
{
var intermediate = v4Rows[row, i] * v4Cols[col, i];
sum += intermediate.X + intermediate.Y + intermediate.Z +
intermediate.W;
}
i *= 4;
}

// Do fast vectorized IO operations for multiples of 2
if (rowLength / 2 > 0)
{
var v2Rows = rows.Cast<float, Float2>();
var v2Cols = cols.Cast<float, Float2>();
for (int e = rowLength / 2; i < e; ++i)
{
var intermediate = v2Rows[row, i] * v2Cols[col, i];
sum += intermediate.X + intermediate.Y;
}
i *= 2;
}

// Do simple accesses for the rest
for (; i < rowLength; ++i)
{
var intermediate = rows[row, i] * cols[col, i];
sum += intermediate;
}

// Write the final dot product sum
dotSums[index] = sum;
}

#endregion

#region Instance

private readonly MemoryBuffer1D<int, Stride1D.Dense> columnIndices;

private readonly MemoryBuffer2D<float, Stride2D.DenseY> rows;
private readonly MemoryBuffer2D<float, Stride2D.DenseY> columns;
private readonly MemoryBuffer1D<float, Stride1D.Dense> dotSums;

private PageLockedArray1D<float> dotSumsCPU;

private readonly Action<
AcceleratorStream,
Index1D,
ArrayView1D<int, Stride1D.Dense>,
ArrayView2D<float, Stride2D.DenseY>,
ArrayView2D<float, Stride2D.DenseY>,
ArrayView1D<float, Stride1D.Dense>,
SpecializedValue<int>> dotProductKernel;
private readonly MaskedSparseMatrixMultiplier<T, TPredicate, TStride>
matrixMultiplier;

/// <summary>
/// Constructs a new masked processor.
/// </summary>
/// <param name="accelerator">The parent accelerator.</param>
/// <param name="stream">The current stream.</param>
/// <param name="pa">The condensed product row instance.</param>
/// <param name="bShape">The shape of sparse B vector.</param>
/// <param name="maxNumConcurrentStreams">
/// The maximum number of concurrent streams to use (if any).
/// </param>
/// <param name="streamProvider">
/// A custom stream provider function to construct specialized streams.
/// </param>
public MaskedMatrixProcessor(
Accelerator accelerator,
AcceleratorStream stream,
CondensedProductRows pa,
SparseMatrixShape bShape)
int maxNumConcurrentStreams = 0,
Func<Accelerator, AcceleratorStream> streamProvider = null)
: base(accelerator, maxNumConcurrentStreams, streamProvider)
{
int numRows = pa.NumRows;
int maxNumColumns = pa.NumColumnIndices;
int numBRows = bShape.NumRows;

RowLength = maxNumColumns;

columnIndices = accelerator.Allocate1D<int>(pa.NumColumnIndices);
rows = accelerator.Allocate2DDenseY<float>(
new LongIndex2D(numRows, maxNumColumns));
columns = accelerator.Allocate2DDenseY<float>(
new LongIndex2D(numBRows, maxNumColumns));
dotSums = accelerator.Allocate1D<float>(numRows);

dotProductKernel = accelerator.LoadAutoGroupedKernel<
Index1D,
ArrayView1D<int, Stride1D.Dense>,
ArrayView2D<float, Stride2D.DenseY>,
ArrayView2D<float, Stride2D.DenseY>,
ArrayView1D<float, Stride1D.Dense>,
SpecializedValue<int>>(DotProductKernel);

TransferPA(stream, pa);
matrixMultiplier = accelerator.CreateSparseMatrixMultiplierMasked<
T,
TPredicate,
TStride,
TProcessor>();
}

#endregion

/// <summary>
/// Returns the current accelerator.
/// Returns the current predicate to use (if any).
/// </summary>
public Accelerator Accelerator => columnIndices.Accelerator;

/// <summary>
/// Returns the maximum row length.
/// </summary>
public int RowLength { get; }
public TPredicate? Predicate { get; set; }

#region Methods

/// <summary>
/// Transfers a new masked product row instance.
/// </summary>
/// <param name="stream">The current stream.</param>
/// <param name="pa">
/// The condensed product row instance to be transferred to the GPU.
/// </param>
public void TransferPA(AcceleratorStream stream, CondensedProductRows pa)
{
columnIndices.View.CopyFromCPU(stream, pa.GetColumnIndices());
rows.View.CopyFromCPU(pa.GetData());
}

/// <summary>
/// Multiplies a transferred masked matrix pA with the given sparse matrix b.
/// </summary>
/// <param name="stream">The current stream.</param>
/// <param name="b">The sparse matrix b to multiply with.</param>
/// <returns>A GPU view to the resulting dot products.</returns>
public ArrayView<float> Multiply(AcceleratorStream stream, SparseMatrix b)
{
columns.View.CopyFromCPU(stream, b.GetEdgeWeights());
dotSums.View.MemSetToZero(stream);

dotProductKernel(
stream,
dotSums.Extent.ToIntIndex(),
columnIndices.View,
rows.View,
columns.View,
dotSums.View,
SpecializedValue.New(RowLength));

return dotSums.View;
}

/// <summary>
/// Multiplies a transferred masked matrix pA with the given sparse matrix b
/// while transferring the results back into CPU space.
/// </summary>
/// <param name="stream">The current stream.</param>
/// <param name="b">The sparse matrix b to multiply with.</param>
/// <returns>A CPU view to the resulting dot products.</returns>
public ArrayView<float> MultiplyToCPU(AcceleratorStream stream, SparseMatrix b)
{
if (dotSumsCPU == null)
dotSumsCPU = Accelerator.AllocatePageLocked1D<float>(this.dotSums.Extent);
var dotSums = Multiply(stream, b);

dotSums.CopyToPageLockedAsync(stream, dotSumsCPU);
stream.Synchronize();

return dotSumsCPU.ArrayView;
}


/// <summary>
/// Multiplies a transferred masked matrix pA with the given sparse matrix b
/// while transferring the results back into CPU space and expanding them to a
/// dense matrix.
/// Multiplies the given matrices using the currently assigned predicate.
/// </summary>
/// <param name="stream">The current stream.</param>
/// <param name="b">The sparse matrix b to multiply with.</param>
/// <param name="pa">
/// A condensed product row instance to be used for index reconstruction.
/// </param>
/// <param name="denseOutput">
/// The dense output array to store the dense matrix values. Note that this
/// array is assumed to be already cleared, as only non-zero elements will be
/// written to the output array.
/// </param>
public void MultiplyToCPU(
/// <param name="stream">The current accelerator stream to use.</param>
/// <param name="aView">The dense input matrix a of shape MxK.</param>
/// <param name="bView">The sparse matrix b of shape NxK.</param>
/// <param name="outView">A dense output matrix of shape of aView.</param>
public void Multiply(
AcceleratorStream stream,
SparseMatrix b,
CondensedProductRows pa,
float[,] denseOutput)
ArrayView2D<T, TStride> aView,
SparseMatrixView<T, TStride> bView,
ArrayView2D<T, TStride> outView)
{
var cpuData = MultiplyToCPU(stream, b);

// Copy result to empty dense output matrix
for (int i = 0, e = pa.NumRows; i < e; ++i)
{
int r = pa.GetRowIndex(i);
int c = pa.GetColumnIndex(i);
denseOutput[r, c] = cpuData[i];
}
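// A mask predicate must be assigned to the Predicate property before multiplying.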
if (!Predicate.HasValue)
throw new InvalidOperationException();
matrixMultiplier(stream, Predicate.Value, aView, bView, outView);
}

#endregion

#region IDisposable

/// <summary>
/// Frees all internal buffers.
/// Multiplies the given matrices using the currently assigned predicate.
/// </summary>
protected override void Dispose(bool disposing)
/// <param name="stream">The current accelerator stream to use.</param>
/// <param name="aViews">The dense input matrices a of shape MxK.</param>
/// <param name="bViews">The sparse matrices b of shape NxK.</param>
/// <param name="outViews">Dense output matrices of shape of aViews.</param>
public void MultiplyBatched(
AcceleratorStream stream,
IReadOnlyList<ArrayView2D<T, TStride>> aViews,
IReadOnlyList<SparseMatrixView<T, TStride>> bViews,
IReadOnlyList<ArrayView2D<T, TStride>> outViews)
{
if (disposing)
{
dotSumsCPU?.Dispose();

columnIndices.Dispose();
rows.Dispose();
columns.Dispose();
dotSums.Dispose();
}
base.Dispose(disposing);
if (aViews.Count != bViews.Count)
throw new ArgumentOutOfRangeException(nameof(bViews));
if (aViews.Count != outViews.Count)
throw new ArgumentOutOfRangeException(nameof(outViews));

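// Each (a, b, out) triple becomes one job; ProcessConcurrently distributes the
// jobs across the streams managed by the ConcurrentStreamProcessor base,
// bounded by the configured maximum number of concurrent streams (if any).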
ProcessConcurrently(stream, aViews.Count, (acceleratorStream, i) =>
Multiply(acceleratorStream, aViews[i], bViews[i], outViews[i]));
}

#endregion
}
}
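For context, the following is a minimal usage sketch of the new processor; it is not part of this commit. The LowerTriangularPredicate struct, the FloatMaskedSparseMatrixProcessor type argument, and the Apply method name are illustrative assumptions: substitute the InlineList.IPredicate<Index2D> and IMaskedSparseMatrixProcessor<float> implementations you actually use, and assume aViews, bViews, and outViews have been allocated and populated elsewhere. The using statement assumes the ConcurrentStreamProcessor base is disposable.

using System;
using System.Collections.Generic;
using ILGPU;
using ILGPU.Algorithms.MatrixOperations;
using ILGPU.Runtime;
using ILGPU.Util;

// Hypothetical mask predicate; the Apply signature is assumed from
// InlineList.IPredicate<Index2D> and may differ in your ILGPU version.
struct LowerTriangularPredicate : InlineList.IPredicate<Index2D>
{
    public bool Apply(Index2D index) => index.Y <= index.X;
}

static class MaskedMatrixProcessorExample
{
    // aViews, bViews and outViews are assumed to be prepared by the caller and
    // to have matching lengths; each entry describes one matrix multiplication.
    public static void MultiplyBatch(
        Accelerator accelerator,
        IReadOnlyList<ArrayView2D<float, Stride2D.DenseY>> aViews,
        IReadOnlyList<SparseMatrixView<float, Stride2D.DenseY>> bViews,
        IReadOnlyList<ArrayView2D<float, Stride2D.DenseY>> outViews)
    {
        // FloatMaskedSparseMatrixProcessor is a placeholder for a concrete
        // IMaskedSparseMatrixProcessor<float> implementation.
        using var processor = new MaskedMatrixProcessor<
            float,
            LowerTriangularPredicate,
            Stride2D.DenseY,
            FloatMaskedSparseMatrixProcessor>(
            accelerator,
            maxNumConcurrentStreams: 4,                      // cap the number of streams
            streamProvider: accel => accel.CreateStream());  // optional custom streams

        // A predicate must be assigned before calling Multiply/MultiplyBatched.
        processor.Predicate = new LowerTriangularPredicate();

        // Each (a, b, out) triple is processed concurrently on its own stream.
        processor.MultiplyBatched(accelerator.DefaultStream, aViews, bViews, outViews);
    }
}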
