Execution context #304

Merged (2 commits) on Sep 11, 2015
156 changes: 151 additions & 5 deletions overviews/core/_posts/2012-09-20-futures.md

## Introduction

Futures provide a way to reason about performing many operations
in parallel, in an efficient and non-blocking way.
A [`Future`](http://www.scala-lang.org/api/current/index.html#scala.concurrent.Future)
is a placeholder object for a value that may not yet exist.
Generally, the value of the `Future` is supplied concurrently and can subsequently be used.
Composing concurrent tasks in this way tends to result in faster, asynchronous, non-blocking parallel code.

By default, futures and promises are non-blocking, making use of
in the case of an application which requires blocking futures, for an underlying
environment to resize itself if necessary to guarantee progress.
-->

A typical future looks like this:

    val inverseFuture: Future[Matrix] = Future {
      fatMatrix.inverse() // non-blocking long lasting computation
    }(executionContext)


Or, more idiomatically:

    implicit val ec: ExecutionContext = ...
    val inverseFuture: Future[Matrix] = Future {
      fatMatrix.inverse()
    } // ec is implicitly passed


Both code snippets delegate the execution of `fatMatrix.inverse()` to an `ExecutionContext` and embody the result of the computation in `inverseFuture`.
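As a self-contained sketch of the same shape (`Matrix` and `fatMatrix` are not defined here, so a simple summation stands in for `fatMatrix.inverse()`; `Await.result` is used only to inspect the value and would normally be avoided):

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

implicit val ec: ExecutionContext = ExecutionContext.global

// Stand-in for fatMatrix.inverse(): any long-running computation.
def inverse(): Long = (1L to 1000L).sum

val inverseFuture: Future[Long] = Future { inverse() } // ec passed implicitly

// Await.result blocks the calling thread; used here only to observe the result.
val result: Long = Await.result(inverseFuture, 2.seconds)
// result == 500500
```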


## Execution Context

Futures and promises revolve around [`ExecutionContext`s](http://www.scala-lang.org/api/current/index.html#scala.concurrent.ExecutionContext), which are responsible for executing computations.

An `ExecutionContext` is similar to an [Executor](http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executor.html):
it is free to execute computations in a new thread, in a pooled thread or in the current thread
(although executing the computation in the current thread is discouraged -- more on that below).

The `scala.concurrent` package comes out of the box with an `ExecutionContext` implementation, a global static thread pool.
It is also possible to convert an `Executor` into an `ExecutionContext`.
Finally, users are free to extend the `ExecutionContext` trait to implement their own execution contexts,
although this should only be done in rare cases.
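As an illustration of that last (rarely needed) option, a minimal sketch of a hand-rolled `ExecutionContext` might look like the following. The class name is made up, and running every task on a fresh thread is wasteful; prefer the provided pools in real code:

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.ExecutionContext

// Illustrative only: implements the two abstract members of ExecutionContext
// by running every submitted task on a brand-new thread.
class NewThreadExecutionContext extends ExecutionContext {
  def execute(runnable: Runnable): Unit = new Thread(runnable).start()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

// Tiny usage check: submit a task and wait for it to run.
val latch = new CountDownLatch(1)
new NewThreadExecutionContext().execute(new Runnable {
  def run(): Unit = latch.countDown()
})
latch.await()
```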


### The Global Execution Context

`ExecutionContext.global` is an `ExecutionContext` backed by a [ForkJoinPool](http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html).
It should be sufficient for most situations but requires some care.
A `ForkJoinPool` manages a limited number of threads (the maximum number of threads is referred to as its *parallelism level*).
The number of concurrently blocking computations can exceed the parallelism level
only if each blocking call is wrapped inside a `blocking` call (more on that below).
Otherwise, there is a risk that the thread pool in the global execution context is starved,
and no computation can proceed.

By default `ExecutionContext.global` sets the parallelism level of its underlying fork-join pool to the number of available processors
([Runtime.availableProcessors](http://docs.oracle.com/javase/7/docs/api/java/lang/Runtime.html#availableProcessors%28%29)).
This configuration can be overridden by setting one (or more) of the following VM attributes:

* `scala.concurrent.context.minThreads` - defaults to `Runtime.availableProcessors`
* `scala.concurrent.context.numThreads` - can be a number or a multiplier (N) in the form `xN`; defaults to `Runtime.availableProcessors`
* `scala.concurrent.context.maxThreads` - defaults to `Runtime.availableProcessors`

The parallelism level will be set to `numThreads` as long as it remains within `[minThreads; maxThreads]`.
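For instance, a hypothetical invocation (the application name `MyApp` and the specific bounds are illustrative, not from this document) might pin the global pool to twice the number of available processors, bounded between 8 and 64 threads:

```shell
# Illustrative only: numThreads=x2 is a multiplier on availableProcessors;
# the resulting parallelism level is clamped to [minThreads; maxThreads].
scala -Dscala.concurrent.context.minThreads=8 \
      -Dscala.concurrent.context.numThreads=x2 \
      -Dscala.concurrent.context.maxThreads=64 \
      MyApp
```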

As stated above, the `ForkJoinPool` can increase the number of threads beyond its `parallelismLevel` in the presence of blocking computations.
As explained in the `ForkJoinPool` API, this is only possible if the pool is explicitly notified:

    import scala.concurrent.ExecutionContext
    import scala.concurrent.Future
    import scala.concurrent.forkjoin._

    // the following is equivalent to `implicit val ec = ExecutionContext.global`
    import ExecutionContext.Implicits.global

    Future {
      ForkJoinPool.managedBlock(
        new ManagedBlocker {
          var done = false

          def block(): Boolean = {
            try {
              myLock.lock()
              // ...
            } finally {
              done = true
            }
            true
          }

          def isReleasable: Boolean = done
        }
      )
    }


Fortunately, the `scala.concurrent` package provides a convenient way of doing so:

    import scala.concurrent.Future
    import scala.concurrent.blocking

    Future {
      blocking {
        myLock.lock()
        // ...
      }
    }

Note that `blocking` is a general construct that will be discussed in more depth [below](#in_a_future).
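To see the effect of `blocking`, here is a runnable sketch (not from the original text) in which more tasks block simultaneously than the default parallelism level allows. All tasks must be blocked at once before any can finish, so without `blocking` a pool capped at `availableProcessors` threads could deadlock:

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{blocking, ExecutionContext, Future}

implicit val ec: ExecutionContext = ExecutionContext.global

// Twice as many simultaneously-blocked tasks as the default parallelism level.
val n = Runtime.getRuntime.availableProcessors * 2
val latch = new CountDownLatch(n)

// No task can finish until all n are blocked at the latch at the same time.
// Wrapping the wait in `blocking` lets the pool spawn compensating workers.
for (_ <- 1 to n) Future {
  blocking {
    latch.countDown()
    latch.await() // wait for the other n - 1 tasks
  }
}

latch.await() // main thread: returns only once all n tasks ran concurrently
```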

Last but not least, remember that the `ForkJoinPool` is not designed for long-lasting blocking operations.
Even when notified with `blocking` the pool might not spawn new workers as you would expect,
and newly created workers may number as many as 32767.
To give you an idea, the following code will use 32000 threads:
> **Contributor:** This is perhaps too detailed, and concerns standing `ForkJoinPool` implementations more than execution contexts. I would omit this snippet.
>
> **Contributor Author:** This section is named *The Global Execution Context*. IMO you ought to know how your `ExecutionContext.global` behaves. I would agree with you if the underlying implementation of the global EC wasn't a `ForkJoinPool`.
    implicit val ec = ExecutionContext.global

    for (i <- 1 to 32000) {
      Future {
        blocking {
          Thread.sleep(999999)
        }
      }
    }


If you need to wrap long-lasting blocking operations, we recommend using a dedicated `ExecutionContext`, for instance by wrapping a Java `Executor`.

> FJP is a Java `Executor` so I am unsure how this recommendation will help. Thoughts?
>
> **Contributor:** Perhaps what was meant here is to create a dedicated execution context just for executing blocking code, so that non-blocking code running on a separate execution context is not influenced negatively.
>
> **Contributor Author:** That was indeed the idea.

### Adapting a Java Executor

Using the `ExecutionContext.fromExecutor` method, you can wrap a Java `Executor` into an `ExecutionContext`.
For instance:

    ExecutionContext.fromExecutor(new ThreadPoolExecutor( /* your configuration */ ))

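As a fuller, runnable sketch (the names `pool` and `blockingEc` are illustrative), this is one way to build the dedicated context for blocking work recommended above, keeping long-lasting blocking operations off the global pool:

```scala
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A fixed-size pool reserved for long-lasting blocking operations.
val pool = Executors.newFixedThreadPool(4)
val blockingEc: ExecutionContext = ExecutionContext.fromExecutor(pool)

val f = Future {
  Thread.sleep(100) // stands in for blocking I/O
  "done"
}(blockingEc)

// Await.result is used here only to observe the value.
val result = Await.result(f, 2.seconds)
pool.shutdown() // release the threads when the pool is no longer needed
```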

### Synchronous Execution Context

One might be tempted to have an `ExecutionContext` that runs computations within the current thread:

    val currentThreadExecutionContext = ExecutionContext.fromExecutor(
      new Executor {
        // Do not do this!
        def execute(runnable: Runnable) { runnable.run() }
      })

This should be avoided as it introduces non-determinism in the execution of your future.

    Future {
      doSomething
    }(ExecutionContext.global).map {
      doSomethingElse
    }(currentThreadExecutionContext)

The `doSomethingElse` call might either execute in `doSomething`'s thread or in the main thread, and therefore be either asynchronous or synchronous.
As explained [here](http://blog.ometer.com/2011/07/24/callbacks-synchronous-and-asynchronous/) a callback should not be both.



## Futures

A `Future` is an object holding a value which may become available at some point.