14 min read

In this article by Alfonso Garcia Caro Nunez and Suhaib Fahad, author of the book Mastering F#, sheds light on how writing applications that are non-blocking or reacting to events is increasingly becoming important in this cloud world we live in. A modern application needs to carry out a rich user interaction, communicate with web services, react to notifications, and so on; the execution of reactive applications is controlled by events. Asynchronous programming is characterized by many simultaneously pending reactions to internal or external events. These reactions may or may not be processed in parallel.

(For more resources related to this topic, see here.)

In .NET, both C# and F# provide asynchronous programming experience through keywords and syntaxes. In this article, we will go through the asynchronous programming model in F#, with a bit of cross-referencing or comparison drawn with the C# world.

In this article, you will learn about asynchronous workflows in F#

Asynchronous workflows in F#

Asynchronous workflows are computation expressions that are setup to run asynchronously. It means that the system runs without blocking the current computation thread when a sleep, I/O, or other asynchronous process is performed.

You may be wondering why do we need asynchronous programming and why can’t we just use the threading concepts that we did for so long. The problem with threads is that the operation occupies the thread for the entire time that something happens or when a computation is done. On the other hand, asynchronous programming will enable a thread only when it is required, otherwise it will be normal code. There is also lot of marshalling and unmarshalling (junk) code that we will write around to overcome the issues that we face when directly dealing with threads. Thus, asynchronous model allows the code to execute efficiently whether we are downloading a page 50 or 100 times using a single thread or if we are doing some I/O operation over the network and there are a lot of incoming requests from the other endpoint.

There is a list of functions that the Async module in F# exposes to create or use these asynchronous workflows to program. The asynchronous pattern allows writing code that looks like it is written for a single-threaded program, but in the internals, it uses async blocks to execute. There are various triggering functions that provide a wide variety of ways to create the asynchronous workflow, which is either a background thread, a .NET framework task object, or running the computation in the current thread itself.

In this article, we will use the example of downloading the content of a webpage and modifying the data, which is as follows:

let downloadPage (url: string) =
    async {
        let req = HttpWebRequest.Create(url)
        use! resp = req.AsyncGetResponse()
        use respStream = resp.GetResponseStream()
        use sr = new StreamReader(respStream)
        return sr.ReadToEnd()
    }

downloadPage("https://www.google.com")
|> Async.RunSynchronously

The preceding function does the following:

  • The async expression, { … }, generates an object of type Async<string>
  • These values are not actual results; rather, they are specifications of tasks that need to run and return a string
  • Async.RunSynchronously takes this object and runs synchronously

We just wrote a simple function with asynchronous workflows with relative ease and reason about the code, which is much better than using code with Begin/End routines. One of the most important point here is that the code is never blocked during the execution of the asynchronous workflow. This means that we can, in principle, have thousands of outstanding web requests—the limit being the number supported by the machine, not the number of threads that host them.

Using let!

In asynchronous workflows, we will use let! binding to enable execution to continue on other computations or threads, while the computation is being performed. After the execution is complete, the rest of the asynchronous workflow is executed, thus simulating a sequential execution in an asynchronous way.

In addition to let!, we can also use use! to perform asynchronous bindings; basically, with use!, the object gets disposed when it loses the current scope.

In our previous example, we used use! to get the HttpWebResponse object. We can also do as follows:

let! resp = req.AsyncGetResponse()
// process response

We are using let! to start an operation and bind the result to a value, do!, which is used when the return of the async expression is a unit.

do! Async.Sleep(1000)

Understanding asynchronous workflows

As explained earlier, asynchronous workflows are nothing but computation expressions with asynchronous patterns. It basically implements the Bind/Return pattern to implement the inner workings. This means that the let! expression is translated into a call to async. The bind and async.Return function are defined in the Async module in the F# library. This is a compiler functionality to translate the let! expression into computation workflows and, you, as a developer, will never be required to understand this in detail. The purpose of explaining this piece is to understand the internal workings of an asynchronous workflow, which is nothing but a computation expression.

The following listing shows the translated version of the downloadPage function we defined earlier:

async.Delay(fun() ->
    let req = HttpWebRequest.Create(url)
    async.Bind(req.AsyncGetResponse(), fun resp ->
        async.Using(resp, fun resp ->
            let respStream = resp.GetResponseStream()
            async.Using(new StreamReader(respStream), fun sr " 
            ->
                reader.ReadToEnd()
                )
        )
    )
)

