Coroutines for Asynchronous Concurrent Programming
Concurrency is needed for two main reasons
- You want to run things in parallel for speed gain (multi-core, cluster, etc)
- You are waiting for a result from an I/O action and want to do other work while waiting
- Disk read/write, network request, remote API call, internal timer, etc
In OCaml
* Concurrency for speed gain is new; it arrived in OCaml 5
* Concurrency to support asynchronous waiting via coroutines: the Lwt and Async libraries
Threads
- Concurrency for speed is usually done via threads
- fork off another computation with its own runtime stack etc but share the heap
- But, threads are notoriously difficult to debug due to the number of interleavings of control flows
- Can’t test all of the exponentially many ways parallel computations can interleave
- Many tools today are used to limit resource contention (channels, monitors, locks, ownership types, etc etc etc) but it’s still hard
Coroutines
- So if all you need is to wait for some I/O, it's often better to use coroutines
- The key difference of a coroutine is that there is no preemption: a routine runs uninterrupted until it chooses to “yield”/“pause”.
- Means that computations are still deterministic, much easier to debug!
- Such an approach is called coroutines due to that term being used in some early PLs.
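To make "no preemption" concrete, here is a toy cooperative scheduler using only the OCaml standard library. It is a sketch of the idea, not how Lwt is implemented; run_queue, yield, worker and run are names made up for this illustration.

```ocaml
(* A toy cooperative scheduler: NOT how Lwt works internally, just the idea. *)
let run_queue : (unit -> unit) Queue.t = Queue.create ()
let log : string list ref = ref []

(* "Yield": park the rest of the work (a continuation) at the back of the queue. *)
let yield (k : unit -> unit) : unit = Queue.add k run_queue

(* A routine that records [n] steps, yielding after each one. *)
let rec worker (name : string) (n : int) () : unit =
  if n > 0 then begin
    log := Printf.sprintf "%s%d" name n :: !log;
    yield (worker name (n - 1))
  end

(* Run routines until the queue is empty; nothing is ever interrupted mid-step. *)
let run () : unit =
  while not (Queue.is_empty run_queue) do
    (Queue.take run_queue) ()
  done

(* Two routines interleave only at their yield points, so the trace is
   the same on every run. *)
let trace : string list =
  yield (worker "a" 2);
  yield (worker "b" 2);
  run ();
  List.rev !log
(* trace = ["a2"; "b2"; "a1"; "b1"] *)
```

Because a routine only gives up control where it calls yield, the interleaving is fixed by the program text rather than by a thread scheduler.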
Coroutines in different languages
Coroutines are found in most modern PLs
- Python has the built-in asyncio library
- JavaScript has built-in async/await syntax
- Most other commonly-used languages have built-in support or some third-party library
In OCaml there are currently two competing libraries
- Async - a Jane Street library, very compatible with Core but not widely used so fewer other libraries use it.
- Lwt - the de facto standard library for coroutines in OCaml.
- We will cover Lwt since most useful libraries are built over Lwt: Cohttp, Dream, and Opium for example.
Principles of Coroutines
- Coroutines are only useful if there is some action started that will complete in the future, and you want to get some work done meanwhile.
- Scenario: You just need to read in a file and write a transformed version to another file and that is it
- There isn’t really anything else you need to do while waiting for I/O, no need for coroutines.
- Scenario: You want your algorithm to do ten seconds of searching for a best answer, and return the best answer when the timer goes off
- The timer is an action that will complete in the future and you indeed want to get some work done meanwhile: use a coroutine!
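The timer scenario might look roughly like this in Lwt. This is a minimal sketch assuming the lwt and lwt.unix libraries are available; score and search_for are hypothetical names, and score is only a stand-in for a real search step. It uses let* from Lwt.Syntax, so no ppx is needed.

```ocaml
(* Requires the lwt and lwt.unix libraries. *)
open Lwt.Syntax

(* Hypothetical scoring function standing in for one step of a real search. *)
let score (i : int) : int = (i * 7919) mod 1000

(* Search until the timer promise resolves, then return the best score seen. *)
let search_for (seconds : float) : int Lwt.t =
  let timer = Lwt_unix.sleep seconds in   (* starts now, completes in the future *)
  let rec loop i best =
    if not (Lwt.is_sleeping timer) then Lwt.return best
    else
      let best = max best (score i) in
      (* Yield so the scheduler gets a chance to notice that the timer fired. *)
      let* () = Lwt.pause () in
      loop (i + 1) best
  in
  loop 0 0

(* A short time budget so the example finishes quickly. *)
let best : int = Lwt_main.run (search_for 0.05)
```

Pausing on every iteration is wasteful for a real search; pausing every few thousand steps would be a more realistic choice.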
A Larger Example of Coroutines: Photomontage App
- Suppose you want to read a bunch of images from different URLs on the Internet and make a collage of them
- You would like to resize them in the order they show up, no need to wait for all the images to come in
- Also if one load is slow don’t block all the subsequent loads
- Kick them all off at the start, then process as they come in
- Some loads could be from dead URLs so will need to time out on those
- There are some sequencing requirements as well
- Resize each image as it comes in (e.g. make 100x100)
- Once all images are in and resized or timed out, a collage is created.
Idea of the implementation
Q: How do we allow these loads to happen concurrently without fork/threads/parallelism?
A: Use coroutines to split I/O actions in two:
- Issue each image GET request
- Package up the resizing code (the continuation, the “rest of the work”) as a function which will run when each load completes
- The coroutine system will run the continuation function for each load when that load is done.
It might seem odd to package up the continuation as a function, but we already did that, with Monads!
Monad-think on the above:
let img_load url =
  bind (* code to issue image request and pause *)
    (fun img -> (* the continuation: processing code to run after this image loaded *) )
which is, in Core’s let%bind notation,
let img_load url =
  let%bind img = (* code to issue image request and pause *) in
  (* processing code to run after this image loaded *)
(Note, Lwt uses let%lwt or let* instead of let%bind; you can view them all as synonyms)
- Observe how bind is naturally making the continuation a function
- So we will be using bind a lot when writing coroutine code in OCaml
- In general Lwt is a monad
- We won’t be looking at the underlying structure of the Lwt monad, but behind the type Lwt.t the library keeps track of the active coroutines and swaps between them.
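To see how bind can turn "the rest of the work" into a stored function, here is a toy promise type written with only the standard library. It is an illustration of the principle and not Lwt's real implementation; promise, pending, resolve and on_resolved are names invented for this sketch.

```ocaml
(* A toy promise type: NOT Lwt's real implementation, only the idea. *)
type 'a state =
  | Pending of ('a -> unit) list   (* continuations waiting for the value *)
  | Resolved of 'a

type 'a promise = { mutable state : 'a state }

let return (v : 'a) : 'a promise = { state = Resolved v }
let pending () : 'a promise = { state = Pending [] }

(* Fulfil a promise and run every continuation that was waiting on it. *)
let resolve (p : 'a promise) (v : 'a) : unit =
  match p.state with
  | Resolved _ -> invalid_arg "already resolved"
  | Pending ks ->
      p.state <- Resolved v;
      List.iter (fun k -> k v) (List.rev ks)

(* Run [k] with the value of [p]: now if it is there, otherwise later. *)
let on_resolved (p : 'a promise) (k : 'a -> unit) : unit =
  match p.state with
  | Resolved v -> k v
  | Pending ks -> p.state <- Pending (k :: ks)

(* bind: the continuation [f] is stored as a plain function. *)
let bind (p : 'a promise) (f : 'a -> 'b promise) : 'b promise =
  let result = pending () in
  on_resolved p (fun v -> on_resolved (f v) (resolve result));
  result

let ( let* ) = bind

(* Usage: a string stands in for a loaded image. *)
let load : string promise = pending ()

let processed : string promise =
  let* img = load in
  return (String.uppercase_ascii img)

let before = processed.state        (* still Pending: nothing loaded yet *)
let () = resolve load "cat.png"     (* the "load" completes *)
let after = processed.state         (* now Resolved "CAT.PNG" *)
```

Nothing after the let* runs until resolve is called; at that point the stored function is invoked with the loaded value.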
More details on photomontage
- Suppose for simplicity there are only two images.
- We eventually need to wait for these loads to finish, here is how.
let p1 = img_load url1 in
let p2 = img_load url2 in
(* We immediately get to this line; the above just kicks off the requests *)
(* p1 and p2 are called "promises" for the actual values: "I promise I'll eventually be the value" *)
(* They are the underlying monadic ("wrapped") values, we will see that below *)
(* .. we can do any other processing here .. *)
(* When we finally need the results of the above we use bind *)
(* The bind will block if value not there yet: *)
let%lwt load1 = p1 in
let%lwt load2 = p2 in ...
(* ... we will get to this line once both loads are finished -- promises fulfilled
If we had other coroutines they can wake up and run while waiting for these loads
Note we could have instead used Lwt.choose to get the *first* one completed:
let%lwt a_load = Lwt.choose [ p1; p2 ]
For this application Lwt.choose is important to get the advantage of coroutines: block minimally
*)
- When let%lwt load1 = ... is hit, recall this is a bind and all the rest of the code is in fact a function: fun load1 -> let%lwt load2 = p2 in ...
- The monad behind the scenes has a data structure holding this function (aka continuation)
- Other coroutines if any can run while waiting, this code is just sitting
- It will call the continuation when the low-level URL load has completed
- In Lwt terminology, when the promise is fulfilled.
- The continuation then reaches the second bind, which waits on p2 in the same way (the second load itself was already started earlier).
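The whole photomontage flow could be sketched as below, assuming the lwt and lwt.unix libraries. Everything here is hypothetical: fetch simulates a download with a delay instead of doing a real GET, resize is a stand-in that works on strings, and the timeout value is arbitrary. The resizing is attached to each load as its continuation, so each image is processed as soon as it arrives.

```ocaml
(* Requires lwt and lwt.unix. All names below are hypothetical stand-ins. *)
open Lwt.Syntax

(* Stand-in for an HTTP GET: the "download" takes [delay] seconds. *)
let fetch (url : string) (delay : float) : string Lwt.t =
  let* () = Lwt_unix.sleep delay in
  Lwt.return ("image from " ^ url)

(* Stand-in for real image resizing. *)
let resize (img : string) : string = "100x100 " ^ img

(* Load one image, giving up after [timeout] seconds; resize on arrival. *)
let img_load ~(timeout : float) (url, delay) : string option Lwt.t =
  let load =
    let* img = fetch url delay in
    Lwt.return (Some (resize img))   (* the continuation of this load *)
  in
  let give_up =
    let* () = Lwt_unix.sleep timeout in
    Lwt.return None
  in
  (* The first promise to resolve wins; the other one is cancelled. *)
  Lwt.pick [ load; give_up ]

let collage (sources : (string * float) list) : string list Lwt.t =
  (* Kick off every load first; nothing waits yet. *)
  let promises = List.map (img_load ~timeout:0.2) sources in
  (* Now wait for all of them and keep the ones that arrived in time. *)
  let* results = Lwt.all promises in
  Lwt.return (List.filter_map Fun.id results)

(* The second "URL" is slower than the timeout, so it is dropped. *)
let tiles : string list =
  Lwt_main.run (collage [ ("a.png", 0.05); ("dead.png", 5.0); ("b.png", 0.01) ])
```

The whole run takes about as long as the timeout, not the sum of the delays, because all loads are in flight at once.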
Running Lwt
- The above is a high-level idea of the use of coroutines
- We will now fire up Lwt, first in the top-loop
- See The manual for all the details
To run Lwt from utop do
#require "lwt";;
#require "lwt.unix";; (* if you also want Lwt-ized I/O functions like file read/write etc *)
#require "lwt_ppx";; (* for the let%lwt syntax; need to `opam install lwt_ppx` first *)
And you might also want to do this to put the functions at the top level.
open Lwt;;
Promise basics
This example shows the Lwt version of read_line in action.
let%lwt str = Lwt_io.read_line Lwt_io.stdin in Lwt_io.printf "You typed %S\n" str;;
- This example looks just like the built-in read_line except for the %lwt; here is why the Lwt version is better:
let p = Lwt_io.read_line Lwt_io.stdin in
printf "See how read not blocking now\n"; Stdio.Out_channel.flush stdout;
let%lwt str = p in Lwt_io.printf "You typed %S\n" str;;
Let's expand the let%lwt to bind to make this more clear:
let p = Lwt_io.read_line Lwt_io.stdin in
printf "See how read not blocking now\n"; Stdio.Out_channel.flush stdout;
Lwt.bind p (fun str -> Lwt_io.printf "You typed %S\n" str);;
What is going on here?
- The first line immediately completes and returns a promise, p, of type string Lwt.t
  - “I promise I will eventually turn into a string”
  - We can use Lwt.state to look at what state the promise p is in on the way to completion
    - Sleep means nothing has happened yet (no input)
    - Return v means it has been fulfilled with v as the value (the input string in above case)
    - Fail exn means it failed with exception condition exn.
    - Both Return and Fail are resolved (finished) promises
- The let%lwt above is let%bind but for Lwt - syntactic sugar for bind
  - Lwt is a monad where 'a Lwt.t is a promise for a 'a value.
  - As in any monad, let%lwt x = <a promise> in .. x normal here .. will take a promise back to normal-land
  - To do this, the code after the in of the let%lwt has to wait until the promise is resolved; meanwhile it is suspended as a continuation.
Here is a top-loop example showing some of these promise states; code is a bit convoluted to be able to see results.
let s,p = let p0 = Lwt_io.read_line Lwt_io.stdin in (Lwt.state p0, p0);; (* state is Sleep - input not read yet*)
(* type something at utop and hit return now - not shown for some reason - this is the input *)
Lwt.state p;; (* returns `Return <the string you typed>` *)
(* Here we artificially make a failure state. It is an exception internal to `Lwt`,
it doesn't get `raise`d in OCaml except at top *)
let p' = Lwt.fail Exit in Lwt.state p';;
Making our own promises
We can make (and directly resolve) our own promises; this also shows what Lwt_io.read_line et al are doing under the hood
let p = return "done";; (* This is the return of Lwt monad - inject a regular value as a "fulfilled" promise *)
state p;; (* indeed it is already resolved to a `Return`. *)
let p, r = wait ();; (* `wait` returns a pair of a sleeping promise and a resolver for it *)
state p;; (* Sleep *)
wakeup_exn r Exit;; (* `wakeup_exn` uses r to resolve p to failure *)
state p;; (* p is now a `Fail Exit`. Note once resolved it can't change any more. *)
let p, r = wait ();; (* another one, let's resolve this one positively *)
wakeup r "hello";;
state p;; (* now a Return "hello" *)
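As a rough picture of how an I/O function could be built from wait and wakeup, here is a sketch that wraps a callback-style API in a promise. It assumes only the core lwt library; register_callback and deliver are a made-up callback API for this example, not part of Lwt, and this is not claimed to be how Lwt_io.read_line is actually written.

```ocaml
(* Requires the lwt library. register_callback/deliver are hypothetical. *)
let pending_callbacks : (string -> unit) Queue.t = Queue.create ()

(* A callback-style API: "call me back when a message arrives". *)
let register_callback (cb : string -> unit) : unit =
  Queue.add cb pending_callbacks

(* Simulate the outside world delivering a message to every callback. *)
let deliver (msg : string) : unit =
  Queue.iter (fun cb -> cb msg) pending_callbacks;
  Queue.clear pending_callbacks

(* Turn the callback API into a promise: the callback resolves the promise. *)
let read_message () : string Lwt.t =
  let p, r = Lwt.wait () in
  register_callback (fun msg -> Lwt.wakeup r msg);
  p

let p : string Lwt.t = read_message ()

(* Attach some processing to run once the message is there. *)
let length : int Lwt.t = Lwt.map String.length p

let state_before = Lwt.state length   (* Sleep: nothing delivered yet *)
let () = deliver "hello"
let state_after = Lwt.state length    (* Return 5 *)
```

The caller of read_message only ever sees a string Lwt.t; the resolver stays hidden inside the wrapper.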
Lwt in an executable
- So far we have only shown promises, not general coroutines
- For coroutines we need to call Lwt_main.run which works best in a standalone executable
- See lwteg.zip for a zipfile of the examples below (most from Lwt manual)
- Note you can also invoke Lwt_main.run in the top loop but it could run forever
A simple Example
- Here is an example of promise resolution in an executable
- Note this one we can test in the top loop since it finishes (all promises are resolved)
Lwt_main.run
  (let three_seconds : unit Lwt.t = Lwt_unix.sleep 3. in
   let five_seconds : unit Lwt.t = Lwt_unix.sleep 5. in
   let%lwt () = three_seconds in
   let%lwt () = Lwt_io.printl "3 seconds passed" in
   let%lwt () = five_seconds in
   Lwt_io.printl "Only 2 more seconds passed")
What is Lwt_main.run doing exactly?
Lwt_main.run (return "hello")
- This runs the scheduler until the supplied 'a Lwt.t promise is resolved, and returns its final value.
- Any main executable using Lwt usually calls this at the top, and when that promise is resolved the app can usually terminate.
  - Apps such as servers will never terminate, they listen for requests until killed.
- Note its type: 'a t -> 'a which is what run should be in monad-land: get us out of the monad somehow
  - A common error is to try to call Lwt_main.run again in your own code to get out of monad-land, but that won’t work: nested calls to Lwt_main.run are not supported.
  - Moral: once in monad-land, always in monad-land when using Lwt_main.run in an executable.
More operations on Promises
- A common need, after launching a bunch of I/O requests, is to respond as soon as any one of them has come back
- The Lwt combinator for that is choose which picks a resolved promise from a list of promises
- This example also terminates so we can test it in the top loop.
let () =
  let p_1 =
    let%lwt () = Lwt_unix.sleep 3. in
    Lwt_io.printl "Three seconds elapsed"
  in
  let p_2 =
    let%lwt () = Lwt_unix.sleep 5. in
    Lwt_io.printl "Five seconds elapsed"
  in
  let p_3 = Lwt.choose [ p_1; p_2 ] in (* Lwt.join will resolve when both finish *)
  Lwt_main.run p_3
- If you use join instead of choose above it will block until all are resolved.
  - join takes a list of unit promises and returns a unit promise, so no value comes back from it, unlike with choose
- You can also create promises which can be cancelled; use task instead of wait to make those
  - Any continuations waiting on that promise (e.g. any let%lwt on it) are recursively cancelled
  - See the manual for how you can cancel promises created with task.
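The behavior of choose, join and cancellation can be checked without any timers by resolving promises by hand. This is a small sketch assuming only the core lwt library; is_canceled is a helper defined here for the example, not an Lwt function.

```ocaml
(* Requires the lwt library. *)

(* Helper for this example: was the promise rejected with Lwt.Canceled? *)
let is_canceled (p : 'a Lwt.t) : bool =
  match Lwt.state p with
  | Lwt.Fail Lwt.Canceled -> true
  | _ -> false

(* Two promises resolved by hand, so the order is fully under our control. *)
let (p1 : string Lwt.t), r1 = Lwt.wait ()
let (p2 : string Lwt.t), r2 = Lwt.wait ()

(* choose resolves with the value of whichever promise resolves first. *)
let first : string Lwt.t = Lwt.choose [ p1; p2 ]

(* join needs unit promises and resolves only when all of them are done. *)
let all_done : unit Lwt.t = Lwt.join [ Lwt.map ignore p1; Lwt.map ignore p2 ]

let () = Lwt.wakeup r2 "p2 was first"
let first_state = Lwt.state first        (* Return "p2 was first" *)
let all_after_one = Lwt.state all_done   (* Sleep: p1 is still pending *)
let () = Lwt.wakeup r1 "p1 was second"
let all_after_two = Lwt.state all_done   (* Return () *)

(* A cancelable promise made with task; cancelling rejects it with
   Lwt.Canceled, and so is the promise whose continuation waited on it. *)
let (t : int Lwt.t), _resolver = Lwt.task ()
let dependent : int Lwt.t = Lwt.bind t (fun n -> Lwt.return (n + 1))
let () = Lwt.cancel t
let canceled = (is_canceled t, is_canceled dependent)   (* (true, true) *)
```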
Launching a new Coroutine
- The examples above had only one user routine running at a time, but multiple routines are possible
- Lwt.async launches a new coroutine
- The engine will then switch between all the active coroutines as they yield
- The coroutine will at some point need to do an Lwt operation so it can yield to others
- Here is an example which asynchronously keeps nagging user for input:
let () =
  let rec show_nag (n : int) : _ Lwt.t =
    if n = 0 then Lwt.return ()
    else
      let%lwt () = Lwt_io.printl "Please enter a line" in
      let%lwt () = Lwt_unix.sleep 1. in
      show_nag (n-1)
  in
  Lwt.async (fun () -> show_nag (5));
  Lwt_main.run begin
    let%lwt line = Lwt_io.(read_line stdin) in
    Lwt_io.printl line
  end
- In this case the Lwt_unix.sleep will yield, so the read line can happen if data appeared.
- Note the n counter is used so this won’t nag forever.
Pausing
- Here is an example that shows how Lwt.pause is used in a compute-intensive task to let other coroutines run.
let () =
  let rec handle_io (n) () =
    if n = 0 then Lwt.return ()
    else
      let%lwt () = Lwt_io.printl ".. Imagine we are handling I/O here .." in
      let%lwt () = Lwt_unix.sleep 0.1 in
      handle_io (n-1) ()
  in
  let rec compute n =
    if n = 0 then Lwt.return ()
    else
      let%lwt () = (* pause in the below will cause this bind to block and put continuation on queue *)
        if n mod 1_000_000 = 0 then Lwt.pause () else Lwt.return ()
      in
      compute (n - 1)
  in
  Lwt.async @@ handle_io 50;
  Lwt_main.run (compute 10_000_000)
Using Libraries that are built on Lwt
- Web libraries Cohttp, Dream, and Opium all use Lwt.
- For example for Dream when you say Dream.run at the top of your main program it in turn will call Lwt_main.run
  - Lwt then is running the whole time the server is up, and all of your own server code is running inside the Lwt monad.
- The simple_counter.ml example in the zip gives a very high-level idea how a server like Dream works, but this example just works from stdio.
  - In this example the handle_connection function is repeatedly listening for input and invoking handle_message to handle messages. This is a super simplified idea of what the Dream library is doing
  - Your code in Dream is something like handle_message, it is just OCaml code which is not directly aware of Lwt etc.
- The counter_server.ml example in the above zip is actually listening on a network port so is one step closer to a web server.
  - Again the “user” of the server just writes handle_message here and the server invokes that on each input to get the output.
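The division of labor between handle_connection and handle_message might look something like the sketch below. This is a hypothetical reconstruction of the idea and not the simple_counter.ml from the zip: the message names, the counter state, and the use of an Lwt_stream in place of stdin or a socket are all assumptions made for illustration. It assumes the lwt and lwt.unix libraries.

```ocaml
(* Requires lwt and lwt.unix. A hypothetical sketch, not the file from the zip. *)
open Lwt.Syntax

(* "User" code: plain OCaml, unaware of Lwt. Takes the current state and a
   message, returns the new state and a reply. *)
let handle_message (count : int) (msg : string) : int * string =
  match msg with
  | "inc" -> (count + 1, Printf.sprintf "counter is now %d" (count + 1))
  | "read" -> (count, Printf.sprintf "counter is %d" count)
  | other -> (count, "unknown command: " ^ other)

(* "Library" code: lives in the Lwt monad, waits for input, calls the handler. *)
let rec handle_connection (input : string Lwt_stream.t) (count : int)
    (replies : string list) : string list Lwt.t =
  let* next = Lwt_stream.get input in       (* waits until a message is available *)
  match next with
  | None -> Lwt.return (List.rev replies)   (* input closed: stop serving *)
  | Some msg ->
      let count, reply = handle_message count msg in
      handle_connection input count (reply :: replies)

(* A stream stands in for stdin or a socket; messages are pushed by hand. *)
let (input : string Lwt_stream.t), push = Lwt_stream.create ()
let () = List.iter (fun m -> push (Some m)) [ "inc"; "inc"; "read"; "dance" ]
let () = push None                          (* close the stream *)

let replies : string list = Lwt_main.run (handle_connection input 0 [])
```

Only handle_connection mentions Lwt; handle_message can be tested as an ordinary pure function.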
If you want to learn more about Lwt internals, see This Lwt tutorial