This module is designed to simplify multi-threading in PowerShell. PowerShell jobs are a great way to work with background tasks, but lack the necessary throttling and resource-sharing mechanisms. There is also a performance advantage to using a RunspacePool over jobs, as detailed in this blog post.
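For context, the throttling a runspace pool provides can be seen by using the underlying .NET API directly. The following is a minimal sketch, not PsThreading's implementation: the squaring script block and the variable names are illustrative assumptions, and the pool simply caps concurrency at the logical CPU count.

```powershell
# Sketch only: raw runspace pool usage, not PsThreading's internals.
$maxThreads = [Environment]::ProcessorCount

# The pool never runs more than $maxThreads script blocks at once.
$pool = [runspacefactory]::CreateRunspacePool(1, $maxThreads)
$pool.Open()

# Queue eight small tasks; each gets its own PowerShell instance bound to the pool.
$jobs = foreach ($n in 1..8) {
    $ps = [powershell]::Create()
    $ps.RunspacePool = $pool
    [void]$ps.AddScript({ param($n) $n * $n }).AddArgument($n)
    [pscustomobject]@{ Shell = $ps; Handle = $ps.BeginInvoke() }
}

# Collect results in submission order, then release resources.
$results = foreach ($job in $jobs) {
    $job.Shell.EndInvoke($job.Handle)
    $job.Shell.Dispose()
}
$pool.Close()
$pool.Dispose()

$results
```

Unlike jobs, every task here runs inside the current process, so no child process is started per task.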
Use the New-Thread function to create a thread template:
$ScriptBlock = {
    Param($ThreadId, $WorkQueue)
    $item = ""
    while ($WorkQueue.TryDequeue([ref]$item)) {
        # do work here
        Write-Output "$ThreadId -> $item"
    }
}
$worker = New-Thread -ScriptBlock $ScriptBlock -Number $PsThreading.Utility.LogicalCpus
Then use the Invoke-ThreadPool function to create and execute the threads and wait (poll) for them to complete:
$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
1..10000 | ForEach-Object { $workQueue.Enqueue("item number $_") }
$params = @{
    WorkQueue = $workQueue
}
Invoke-ThreadPool -Thread $worker -Parameters $params
The helper function Split-FileToStream creates n MemoryStreams from a file, ending each chunk at the nearest delimiter. By default, it splits the file into as many chunks as there are logical CPUs, breaking at the nearest newline.
$Path = '\path\to\large-file.csv'
$workQueue = New-Object System.Collections.Concurrent.ConcurrentQueue[object]
Split-FileToStream -Path $Path | ForEach-Object { $workQueue.Enqueue($_) }
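The nearest-newline splitting described above could be sketched roughly as follows. Split-AtNewline is a hypothetical stand-in written for illustration, not the module's Split-FileToStream. It assumes line-feed-terminated lines and emits bare MemoryStream objects, which may differ from what the real function returns.

```powershell
# Hypothetical helper for illustration; not the module's Split-FileToStream.
function Split-AtNewline {
    param(
        [Parameter(Mandatory)] [string]$Path,
        [int]$SplitNumber = [Environment]::ProcessorCount
    )

    $file = [System.IO.File]::OpenRead($Path)
    try {
        # Nominal chunk size before boundaries are moved to a newline.
        $chunkSize = [long][Math]::Ceiling($file.Length / $SplitNumber)
        $start = [long]0

        while ($start -lt $file.Length) {
            # Begin at the nominal end, then extend until the chunk's last
            # byte is a line feed (10) or the end of the file is reached.
            $end = [Math]::Min($start + $chunkSize, $file.Length)
            $file.Position = $end - 1
            while ($file.ReadByte() -ne 10 -and $end -lt $file.Length) { $end++ }

            # Copy bytes [start, end) into an in-memory stream.
            $buffer = [byte[]]::new($end - $start)
            $file.Position = $start
            $offset = 0
            while ($offset -lt $buffer.Length) {
                $read = $file.Read($buffer, $offset, $buffer.Length - $offset)
                if ($read -le 0) { break }
                $offset += $read
            }
            [System.IO.MemoryStream]::new($buffer)
            $start = $end
        }
    }
    finally {
        $file.Dispose()
    }
}

# Usage: split a small temporary file into at most 4 newline-aligned chunks.
$tmp = [System.IO.Path]::GetTempFileName()
$text = ((1..50 | ForEach-Object { "line $_" }) -join "`n") + "`n"
[System.IO.File]::WriteAllText($tmp, $text)
$chunks = @(Split-AtNewline -Path $tmp -SplitNumber 4)
Remove-Item $tmp
```

Because every boundary falls just after a line feed, no line is split across two chunks, so each worker can parse its chunk independently.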
The $PsThreading variable contains two helper properties:
$PsThreading.Utility.CpuCores # Number of physical cores
$PsThreading.Utility.LogicalCpus # Number of logical CPUs
and some sample parameter sets and thread script blocks for various threading patterns:
$PsThreading.Parameter.WorkerOnly
$PsThreading.Parameter.ProducerConsumer
$PsThreading.Parameter.ProducerWorkerWriter
$PsThreading.Thread.Consumer
$PsThreading.Thread.Producer
$PsThreading.Thread.Worker
$PsThreading.Thread.Writer
Have a look at the PsThreading.Patterns.ps1 and PsThreading.Patterns.Tests.ps1 files for some example implementations.
The following producer-consumer example has the producer call Split-FileToStream to create file chunks, which the consumer threads then process:
$FileToProcess = '\path\to\large-file.csv'
$PsThreadingPath = '\path\to\PsThreading'
$threads = $PsThreading.Utility.LogicalCpus
$params = $PsThreading.Parameter.ProducerConsumer
$params.Settings['FileToProcess'] = $FileToProcess
$params.Settings['NumberToProduce'] = $threads
$params.ResultSet = New-Object 'System.Collections.Concurrent.ConcurrentDictionary`2[string,int]'
$producer = New-Thread -Type "Producer" -Weight 100 -ScriptBlock {
    Param($ThreadId, $Settings, $WorkQueue)
    $path = $Settings['FileToProcess']
    $split = $Settings['NumberToProduce']
    Split-FileToStream -Path $path -SplitNumber $split | ForEach-Object {
        $WorkQueue.Enqueue($_)
    }
    $Settings['ProducerIsDone'] = $true
}
$consumer = New-Thread -Type "Consumer" -Number $threads -ScriptBlock {
    Param($ThreadId, $WorkQueue, $Settings, $ResultSet)
    $item = ""
    # Keep polling until the producer has finished and the queue is drained
    while (!$Settings['ProducerIsDone'] -or $WorkQueue.Count -gt 0) {
        if ($WorkQueue.TryDequeue([ref]$item)) {
            # Count the lines in this chunk
            $reader = New-Object System.IO.StreamReader($item.Stream)
            $count = 0
            while ($null -ne ($line = $reader.ReadLine())) {
                $count++
            }
            # Add this chunk's count to the running total for this thread
            $ResultSet.AddOrUpdate($ThreadId, $count, { param($key, $val) $val + $count }) | Out-Null
        } else {
            # Queue is momentarily empty; back off briefly
            Start-Sleep -Milliseconds 10
        }
    }
}
Invoke-ThreadPool -Thread $producer, $consumer -Parameters $params -PathsToImport $PsThreadingPath
$params.ResultSet
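Once the pool completes, $params.ResultSet maps each consumer's thread ID to the number of lines that thread counted. As a small sketch of how AddOrUpdate accumulates per key and how a grand total might be taken, the following uses stand-in values; the thread IDs and counts are made up for illustration.

```powershell
# Stand-in for $params.ResultSet; keys and counts below are invented.
$resultSet = New-Object 'System.Collections.Concurrent.ConcurrentDictionary`2[string,int]'

# First call for a key stores the add value; later calls run the update block.
[void]$resultSet.AddOrUpdate('1', 120, { param($key, $val) $val + 120 })
[void]$resultSet.AddOrUpdate('2', 95,  { param($key, $val) $val + 95 })
[void]$resultSet.AddOrUpdate('1', 30,  { param($key, $val) $val + 30 })

# Sum the per-thread counts to get the total number of lines.
$total = ($resultSet.Values | Measure-Object -Sum).Sum
$total
```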
This work is licensed under a Creative Commons Attribution-ShareAlike 4.0 International License.