The following things are happening in the workflow:

  • The Delay function has a deferred lambda that executes later.
  • The body of the lambda creates an HttpWebRequest and is forwarded in a variable req to the next segment in the workflow.
  • The AsyncGetResponse function is called and a workflow is generated, where it knows how to execute the response and invoke when the operation is completed. This happens internally with the BeginGetResponse and EndGetResponse functions already present in the HttpWebRequest class; the AsyncGetResponse is just a wrapper extension present in the F# Async module.
  • The Using function then creates a closure to dispose the object with the IDisposable interface once the workflow is complete.

Async module

The Async module has a list of functions that allows writing or consuming asynchronous code. We will go through each function in detail with an example to understand it better.

Async.AsBeginEnd

It is very useful to expose the F# workflow functionality out of F#, say if we want to use and consume the API’s in C#. The Async.AsBeginEnd method gives the possibility of exposing the asynchronous workflows as a triple of methods—Begin/End/Cancel—following the .NET Asynchronous Programming Model (APM).

Based on our downloadPage function, we can define the Begin, End, Cancel functions as follows:

type Downloader() = 
    let beginMethod, endMethod, cancelMethod = "    Async.AsBeginEnd downloadPage
    member this.BeginDownload(url, callback, state : obj) = "    beginMethod(url, callback, state)
    member this.EndDownload(ar) = endMethod ar
    member this.CancelDownload(ar) = cancelMethod(ar)

Async.AwaitEvent

The Async.AwaitEvent method creates an asynchronous computation that waits for a single invocation of a .NET framework event by adding a handler to the event.

type MyEvent(v : string) = 
    inherit EventArgs()
    member this.Value = v;
 
let testAwaitEvent (evt : IEvent<MyEvent>) = async {
    printfn "Before waiting"
    let! r = Async.AwaitEvent evt
    printfn "After waiting: %O" r.Value    
    do! Async.Sleep(1000)
    return ()
    }

let runAwaitEventTest () = 
    let evt = new Event<Handler<MyEvent>, _>()     
    Async.Start <| testAwaitEvent evt.Publish    
    System.Threading.Thread.Sleep(3000)
    printfn "Before raising"
    evt.Trigger(null, new MyEvent("value"))
    printfn "After raising"

> runAwaitEventTest();;
> Before waiting
> Before raising
> After raising
> After waiting : value

The testAwaitEvent function listens to the event using Async.AwaitEvent and prints the value. As the Async.Start will take some time to start up the thread, we will simply call Thread.Sleep to wait on the main thread. This is for example purpose only. We can think of scenarios where a button-click event is awaited and used inside an async block.

Async.AwaitIAsyncResult

Creates a computation result and waits for the IAsyncResult to complete. IAsyncResult is the asynchronous programming model interface that allows us to write asynchronous programs. It returns true if IAsyncResult issues a signal within the given timeout. The timeout parameter is optional, and its default value is -1 of Timeout.Infinite.

let testAwaitIAsyncResult (url: string) =
    async {
        let req = HttpWebRequest.Create(url)
        let aResp = req.BeginGetResponse(null, null)
        let! asyncResp = Async.AwaitIAsyncResult(aResp, 1000)
        if asyncResp then
            let resp = req.EndGetResponse(aResp)
            use respStream = resp.GetResponseStream()
            use sr = new StreamReader(respStream)
            return sr.ReadToEnd()
        else return ""
    }
    > Async.RunSynchronously (testAwaitIAsyncResult "https://www.google.com")

We will modify the downloadPage example with AwaitIAsyncResult, which allows a bit more flexibility where we want to add timeouts as well. In the preceding example, the AwaitIAsyncResult handle will wait for 1000 milliseconds, and then it will execute the next steps.

Async.AwaitWaitHandle

Creates a computation that waits on a WaitHandle—wait handles are a mechanism to control the execution of threads. The following is an example with ManualResetEvent:

let testAwaitWaitHandle waitHandle = async {
    printfn "Before waiting"
    let! r = Async.AwaitWaitHandle waitHandle
    printfn "After waiting"
    }
 
let runTestAwaitWaitHandle () = 
    let event = new System.Threading.ManualResetEvent(false)
    Async.Start <| testAwaitWaitHandle event
    System.Threading.Thread.Sleep(3000)
    printfn "Before raising"
    event.Set() |> ignore
    printfn "After raising"

The preceding example uses ManualResetEvent to show how to use AwaitHandle, which is very similar to the event example that we saw in the previous topic.

