Controlling flow with Scala Futures

If you're unfamiliar with Futures in Scala 2.10, check out First steps with `futures` in Scala. Sri

Composing units of code that execute asynchronously | concurrently | in-parallel is now trivial and elegant.

Real-world systems need to enforce some form of control-flow. Asynchronous control flow.

This can get tricky. Especially if you must avoid spaghetti nested code.

Thankfully, Scala provides a few constructs to avoid the mess.


Futures may fail. They may throw exceptions. Thus far we were trapping those failures with onFailure.

In the example below, we recover from an exception and return a string value.

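    // imports needed by the snippets in this post
    import scala.concurrent._
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.Random

    // Sometimes we need to recover from an exception with a value
    // let's `recover` from an exception with the fallback value "Infinity"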

    val tryDivideByZeroAgain = future {
      Thread.sleep(1000)
      1 / 0
    } recover {
      case e: ArithmeticException => "Infinity"
    }

    tryDivideByZeroAgain onSuccess {
      case v => Console.println(v) // v is the recovered value, not an exception
    }

    tryDivideByZeroAgain onFailure {
      case e => Console.println(e)
    }

    Console.println("Try dividing by zero, recover from exception..")

    Thread.sleep(2000)

    // output
    // Try dividing by zero, recover from exception..
    // Infinity
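
Since the block yields an Int and the fallback is a String, the recovered future above is inferred as Future[Any]. Here is a minimal sketch of a variant that stays a Future[Int]. The helper `safeDivide` and the use of Int.MaxValue as a stand-in for "Infinity" are assumptions made for illustration, not part of the original example. It uses `Future { ... }` and `Await.result`, which should work on 2.10 as well as later versions, and it waits for the result instead of sleeping.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RecoverSketch {
  // Hypothetical helper: divides asynchronously and recovers
  // only from ArithmeticException, keeping the result an Int
  def safeDivide(a: Int, b: Int): Future[Int] =
    Future { a / b }.recover {
      case _: ArithmeticException => Int.MaxValue // stand-in for "Infinity"
    }

  def main(args: Array[String]): Unit = {
    // Await.result blocks until the future completes or the timeout expires
    println(Await.result(safeDivide(6, 3), 2.seconds))
    println(Await.result(safeDivide(1, 0), 2.seconds))
  }
}
```

Any exception the partial function does not match (anything other than ArithmeticException here) still fails the resulting future.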
  

Moving on, a similar example but we don't want to recover directly with an expression / value.

Instead we want a failing future to fallbackTo another asynchronous future.

    // Or maybe future f1 must fallback to future f2?

    val f1 = future {
      Thread.sleep(500)
      1 / 0
    }

    val f2 = future {
      Thread.sleep(500)
      "Infinity"
    }

    f1 fallbackTo f2 onSuccess {
      case v => Console.println(v)
    }

    Console.println("Try dividing by zero, fallback to another future..")

    Thread.sleep(2000)

    // output
    // Try dividing by zero, fallback to another future..
    // Infinity
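
What if the fallback fails too? As far as I can tell from the API docs, the combined future then carries the exception of the first future, not the fallback's. A small sketch to check that, where `primary`, `backup`, `brokenBackup` and `outcome` are hypothetical names made up for this illustration:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object FallbackSketch {
  // Hypothetical futures standing in for a primary source and two backups
  def primary: Future[String] = Future.failed(new IllegalStateException("primary failed"))
  def backup: Future[String] = Future { "backup value" }
  def brokenBackup: Future[String] = Future.failed(new RuntimeException("backup failed"))

  // Blocks for the outcome and captures success or failure as a Try
  def outcome[T](f: Future[T]): Try[T] = Try(Await.result(f, 2.seconds))

  def main(args: Array[String]): Unit = {
    // the fallback succeeds, so its value is used
    println(outcome(primary.fallbackTo(backup)))
    // both fail, so the failure of `primary` is reported
    println(outcome(primary.fallbackTo(brokenBackup)))
  }
}
```

Note that both futures are created up front, so the fallback runs whether or not it ends up being needed.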
  

With andThen we can chain side-effecting callbacks on a future and explicitly define the order in which they run.

    // callbacks registered separately on the same future run in no guaranteed order
    // to run them serially, in a specific order, we use `andThen`
    // each `andThen` returns a new future that carries the original result

    val whamBamThankYouMaam = future {
      Thread.sleep(500)
      Console.println("Wham!")
    } andThen {
      case _ => {
        Thread.sleep(500)
        Console.println("Bam!")
      }
    } andThen {
      case _ => {
        Thread.sleep(500)
        Console.println("Thank you ma'am!")
      }
    }

    Console.println("Will you score?")

    Thread.sleep(2000)

    // output
    //
    // Will you score?
    // Wham!
    // Bam!
    // Thank you ma'am!
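
Two things are easy to miss here: the callbacks run strictly one after another, and the value of the chained future is still the value of the original block, whatever the callbacks return. A minimal sketch under those assumptions (the object `AndThenSketch` and the log messages are made up for illustration):

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object AndThenSketch {
  // Runs a future with two chained callbacks and returns
  // the future's value together with the recorded order of events
  def run(): (Int, List[String]) = {
    // Thread-safe log, since callbacks may run on different threads
    val log = new ConcurrentLinkedQueue[String]()

    val chained: Future[Int] = Future { log.add("compute"); 42 }
      .andThen { case _ => log.add("first callback") }
      .andThen { case _ => log.add("second callback") }

    // The chained future completes only after the last callback has run
    val result = Await.result(chained, 2.seconds)
    (result, log.toArray(new Array[String](0)).toList)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

The callbacks return a Boolean (from `log.add`), yet `chained` is still a Future[Int] holding 42.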
  

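Scala is type safe and so are futures: each one is a Future[T]. So far, though, we let the compiler infer T from the block (the recover example above, for instance, ends up as a Future[Any]).

A promise is a type-safe way to create a future of type T and complete it explicitly: the promise is the writable side, promise.future is the read-only side.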

    // `promises` can be used to compose type safe futures

    val willYouMarryMe = promise[Boolean]

    willYouMarryMe.future onSuccess {
      case yes => Console.println("Yes! :D")
    }

    willYouMarryMe.future onFailure {
      case no => Console.println("No :(")
    }

    future {
      Thread.sleep(1000)
      if (new Random().nextBoolean())
        willYouMarryMe success true // try passing a non-Boolean value here, it won't compile
      else
        willYouMarryMe failure new Exception
    }

    Console.println("Will you marry me?")

    Thread.sleep(2000)

    // output
    //
    // Will you marry me?
    // Yes! :D | No :(
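
A common way to use this is to keep the promise private and hand out only its future. The sketch below assumes a hypothetical helper `propose` with a fixed answer instead of a random one, so the outcome is predictable. It uses the `Promise[Boolean]()` factory, which is available in 2.10 and later versions.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object PromiseSketch {
  // Hypothetical helper: completes the promise on another thread
  // and exposes only the read-only Future[Boolean] to the caller
  def propose(accepted: Boolean): Future[Boolean] = {
    val answer = Promise[Boolean]()
    Future {
      if (accepted) answer.success(true)
      else answer.failure(new Exception("declined"))
    }
    answer.future
  }

  def main(args: Array[String]): Unit = {
    println(Try(Await.result(propose(true), 2.seconds)))
    println(Try(Await.result(propose(false), 2.seconds)))
  }
}
```

Callers can read the answer or attach callbacks, but only the code holding the promise can complete it.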
  

A promise can be fulfilled only once. But you can have multiple asynchronous blocks that each try to complete the same promise.

The first attempt wins and sets the result; later attempts are ignored.

    // Two asynchronous blocks `tryComplete` a promise

    val whoWonTheRace = promise[String]

    whoWonTheRace.future onSuccess {
      case name => Console.println(name + " wins")
    }

    future {
      Thread.sleep(new Random().nextInt(500))
      whoWonTheRace trySuccess "x"
    }

    future {
      Thread.sleep(new Random().nextInt(500))
      whoWonTheRace trySuccess "y"
    }

    Console.println("Who won the race?")

    Thread.sleep(1000)

    // output
    //
    // Who won the race?
    // x wins | y wins
  

Methods available here are tryComplete, trySuccess and tryFailure.
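
The try* methods report whether they won: they return true if the call completed the promise and false if it was already completed. The strict variants (success, failure, complete) throw an IllegalStateException instead. A minimal sketch, with the object name and values made up for illustration:

```scala
import scala.concurrent.Promise
import scala.util.{Failure, Success, Try}

object TryCompleteSketch {
  val winner = Promise[String]()

  // The first attempt completes the promise and returns true
  val first: Boolean = winner.trySuccess("x")
  // Later attempts are ignored and return false
  val second: Boolean = winner.trySuccess("y")
  val third: Boolean = winner.tryComplete(Failure(new Exception("z fell over")))

  // The strict `success` throws on an already completed promise
  val strictFailed: Boolean = Try(winner.success("w")).isFailure

  def main(args: Array[String]): Unit = {
    println(Seq(first, second, third, strictFailed))
    println(winner.future.value) // the result set by the first attempt
  }
}
```

In a race like the one above, the Boolean result is a cheap way for each block to learn whether it was the one that finished first.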


The entire example - available on GitHub - is an executable application.

For further reading check out Futures and Promises on Scala Docs.