Invoke-Async – Asynchronous parallel processing for any workload in PowerShell

Why asynchronous processing?

If it takes one man 10 hours to do something, technically it would take just one hour if 10 men did it. Right? Not quite, but in the analogous computing world it is somewhat true, as long as 10 threads at work do not max out the available resources. It is even more true when the jobs spend most of their time waiting for another computer to respond. For example, if you wanted to find the free disk space on 100 hosts, with “asynchronous processing” you could query 10 or more hosts simultaneously.

Traditional asynchronous processing in PowerShell:

Traditionally, if you wanted to process things in parallel using PowerShell, you would use Start-Job, Stop-Job, Receive-Job, Remove-Job, Get-Job etc. (i.e., the *-Job set of commands):

Get-Command -Name '*-Job'

CommandType     Name
-----------     ----
Cmdlet          Get-Job
Cmdlet          Receive-Job
Cmdlet          Remove-Job
Cmdlet          Resume-Job
Cmdlet          Start-Job
Cmdlet          Stop-Job
Cmdlet          Suspend-Job
Cmdlet          Wait-Job
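The following is a minimal sketch of how these cmdlets fit together. It is not taken from any of the articles mentioned in this post. It starts one job per work item and waits for a free slot whenever three jobs are already in flight. At the end it aggregates the output and removes the jobs. The work items (1..6) and the squaring script block are placeholders for real work such as querying servers.

```powershell
# Minimal throttle built from the *-Job cmdlets (placeholders: $Items and the squaring script block)
$Items   = 1..6      # stand-in work items, e.g. server names
$MaxJobs = 3         # at most this many jobs in flight at once
$Jobs    = @()

foreach ($item in $Items) {
    # Wait for a free slot: count only this loop's jobs that have not finished yet
    while (@($Jobs | Where-Object { $_.State -eq 'NotStarted' -or $_.State -eq 'Running' }).Count -ge $MaxJobs) {
        Start-Sleep -Milliseconds 200
    }
    # Start-Job runs the script block in a separate background process
    $Jobs += Start-Job -ScriptBlock { param($n) $n * $n } -ArgumentList $item
}

# Wait for the remaining jobs, aggregate their output, then clean up the job objects
$Results = @($Jobs | Wait-Job | Receive-Job)
$Jobs | Remove-Job
$Results | Sort-Object
```

Each Start-Job launches a separate background process, and its results come back serialized. That is part of why the *-Job approach has noticeable per-job overhead.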

Additionally, you would have to come up with your own mechanism to manage starting, processing and stopping the jobs, and aggregating the results. There are numerous variations on the internet. I just did a quick search and came up with these:

The last link uses runspaces instead of the *-Job cmdlets. It was written by Jason Walker, a Microsoft employee whose presentation on asynchronous processing I had the pleasure of attending. Be sure to check out Part 1 of the article.
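For comparison, here is a minimal sketch of the runspace approach. One RunspacePool is shared by several [powershell] instances, which are started with BeginInvoke() and collected with EndInvoke(). This is a generic illustration, not the code from the linked article. The work items and the Start-Sleep call, which stands in for waiting on a remote host, are placeholders.

```powershell
# Generic runspace-pool sketch; the work items and the Start-Sleep "wait" are placeholders
$Pool = [runspacefactory]::CreateRunspacePool(1, 4)   # at least 1, at most 4 concurrent runspaces
$Pool.Open()

# Queue one [powershell] instance per work item on the shared pool
$Work = foreach ($item in 1..8) {
    $ps = [powershell]::Create()
    $ps.RunspacePool = $Pool
    # Simulate a task that mostly waits (e.g. on a remote host), then returns a value
    [void]$ps.AddScript({ param($n) Start-Sleep -Milliseconds 500; $n * 10 }).AddArgument($item)
    [pscustomobject]@{ PowerShell = $ps; Handle = $ps.BeginInvoke() }
}

# EndInvoke blocks until that task has finished and returns its output
$Results = @(foreach ($w in $Work) {
    $w.PowerShell.EndInvoke($w.Handle)
    $w.PowerShell.Dispose()
})

$Pool.Close()
$Pool.Dispose()
$Results
```

The workers are runspaces on threads inside the current process rather than separate processes, so start-up is cheaper than with Start-Job. The pool's maximum (4 here) plays roughly the same role as -ThreadCount in Invoke-Async.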

Why do I use Invoke-Async?

Although it is not the best way to parallelize (see the performance comparison between different methods), I started using the function below from the TechNet Gallery, and I love it.

The primary reasons I like it are:

  1. Can parallelize anything. It simply does not matter what operation it is.
  2. Allows one to specify the number of threads
  3. Aggregates all the results
  4. Allows specification of parameters other than the list one needs to iterate through
  5. It is ingeniously simple and elegant. I was able to easily make code changes
  6. The author (jrich) is very responsive to questions/comments

Once you download it, it is easy to follow the examples and get going. I will only point out the things that are not obvious.

Each launched job gets its own new session

Whatever you ask it to run asynchronously gets its own runspace, i.e., a fresh PowerShell session state. So, if you are running this from the ISE, anything you have dot-sourced or imported (modules) in the ISE will not be available to the asynchronous job unless you also dot-source or import it explicitly in the script block you provide. Sometimes the cost of re-loading everything makes it not worthwhile, so please evaluate before you start using it.
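To see this isolation for yourself, the sketch below writes a hypothetical helper, Get-Greeting, to a temporary file and dot-sources it into the current session. It then shows that a fresh [powershell] instance cannot see the helper; Invoke-Async creates one such instance per thread. Finally, it dot-sources the file inside the script block instead, as the Invoke-SQLCmd2 example in the script's help does. The helper name, file name and 'Server01' are illustrative only.

```powershell
# Hypothetical helper written to a temp file, standing in for your own dot-sourced library
$helperPath = Join-Path ([System.IO.Path]::GetTempPath()) 'Get-Greeting.ps1'
Set-Content -Path $helperPath -Value 'function Get-Greeting { param($Name) "Hello, $Name" }'

# Load it into the current session, as you might in the ISE
. $helperPath

# 1) A fresh [powershell] instance has its own runspace and cannot see Get-Greeting
$ps = [powershell]::Create()
[void]$ps.AddScript({ [bool](Get-Command -Name Get-Greeting -ErrorAction SilentlyContinue) })
$VisibleWithoutImport = $ps.Invoke()[0]
$ps.Dispose()

# 2) Dot-sourcing the file inside the script block makes the helper available there
$ps = [powershell]::Create()
[void]$ps.AddScript({
    param($HelperPath, $Name)
    . $HelperPath
    Get-Greeting -Name $Name
}).AddArgument($helperPath).AddArgument('Server01')
$Greeting = $ps.Invoke()[0]
$ps.Dispose()

Remove-Item -Path $helperPath

"Visible without dot-sourcing: $VisibleWithoutImport"
"With dot-sourcing: $Greeting"
```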

Changes required to the script

I made two changes to the script (the changes are marked with “Jana”):

  1. Bug fix to make it work when only 1 item is specified in the collection to process (the author might have already fixed this)
  2. Abort after [x] errors. I needed this additional parameter because it did not make sense to process the remaining 285 items if, for example, the first 15 of 300 failed; in that case it is highly likely that there is some common issue and all 300 will fail.
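Why the single-item case broke: the Begin block lowers $ThreadCount to the number of items (`if($Length -lt $ThreadCount){$ThreadCount=$Length}`). A single item therefore means $ThreadCount = 1, and the original condition `$idx -lt ($ThreadCount-1)` becomes `0 -lt 0`, which never runs. No job is ever started, so `while($JobsLeft)` loops forever. The hypothetical helper below, Get-VisitedSlots, simply counts loop iterations under each condition. It also shows that the original condition never used the last thread slot.

```powershell
# Hypothetical helper: count how many thread slots the Process-block loop visits per pass
function Get-VisitedSlots {
    param([int]$ThreadCount, [switch]$Original)
    $visited = 0
    if ($Original) {
        # Original condition: stops one slot early
        for ($idx = 0; $idx -lt ($ThreadCount - 1); $idx++) { $visited++ }
    }
    else {
        # Fixed condition: visits slots 0 .. ThreadCount-1
        for ($idx = 0; $idx -le ($ThreadCount - 1); $idx++) { $visited++ }
    }
    $visited
}

foreach ($n in 1, 10) {
    "ThreadCount=$n  original: $(Get-VisitedSlots -ThreadCount $n -Original)  fixed: $(Get-VisitedSlots -ThreadCount $n)"
}
# ThreadCount=1  original: 0  fixed: 1
# ThreadCount=10  original: 9  fixed: 10
```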
#Source: http://gallery.technet.microsoft.com/scriptcenter/Invoke-Async-Allows-you-to-83b0c9f0#content
<#
.Synopsis
   A means of running multiple instances of a cmdlet/function/scriptblock
.DESCRIPTION
   This function allows you to provide a cmdlet, function or script block with a set of data to allow multithreading.

.EXAMPLE
    $servers = Get-Content c:\_dblog\servers.txt
    $sb = [scriptblock] {param($HostName) Test-Connection -ComputerName $HostName} 
    $rtn = Invoke-Async -Set $servers -SetParam HostName  -ScriptBlock $sb -Verbose -Measure:$true
    $rtn

.EXAMPLE
   $sb = [scriptblock] {param($system) gwmi win32_operatingsystem -ComputerName $system | select csname,caption}
   $servers = Get-Content c:\_dblog\servers.txt
   $rtn = Invoke-Async -Set $servers -SetParam system  -ScriptBlock $sb

.EXAMPLE
   $servers = Get-Content servers.txt
   $rtn = Invoke-Async -Set $servers -SetParam computername -Params @{count=1} -Cmdlet Test-Connection -ThreadCount 50 

.EXAMPLE
    #This example shows how to pass extra parameters other than the collection to iterate through
    $VerbosePreference = 'Continue'
    $conns = @('DevDB','QADB','ProdDB')

    $sb = [scriptblock] `
            {
                param($Connection, $BasePath, $SQL, $QueryTimeout, $As)  
                
                #Import all required functions that are needed for this scriptblock's functionality!
                . "$BasePath\SQLLib\Invoke-SQLCmd2.ps1"
                
                Invoke-SQLCmd2 -Connection $connection -CloseConnection:$true -SQL $SQL -QueryTimeout $QueryTimeout -As $As
            }


    #Build the hashtable of the other parameters (forwarded to the script block via -Params)
    $params = @{
                    BasePath = Get-PoShBasePath
                    SQL = "SELECT * FROM sys.databases"
                    QueryTimeout = 10
                    As = "DataRow"
                }   
            
    $rslt = Invoke-Async -Set $conns -SetParam Connection  -ScriptBlock $sb -Verbose -Measure:$false -Params $params -ThreadCount 4
    $rslt | ft

.INPUTS

.OUTPUTS
   Determined by the provided cmdlet, function or scriptblock.
.NOTES
    This can often eat up a lot of memory, due in part to how some cmdlets work. Test-Connection is a good example of this.
    Although it is not a good idea to manually run the garbage collector it might be needed in some cases and can be run like so:
    [gc]::Collect()
#>

function Invoke-Async{
param(
#The data group to process, such as server names.
[parameter(Mandatory=$true,ValueFromPipeLine=$true)]
[object[]]$Set,
#The parameter name that the set belongs to, such as Computername.
[parameter(Mandatory=$true)]
[string] $SetParam,
#The Cmdlet or Function you'd like to process with.
[parameter(Mandatory=$true, ParameterSetName='cmdlet')]
[string]$Cmdlet,
#The ScriptBlock you'd like to process with
[parameter(Mandatory=$true, ParameterSetName='ScriptBlock')]
[scriptblock]$ScriptBlock,
#Any additional parameters to be forwarded to the cmdlet/function/scriptblock
[hashtable]$Params,
#number of jobs to spin up, default being 10.
[int]$ThreadCount=10,
#return performance data
[switch]$Measure,
#Abort threshold (if greater than zero, bails out after this many errors have occurred)
[int]$AbortAfterErrorCount=-1

)
Begin
{
    [int] $ErrorCounter = 0                                                                              #20141031 Jana - Added to track errors and bail
    [int] $AllowedErrorCount = if ($AbortAfterErrorCount -le 0) {9999999} else {$AbortAfterErrorCount}   #20141031 Jana - Added to track errors and bail
    $Threads = @()
    $Length = $JobsLeft = $Set.Length

    $Count = 0
    if($Length -lt $ThreadCount){$ThreadCount=$Length}
    $timer = @(1..$ThreadCount  | ForEach-Object{$null})
    $Jobs = @(1..$ThreadCount  | ForEach-Object{$null})
    
    If($PSCmdlet.ParameterSetName -eq 'cmdlet')
    {
        $CmdType = (Get-Command $Cmdlet).CommandType
        if($CmdType -eq 'Alias')
        {
            $CmdType = (Get-Command (Get-Command $Cmdlet).ResolvedCommandName).CommandType
        }
        
        If($CmdType -eq 'Function')
        {
            $ScriptBlock = (Get-Item Function:\$Cmdlet).ScriptBlock
            1..$ThreadCount | ForEach-Object{ $Threads += [powershell]::Create().AddScript($ScriptBlock)}
        }
        ElseIf($CmdType -eq "Cmdlet")
        {
            1..$ThreadCount  | ForEach-Object{ $Threads += [powershell]::Create().AddCommand($Cmdlet)}
        }
    }
    Else
    {
        1..$ThreadCount | ForEach-Object{ $Threads += [powershell]::Create().AddScript($ScriptBlock)}
    }

    If($Params){$Threads | ForEach-Object{$_.AddParameters($Params) | Out-Null}}

}
Process
{
    while($JobsLeft)
    {
        #20140929 Jana - Bug fix - Changed "-lt" to "-le" because the loop does not execute at all if there is only 1 item in the set to begin with!
        #for($idx = 0; $idx -lt ($ThreadCount-1) ; $idx++)
        for($idx = 0; $idx -le ($ThreadCount-1) ; $idx++)
        {

            $SetParamObj = $Threads[$idx].Commands.Commands[0].Parameters| Where-Object {$_.Name -eq $SetParam}

            #NOTE: Only hits this block after at least one item has been kicked off, so it is skipped during the very first pass.
            If ($Jobs[$idx] -ne $null)
            { 
                If($Jobs[$idx].IsCompleted) #job finished (succeeded or failed); collect it and clear it out
                {  
                    $result = $null
                    if($threads[$idx].InvocationStateInfo.State -eq "Failed")
                    {
                        $result  = $Threads[$idx].InvocationStateInfo.Reason

                        
                        #Will write out the hashtable values in the error instead of "Set Item: System.Collections.Hashtable Exception: ...."    
                        $OutError = "Set Item: $($($SetParamObj.Value)| Out-String )"                        
                        Write-Error "$OutError Exception: $result"

                        #Write-Error "Set Item: $($SetParamObj.Value) Exception: $result"
                        
                        #This was the original code by the original author (always leave this commented)
                        #Write-Error "Set Item: $($SetParamObj) Exception: $result"

                        #20141031 Jana - Added to track errors and bail
                        $ErrorCounter++
                        if ($ErrorCounter -ge $AllowedErrorCount)
                        {
                            break;
                        }
                    }
                    else
                    { 
                        $result = $Threads[$idx].EndInvoke($Jobs[$idx])
                    }
                    $ts = (New-TimeSpan -Start $timer[$idx] -End (Get-Date))
                    if($Measure)
                    {
                        new-object psobject -Property @{
                            TimeSpan = $ts
                            Output = $result
                            SetItem = $SetParamObj.Value
                            }
                    }
                    else
                    {
                        $result
                    }
                    $Jobs[$idx] = $null
                    $JobsLeft-- #one less left

                    write-verbose "Completed: $($SetParamObj.Value) in $ts"
                    #write-verbose "Completed: $SetParamObj in $ts"
                    write-progress -Activity "Processing Set" -Status "$JobsLeft jobs left" -PercentComplete (($length-$jobsleft)/$length*100)
                }
            }

            If(($Count -lt $Length) -and ($Jobs[$idx] -eq $null)) #add job if there is more to process
            {
                write-verbose "starting: $($Set[$Count])"
                $timer[$idx] = get-date
                $Threads[$idx].Commands.Commands[0].Parameters.Remove($SetParamObj) | Out-Null #drop the previous set item's parameter (returns $false on the first pass, when there is none)
                $Threads[$idx].AddParameter($SetParam,$Set[$Count]) | Out-Null
                $Jobs[$idx] = $Threads[$idx].BeginInvoke()
                $Count++
            }
            
        }

        #20141031 Jana - Added to track errors and bail        
        if ($ErrorCounter -ge $AllowedErrorCount)
        {
            break;
        }
    }
}
End
{
    $Threads | ForEach-Object{$_.runspace.close();$_.Dispose()}
}
}

Original source for the script above:

http://gallery.technet.microsoft.com/scriptcenter/Invoke-Async-Allows-you-to-83b0c9f0#content

Again, I want to give the author “jrich” his due respect and appreciation for the good work.


6 thoughts on “Invoke-Async – Asynchronous parallel processing for any workload in PowerShell”

  1. Great post, and it made me chuckle to see you made almost the exact same bugfix that I did locally to solve the single item problem!

  2. Thx for this amazing function …
    I tried using it, but there is a huge difference in execution time between this function and the loop I am using: 39 seconds against 13 seconds for mine to check 17 servers (it runs in 37 seconds on 115 production servers).
    Here is the main “runspace loop” of what I did…

    $dataRowMain is the result of a MySQL query returning $dataRowMain.system (server name) and $dataRowMain.type.
    The scriptblock uses CIM instances to interrogate each server. I used the same scriptblock with the function.

    I also tried the “original” function, with around the same result …

    I love the one-function approach, but as the performance difference is so big … I would like to understand.
    Any clues?

    #############################################################################################################
    # Load the main script block ($ScriptBlock) from a separate file
    . $scriptPath\ScriptBlock.ps1
    #############################################################################################################
    $Throttle = 30
    $RunspacePool = [RunspaceFactory]::CreateRunspacePool(1, $Throttle)
    $RunspacePool.Open()
    $Jobs = @()
    $JobCount = 1
    $ServerName = $dataRowMain.system
    $count = $ServerName.Count

    ################################################################################################
    foreach ($S in $ServerName) {
        # Look up the server type that belongs to this server name
        $index = [array]::IndexOf($dataRowMain.system, $S)
        if ($ServerName.Count -eq 1) { $T = $dataRowMain.type }
        else { $T = $dataRowMain.type[$index] }

        # Queue the script block on the shared runspace pool
        $Job = [powershell]::Create().AddScript($ScriptBlock).AddArgument($S).AddArgument($T)
        $Job.RunspacePool = $RunspacePool
        $Jobs += New-Object PSObject -Property @{
            Pipe   = $Job
            Result = $Job.BeginInvoke()
        }
        Write-Progress -Activity "Creating Server List" -Status "Starting Threads" `
            -CurrentOperation "$JobCount threads created" `
            -PercentComplete (($JobCount / $count) * 100)
        $JobCount++
    }
    Write-Output "Processing $count servers .."
    ###################################################################################################
    $Results = @()
    $sw = [System.Diagnostics.Stopwatch]::StartNew()
    Do {
        $JobsTotal = $Jobs.Count
        if ($JobsTotal -eq 0) { $JobsTotal++; Write-Warning "Jobs count = 0" } # avoid a divide-by-zero error

        # Number of jobs that have not completed yet
        $JobCount = @($Jobs | Where-Object { -not $_.Result.IsCompleted }).Count
        Write-Progress -Activity "Waiting for threads to complete" `
            -Status "$JobCount threads remaining" `
            -CurrentOperation "...." `
            -PercentComplete (100 - ($JobCount / $JobsTotal * 100))

        Start-Sleep -Seconds 1

        # $maxRunTime (in seconds) is assumed to be set elsewhere in the script
        If ($sw.Elapsed.TotalSeconds -gt $maxRunTime) { break }
    } While ($Jobs | Where-Object { -not $_.Result.IsCompleted })

    Write-Output "Collecting results .."
    ForEach ($Job in $Jobs) {
        $Results += $Job.Pipe.EndInvoke($Job.Result)
        $Job.Pipe.Dispose()
    }
    $RunspacePool.Close()

    1. Hi. If I understand you correctly, Invoke-Async took 39 seconds and your loop took 13 seconds.

      As I said, Invoke-Async is not the fastest of the available means to parallelize. However, before blaming it, I would record run times and analyze where it is spending most of its time. In some cases, running operations serially in the foreground can be faster than running them in parallel in the background. The gains from a parallel mechanism show up when you have hundreds of servers to process, not 10 or 20.

      The reason is that each parallel worker is its own PowerShell instance (for lack of a better term) running in its own runspace. Initializing those takes considerable resources and time, which may not be worth your while when dealing with a handful of servers. Doing it serially or with another mechanism may be faster in this case. However, when working with hundreds and hundreds of servers, the run times with Invoke-Async should be nothing short of spectacular.

      I hope this answers your question.

      1. Hi, thx for the reply and clarification.
        I may be wrong, but I was looking at your code and I thought I was using the same processes, aka runspaces, which is why I didn’t understand the difference in speed.
        I do have hundreds of servers, which is why I started looking into this 🙂

  3. I am sorry for the confusion. The code in this post does not use a runspace pool; it creates a separate [powershell] instance, each with its own runspace, per thread. I would still recommend instrumenting your code to see where the delay is. It is tricky to time or troubleshoot code that runs in the background; you would have to log to a database (or something similar). You might be surprised by what you find. I have found delays where I least expected them.