Async.AwaitTask

Returns an asynchronous computation that waits for the given task to complete and returns its result. This helps in consuming C# APIs that exposes task based asynchronous operations.

let downloadPageAsTask (url: string) =
    async {
        let req = HttpWebRequest.Create(url)
        use! resp = req.AsyncGetResponse()
        use respStream = resp.GetResponseStream()
        use sr = new StreamReader(respStream)
        return sr.ReadToEnd()
    }
    |> Async.StartAsTask

let testAwaitTask (t: Task<string>) =
    async {
        let! r = Async.AwaitTask t
        return r
    }
> downloadPageAsTask "https://www.google.com"
  |> testAwaitTask
  |> Async.RunSynchronously;;

The preceding function is also downloading the web page as HTML content, but it starts the operation as a .NET task object.

Async.FromBeginEnd

The FromBeginEnd method acts as an adapter for the asynchronous workflow interface by wrapping the provided Begin/End method. Thus, it allows using large number of existing components that support an asynchronous mode of work. The IAsyncResult interface exposes the functions as Begin/End pattern for asynchronous programming. We will look at the same download page example using FromBeginEnd:

let downloadPageBeginEnd (url: string) =
    async {
        let req = HttpWebRequest.Create(url)
        use! resp = Async.FromBeginEnd(req.BeginGetResponse, req.EndGetResponse)
        use respStream = resp.GetResponseStream()
        use sr = new StreamReader(respStream)
        return sr.ReadToEnd()
    }

The function accepts two parameters and automatically identifies the return type; we will use BeginGetResponse and EndGetResponse as our functions to call. Internally, Async.FromBeginEnd delegates the asynchronous operation and gets back the handle once the EndGetResponse is called.

Async.FromContinuations

Creates an asynchronous computation that captures the current success, exception, and cancellation continuations. To understand these three operations, let’s create a sleep function similar to Async.Sleep using timer:

let sleep t = Async.FromContinuations(fun (cont, erFun, _) ->
    let rec timer = new Timer(TimerCallback(callback))
    and callback state = 
        timer.Dispose()
        cont(())
    timer.Change(t, Timeout.Infinite) |> ignore
    )

let testSleep = async {
    printfn "Before"
    do! sleep 5000
    printfn "After 5000 msecs"
    }
Async.RunSynchronously testSleep

The sleep function takes an integer and returns a unit; it uses Async.FromContinuations to allow the flow of the program to continue when a timer event is raised. It does so by calling the cont(()) function, which is a continuation to allow the next step in the asynchronous flow to execute. If there is any error, we can call erFun to throw the exception and it will be handled from the place we are calling this function.

Using the FromContinuation function helps us wrap and expose functionality as async, which can be used inside asynchronous workflows. It also helps to control the execution of the programming with cancelling or throwing errors using simple APIs.

Async.Start

Starts the asynchronous computation in the thread pool. It accepts an Async<unit> function to start the asynchronous computation. The downloadPage function can be started as follows:

let asyncDownloadPage(url) = async {
      let! result = downloadPage(url)
      printfn "%s" result"}
asyncDownloadPage "http://www.google.com" 
|> Async.Start

 

We wrap the function to another async function that returns an Async<unit> function so that it can be called by Async.Start.

Async.StartChild

Starts a child computation within an asynchronous workflow. This allows multiple asynchronous computations to be executed simultaneously, as follows:

let subTask v = async   {
    print "Task %d started" v
    Thread.Sleep (v * 1000)
    print "Task %d finished" v
    return v
    }
 
let mainTask = async    {
    print "Main task started"
 
    let! childTask1 = Async.StartChild (subTask 1)    
    let! childTask2 = Async.StartChild (subTask 5)    
    print "Subtasks started"
 
    let! child1Result = childTask1
    print "Subtask1 result: %d" child1Result
 
    let! child2Result = childTask2
    print "Subtask2 result: %d" child2Result
 
    print "Subtasks completed"
    return ()
    }
 
Async.RunSynchronously mainTask

Async.StartAsTask

Executes a computation in the thread pool and returns a task that will be completed in the corresponding state once the computation terminates. We can use the same example of starting the downloadPage function as a task.

let downloadPageAsTask (url: string) =
    async {
        let req = HttpWebRequest.Create(url)
        use! resp = req.AsyncGetResponse()
        use respStream = resp.GetResponseStream()
        use sr = new StreamReader(respStream)
        return sr.ReadToEnd()
    }
    |> Async.StartAsTask
let task = downloadPageAsTask("http://www.google.com")
prinfn "Do some work"
task.Wait()
printfn "done"

 

Async.StartChildAsTask

Creates an asynchronous computation from within an asynchronous computation, which starts the given computation as a task.

let testAwaitTask = async {
    print "Starting"
    let! child = Async.StartChildAsTask <| async {  // "    Async.StartChildAsTask shall be described later
            print "Child started"
            Thread.Sleep(5000)
            print "Child finished"
            return 100
    }
    print "Waiting for the child task"
    let! result = Async.AwaitTask child
    print "Child result %d" result
    }

Async.StartImmediate

Runs an asynchronous computation, starting immediately on the current operating system thread. This is very similar to the Async.Start function we saw earlier:

let asyncDownloadPage(url) = async {
      let! result = downloadPage(url)
      printfn "%s" result"}
asyncDownloadPage "http://www.google.com" 
|> Async.StartImmediate

Async.SwitchToNewThread

Creates an asynchronous computation that creates a new thread and runs its continuation in it:

let asyncDownloadPage(url) = async {
      do! Async.SwitchToNewThread()
      let! result = downloadPage(url)
      printfn "%s" result"}
asyncDownloadPage "http://www.google.com" 
|> Async.Start

 

Async.SwitchToThreadPool

Creates an asynchronous computation that queues a work item that runs its continuation, as follows:

let asyncDownloadPage(url) = async {
      do! Async.SwitchToNewThread()
      let! result = downloadPage(url)
      do! Async.SwitchToThreadPool()
      printfn "%s" result"}
asyncDownloadPage "http://www.google.com" 
|> Async.Start

 

Async.SwitchToContext

Creates an asynchronous computation that runs its continuation in the Post method of the synchronization context. Let’s assume that we set the text from the downloadPage function to a UI textbox, then we will do it as follows:

let syncContext = System.Threading.SynchronizationContext()
let asyncDownloadPage(url) = async {     
      do! Async.SwitchToContext(syncContext)
      let! result = downloadPage(url)
      textbox.Text <- result"}
asyncDownloadPage "http://www.google.com" 
|> Async.Start

 

Note that in the console applications, the context will be null.

Async.Parallel

The Parallel function allows you to execute individual asynchronous computations queued in the thread pool and uses the fork/join pattern.

let parallel_download() =
    let sites = ["http://www.bing.com";
                 "http://www.google.com";
                 "http://www.yahoo.com";
                 "http://www.search.com"]
 
    let htmlOfSites =
        Async.Parallel [for site in sites -> downloadPage site ]
        |> Async.RunSynchronously
    printfn "%A" htmlOfSites

 

We will use the same example of downloading HTML content in a parallel way. The preceding example shows the essence of parallel I/O computation

  • The async function, { … }, in the downloadPage function shows the asynchronous computation
  • These are then composed in parallel using the fork/join combinator
  • In this sample, the composition executed waits synchronously for overall result

Async.OnCancel

A cancellation interruption in the asynchronous computation when a cancellation occurs. It returns an asynchronous computation trigger before being disposed.

    // This is a simulated cancellable computation. It checks the "    token source 
    // to see whether the cancel signal was received. 
    let computation "    (tokenSource:System.Threading.CancellationTokenSource) =
        async {
            use! cancelHandler = Async.OnCancel(fun () -> printfn "            "Canceling operation.")
            // Async.Sleep checks for cancellation at the end of "            the sleep interval, 
            // so loop over many short sleep intervals instead of "            sleeping 
            // for a long time. 
            while true do 
                do! Async.Sleep(100)
        }

    let tokenSource1 = new "    System.Threading.CancellationTokenSource()
    let tokenSource2 = new "    System.Threading.CancellationTokenSource()

    Async.Start(computation tokenSource1, tokenSource1.Token)
    Async.Start(computation tokenSource2, tokenSource2.Token)
    printfn "Started computations."
    System.Threading.Thread.Sleep(1000)
    printfn "Sending cancellation signal."
    tokenSource1.Cancel()
    tokenSource2.Cancel()

The preceding example implements the Async.OnCancel method to catch or interrupt the process when CancellationTokenSource is cancelled.

Summary

In this article, we went through detail, explanations about different semantics in asynchronous programming with F#, used with asynchronous workflows. We saw a number of functions from the Async module.

Resources for Article:


Further resources on this subject:


LEAVE A REPLY

Please enter your comment!
Please enter your name here