- Global Scope
- Possible replacements
- GlobalScope vs custom scope
- Legitimate use-cases
- Coroutines basics
- Your first coroutine
- Structured concurrency
- Extract function refactoring
- Scope builder
- Scope builder and concurrency
- An explicit job
- Coroutines ARE light-weight
- Kotlin Coroutines 1.5: GlobalScope Marked as Delicate, Refined Channels API, and More
- GlobalScope marked as a delicate API
- Possible replacements
- Legitimate use cases
- Extensions for JUnit 5
- Channel API refinement
- The new naming scheme
- Try functions: non-suspending counterparts to send and receive
- Catching functions: suspending functions that encapsulate errors
- Migrating your code to new functions
- Reactive integrations on the road to stability
- Improved integration with Reactive Streams
- New convenience functions
- Start using kotlinx.coroutines 1.5.0!
Global Scope
A global CoroutineScope not bound to any job. Global scope is used to launch top-level coroutines which are operating on the whole application lifetime and are not cancelled prematurely.
Active coroutines launched in GlobalScope do not keep the process alive. They are like daemon threads.
This is a delicate API. It is easy to accidentally create resource or memory leaks when GlobalScope is used. A coroutine launched in GlobalScope is not subject to the principle of structured concurrency, so if it hangs or gets delayed due to a problem (e.g. due to a slow network), it will stay working and consuming resources. For example, consider the following code:
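A minimal sketch of the kind of code meant here. The helpers fetchConfigFromServer and updateConfiguration are hypothetical stand-ins, and the ten-second delay only simulates a slow network:

```kotlin
import kotlinx.coroutines.*

// Hypothetical stand-in for a slow network request.
suspend fun fetchConfigFromServer(): String {
    delay(10_000L) // simulates a slow network
    return "config"
}

// Hypothetical stand-in for applying the loaded configuration.
fun updateConfiguration(config: String) {
    println("Applied $config")
}

@OptIn(DelicateCoroutinesApi::class) // opt-in is required since kotlinx.coroutines 1.5
fun loadConfiguration() {
    // Fire-and-forget: the Job returned by launch is discarded, so the caller
    // can neither cancel this coroutine nor wait for it to finish.
    GlobalScope.launch {
        val config = fetchConfigFromServer()
        updateConfiguration(config)
    }
}

fun main() {
    // Each call starts another background coroutine that nobody tracks.
    repeat(3) { loadConfiguration() }
}
```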
A call to loadConfiguration creates a coroutine in GlobalScope that works in the background without any provision to cancel it or to wait for its completion. If the network is slow, it keeps waiting in the background, consuming resources. Repeated calls to loadConfiguration will consume more and more resources.
Possible replacements
In many cases, uses of GlobalScope should be removed and the containing operation marked with suspend, for example:
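A sketch of that replacement, assuming the same hypothetical helpers (fetchConfigFromServer, updateConfiguration) and a hypothetical currentConfig variable that stands in for application state:

```kotlin
import kotlinx.coroutines.*

var currentConfig: String? = null // hypothetical application state

// Hypothetical stand-in for a network request.
suspend fun fetchConfigFromServer(): String {
    delay(100L)
    return "config-v1"
}

fun updateConfiguration(config: String) {
    currentConfig = config
}

// No GlobalScope: the function suspends its caller, so the caller's scope
// controls cancellation and knows when the work has finished.
suspend fun loadConfiguration() {
    val config = fetchConfigFromServer()
    updateConfiguration(config)
}

fun main() = runBlocking {
    loadConfiguration() // returns only after the configuration is applied
    println(currentConfig)
}
```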
In cases when GlobalScope.launch was used to launch multiple concurrent operations, the corresponding operations shall be grouped with coroutineScope instead:
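A sketch of grouping two concurrent operations with coroutineScope. The operations loadConfiguration and loadData are hypothetical, and collecting their results in a list is only there to make the effect visible:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.CopyOnWriteArrayList

// Hypothetical operations that each take about 200 ms.
suspend fun loadConfiguration(): String { delay(200L); return "configuration" }
suspend fun loadData(): String { delay(200L); return "data" }

// Both operations run concurrently; coroutineScope returns only after
// both children complete, and cancelling the caller cancels both.
suspend fun loadConfigurationAndData(): List<String> {
    val loaded = CopyOnWriteArrayList<String>()
    coroutineScope {
        launch { loaded += loadConfiguration() }
        launch { loaded += loadData() }
    }
    return loaded.sorted()
}

fun main() = runBlocking {
    println(loadConfigurationAndData()) // [configuration, data]
}
```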
In top-level code, when launching a concurrent operation from a non-suspending context, an appropriately confined instance of CoroutineScope shall be used instead of GlobalScope. See docs on CoroutineScope for details.
GlobalScope vs custom scope
Do not replace GlobalScope.launch { ... } with a CoroutineScope().launch { ... } constructor function call. The latter has the same pitfalls as GlobalScope. See the CoroutineScope documentation on the intended usage of the CoroutineScope() constructor function.
Legitimate use-cases
There are limited circumstances under which GlobalScope can be legitimately and safely used, such as top-level background processes that must stay active for the whole duration of the application’s lifetime. Because of that, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class), like this:
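A sketch of such an opt-in for an application-lifetime reporter. The logStatistics function and the reportsLogged counter are hypothetical, and main sleeps only so that the daemon-like coroutine gets a chance to run:

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

val reportsLogged = AtomicInteger() // hypothetical counter, for illustration only

// Hypothetical reporting function.
fun logStatistics() {
    println("report #${reportsLogged.incrementAndGet()}")
}

// A global coroutine that logs statistics every second for as long as the
// application runs. The opt-in documents that this use is deliberate.
@OptIn(DelicateCoroutinesApi::class)
val globalScopeReporter = GlobalScope.launch {
    while (true) {
        delay(1000L)
        logStatistics()
    }
}

fun main() {
    // GlobalScope coroutines do not keep the process alive, so keep the
    // main thread around long enough to see a couple of reports.
    Thread.sleep(2500L)
}
```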
Coroutines basics
This section covers basic coroutine concepts.
Your first coroutine
A coroutine is an instance of suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.
Coroutines can be thought of as light-weight threads, but there are a number of important differences that make their real-life usage very different from threads.
Run the following code to get to your first working coroutine:
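A minimal version of such a program, assuming kotlinx-coroutines-core is on the classpath:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { // launch a new coroutine and continue
        delay(1000L) // non-blocking delay for 1 second
        println("World!") // printed after the delay
    }
    println("Hello") // the main coroutine continues while the other one is delayed
}
```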
You will see Hello printed first, followed by World! about a second later.
Let’s dissect what this code does.
launch is a coroutine builder. It launches a new coroutine concurrently with the rest of the code, which continues to work independently. That’s why Hello has been printed first.
delay is a special suspending function. It suspends the coroutine for a specific time. Suspending a coroutine does not block the underlying thread, but allows other coroutines to run and use the underlying thread for their code.
runBlocking is also a coroutine builder that bridges the non-coroutine world of a regular fun main() and the code with coroutines inside the runBlocking { ... } curly braces. This is highlighted in an IDE by the this: CoroutineScope hint right after the runBlocking opening curly brace.
If you remove or forget runBlocking in this code, you’ll get a compilation error (an unresolved reference) on the launch call, since launch is declared only on CoroutineScope.
The name of runBlocking means that the thread that runs it (in this case, the main thread) gets blocked for the duration of the call, until all the coroutines inside runBlocking { ... } complete their execution. You will often see runBlocking used like that at the very top level of the application and quite rarely inside the real code, as threads are expensive resources and blocking them is inefficient and often not desired.
Structured concurrency
Coroutines follow a principle of structured concurrency, which means that new coroutines can only be launched in a specific CoroutineScope that delimits the lifetime of the coroutine. The above example shows that runBlocking establishes the corresponding scope, and that is why the previous example waits until World! is printed after a second’s delay and only then exits.
In a real application, you will be launching a lot of coroutines. Structured concurrency ensures that they are not lost and do not leak. An outer scope cannot complete until all its child coroutines complete. Structured concurrency also ensures that any errors in the code are properly reported and are never lost.
Extract function refactoring
Let’s extract the block of code inside launch { ... } into a separate function. When you perform the "Extract function" refactoring on this code, you get a new function with the suspend modifier. This is your first suspending function. Suspending functions can be used inside coroutines just like regular functions, but their additional feature is that they can, in turn, use other suspending functions (like delay in this example) to suspend execution of a coroutine.
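A sketch of the result of that refactoring, using doWorld as the name of the extracted function (the name used for it later in this section):

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking { // this: CoroutineScope
    launch { doWorld() }
    println("Hello")
}

// The extracted suspending function: it can call delay because it is
// itself marked with suspend.
suspend fun doWorld() {
    delay(1000L)
    println("World!")
}
```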
Scope builder
In addition to the coroutine scope provided by different builders, it is possible to declare your own scope using the coroutineScope builder. It creates a coroutine scope and does not complete until all launched children complete.
runBlocking and coroutineScope builders may look similar because they both wait for their body and all its children to complete. The main difference is that the runBlocking method blocks the current thread for waiting, while coroutineScope just suspends, releasing the underlying thread for other usages. Because of that difference, runBlocking is a regular function and coroutineScope is a suspending function.
You can use coroutineScope from any suspending function. For example, you can move the concurrent printing of Hello and World into a suspend fun doWorld() function:
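A sketch of that arrangement, with both the launch and the Hello printout moved into doWorld:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    doWorld()
}

// coroutineScope suspends doWorld until the launched child completes,
// without blocking the underlying thread.
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(1000L)
        println("World!")
    }
    println("Hello")
}
```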
This code also prints Hello followed by World!.
Scope builder and concurrency
A coroutineScope builder can be used inside any suspending function to perform multiple concurrent operations. Let’s launch two concurrent coroutines inside a doWorld suspending function:
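A sketch with two children that finish at different times. The second launch is deliberately given the shorter delay to show that completion order follows the delays, not the launch order:

```kotlin
import kotlinx.coroutines.*

// Sequentially executes doWorld followed by "Done".
fun main() = runBlocking {
    doWorld()
    println("Done")
}

// Concurrently executes both launched sections.
suspend fun doWorld() = coroutineScope { // this: CoroutineScope
    launch {
        delay(2000L)
        println("World 2")
    }
    launch {
        delay(1000L)
        println("World 1")
    }
    println("Hello")
}
```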
Both pieces of code inside the launch { ... } blocks execute concurrently, with World 1 printed first, one second after the start, and World 2 printed next, two seconds after the start. The coroutineScope in doWorld completes only after both are complete, so doWorld returns and allows the Done string to be printed only after that.
An explicit job
A launch coroutine builder returns a Job object that is a handle to the launched coroutine and can be used to explicitly wait for its completion. For example, you can wait for completion of the child coroutine and then print the "Done" string:
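A minimal sketch of waiting on the Job explicitly:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    val job = launch { // launch a new coroutine and keep a reference to its Job
        delay(1000L)
        println("World!")
    }
    println("Hello")
    job.join() // suspend until the child coroutine completes
    println("Done")
}
```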
This code prints Hello, then World! after a second, and finally Done.
Coroutines ARE light-weight
Run the following code:
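A sketch of such a program, using the 100K count and five-second delay from the description:

```kotlin
import kotlinx.coroutines.*

fun main() = runBlocking {
    repeat(100_000) { // launch a lot of coroutines
        launch {
            delay(5000L)
            print(".")
        }
    }
}
```

The same experiment with 100K threads would likely exhaust memory on a typical JVM configuration, which is the point of the comparison.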
It launches 100K coroutines and, after 5 seconds, each coroutine prints a dot.
Kotlin Coroutines 1.5: GlobalScope Marked as Delicate, Refined Channels API, and More
Kotlin Coroutines 1.5.0 is out! Here’s what the new version brings.
- GlobalScope is now marked as a delicate API. GlobalScope is an advanced API that is easy to misuse. The compiler will now warn you about possible misuse and require an opt-in for this class in your program.
- Extensions for JUnit. CoroutinesTimeout is now available for JUnit 5.
- The refined Channel API. Along with a new naming scheme for the library functions, the non-suspending functions trySend and tryReceive were introduced as better alternatives to offer and poll.
- Stabilization of Reactive Integrations. We added more functions for converting from Reactive Streams types to Kotlin Flow and back, stabilized many existing functions and ReactiveContext API.
In this blog post, you will also find recommendations for migrating to the new version.
GlobalScope marked as a delicate API
The GlobalScope class is now marked with the @DelicateCoroutinesApi annotation. From now on, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class).
While the use of GlobalScope isn’t recommended for most cases, the official documentation still introduces the concepts via this delicate API.
A global CoroutineScope is not bound to any job. Global scope is used to launch top-level coroutines that operate during the whole application lifetime and are not canceled prematurely. Active coroutines launched in GlobalScope do not keep the process alive. They are like daemon threads.
This is a delicate API and it is easy to accidentally create resource or memory leaks when GlobalScope is used. A coroutine launched in GlobalScope is not subject to the principle of structured concurrency, so if it hangs or gets delayed due to a problem (e.g. due to a slow network), it will keep working and consuming resources. For example, consider the following code:
A call to loadConfiguration creates a coroutine in GlobalScope that works in the background without any provision to cancel it or to wait for its completion. If the network is slow, it keeps waiting in the background, consuming resources. Repeated calls to loadConfiguration will consume more and more resources.
Possible replacements
In many cases, the use of GlobalScope should be avoided and the containing operation should be marked with suspend, for example:
In cases when GlobalScope.launch is used to launch multiple concurrent operations, the corresponding operations should be grouped with coroutineScope instead:
In top-level code, when launching a concurrent operation from a non-suspending context, an appropriately confined instance of CoroutineScope should be used instead of GlobalScope.
Legitimate use cases
There are limited circumstances under which GlobalScope can be legitimately and safely used, such as top-level background processes that must stay active for the whole duration of an application’s lifetime. Because of that, any use of GlobalScope requires an explicit opt-in with @OptIn(DelicateCoroutinesApi::class), like this:
We recommend reviewing all your usages of GlobalScope carefully and annotating only those that fall under the “legitimate use-cases” category. Any other usages are likely to be a source of bugs in your code, so replace them as described above.
Extensions for JUnit 5
We have added a CoroutinesTimeout annotation that allows running tests in a separate thread, failing them after the provided time limit and interrupting the thread. Previously, CoroutinesTimeout was available for JUnit 4. In this release, we’re adding the integration for JUnit 5.
To use the new annotation, add the following dependency to your project:
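A sketch of the dependency declaration, assuming a Gradle Kotlin DSL build. The annotation lives in the kotlinx-coroutines-debug artifact; the version shown is an assumption and should match the kotlinx.coroutines version used in your project:

```kotlin
// build.gradle.kts
dependencies {
    // Assumed version; align it with your other kotlinx.coroutines artifacts.
    testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-debug:1.5.0")
}
```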
Here’s a simple example of how to use the new CoroutinesTimeout in your tests:
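A sketch of such a test class, assuming JUnit 5 and kotlinx-coroutines-debug are on the test classpath. The class name and the specific timeout values are illustrative choices:

```kotlin
import kotlinx.coroutines.debug.junit5.CoroutinesTimeout
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

@CoroutinesTimeout(100) // class-level timeout, in milliseconds
class CoroutinesTimeoutSimpleTest {

    @CoroutinesTimeout(300) // overrides the class-level timeout for this test
    @Test
    fun firstTest() {
        runBlocking {
            delay(200) // finishes within the 300 ms limit
        }
    }

    @Test
    fun secondTest() {
        runBlocking {
            delay(200) // exceeds the 100 ms class-level limit
        }
    }
}
```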
In the example, the coroutines timeout is defined at the class level and, separately, for firstTest. The annotated test does not time out, as the annotation on the function overrides the class-level one. The secondTest uses the class-level annotation, so it times out.
The annotation is declared in the following way:
The first parameter, testTimeoutMs, specifies the timeout duration in milliseconds. The second parameter, cancelOnTimeout, determines if all the running coroutines should be canceled at the moment of the timeout. If it’s set to true, all the coroutines will be automatically canceled.
Whenever you use the CoroutinesTimeout annotation, it automatically enables the coroutines debugger and dumps all coroutines at the moment of the timeout. The dump contains the coroutine creation stack traces. If there is a need to disable the creation stack traces in order to speed tests up, consider directly using CoroutinesTimeoutExtension, which allows this configuration.
Many thanks to Abhijit Sarkar, who created a useful PoC for CoroutinesTimeout for JUnit 5. The idea was developed into the new CoroutinesTimeout annotation that we added in the 1.5 release.
Channel API refinement
Channels are important communication primitives that allow you to pass data between different coroutines and callbacks. In this release, we reworked the Channel API a bit, replacing the confusing offer and poll functions with better alternatives. Along the way, we developed a new consistent naming scheme for suspending and non-suspending methods.
The new naming scheme
We tried to work towards a consistent naming scheme to use further in other libraries or Coroutines API. We wanted to make sure that the name of the function would convey the information about its behavior. As a result, we came up with the following:
- The regular suspending methods are left as-is, e.g., send, receive.
- Their non-suspending counterparts with error encapsulation are consistently prefixed with “try”: trySend and tryReceive instead of the old offer and poll .
- New error-encapsulating suspending methods will have the suffix “Catching”.
Let’s dive into the details about these new methods.
Try functions: non-suspending counterparts to send and receive
One coroutine can send some information to a channel, while the other one can receive this information from it. Both send and receive functions are suspending. send suspends its coroutine if the channel is full and can’t take a new element, while receive suspends its coroutine if the channel has no elements to return:
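A sketch of a sender and a receiver on a rendezvous channel (the default Channel() factory with no buffer):

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking<Unit> {
    val channel = Channel<String>() // rendezvous channel: zero buffer capacity
    launch {
        println("Sending...")
        // Suspends until a receiver is ready to take the element.
        channel.send("Element")
    }
    println("Receiving...")
    // Suspends until an element is available.
    println(channel.receive())
}
```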
These functions have non-suspending counterparts for use in synchronous code: offer and poll, which are now deprecated in favor of trySend and tryReceive. Let’s discuss the reasons for this change.
offer and poll are supposed to do the same thing as send and receive, but without suspension. It sounds easy, and everything works fine when the element can be sent or received. But in case of an error, what happens? send and receive would suspend until they can do their job. offer and poll simply returned false and null, respectively, if the element couldn’t be added because the channel was full, or no element could be retrieved because the channel was empty. They both threw an exception in an attempt to work with a closed channel, and this last detail turned out to be confusing.
In this example, poll is called before any element is added and therefore returns null immediately. Note that it’s not supposed to be used in this way: you should instead continue polling elements regularly, but we call it directly for simplicity of this explanation. The invocation of offer is also unsuccessful since our channel is a rendezvous channel and has zero buffer capacity. As a result, offer returns false, and poll returns null, simply because they were called in the wrong order.
In the example above, try uncommenting the channel.close() statement to make sure that the exception is thrown. In this case, poll returns null, as before. But then offer tries to add an element to an already closed channel, fails, and throws an exception. We received many complaints that such behavior is error-prone. It’s easy to forget to catch this exception, and while you’d rather ignore it or handle it differently, it crashes your program.
The new trySend and tryReceive fix this issue and return a more detailed result. Each returns the ChannelResult instance, which is one of the three things: a successful result, a failure, or an indication that the channel was closed.
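A sketch of inspecting the three outcomes. The describe helper is hypothetical and exists only to label each result; isSuccess and isClosed are properties of ChannelResult:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ChannelResult

// Hypothetical helper that names the outcome carried by a ChannelResult.
fun describe(result: ChannelResult<*>): String = when {
    result.isSuccess -> "success"
    result.isClosed -> "closed"
    else -> "failed"
}

fun main() {
    val channel = Channel<String>() // rendezvous channel, zero buffer capacity

    val received = channel.tryReceive() // nothing has been sent yet
    println("$received -> ${describe(received)}")

    // channel.close() // uncomment to get the closed outcome from trySend

    val sent = channel.trySend("Element") // no receiver is waiting
    println("$sent -> ${describe(sent)}")
}
```

Neither call suspends or throws, so this code does not need a coroutine at all.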
This example works in the same way as the previous one, with the only difference that tryReceive and trySend return a more detailed result. You can see Value(Failed) in the output instead of false and null. Uncomment the line closing the channel again and ensure that trySend now returns a Closed result capturing an exception.
Thanks to inline value classes, using ChannelResult doesn’t create additional wrappers underneath, and if the successful value is returned, it’s returned as is, without any overhead.
Catching functions: suspending functions that encapsulate errors
Starting from this release, the error-encapsulating suspending methods will have the suffix “Catching”. For instance, the new receiveCatching function handles the exception in the case of a closed channel. Consider this simple example:
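A sketch of receiving from an already closed channel without an exception:

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

fun main() = runBlocking<Unit> {
    val channel = Channel<String>()
    channel.close()
    // receiveCatching reports the closed state in its result instead of throwing.
    val result = channel.receiveCatching()
    println("isClosed=${result.isClosed}, value=${result.getOrNull()}")
}
```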
The channel is closed before we try retrieving a value. However, the program completes successfully, indicating that the channel was closed. If you replace receiveCatching with the ordinary receive function, it will throw ClosedReceiveChannelException.
For now, we only provide receiveCatching and onReceiveCatching (instead of the previously internal receiveOrClosed), but we have plans to add more functions.
Migrating your code to new functions
You can replace all the usages of the offer and poll functions in your project automatically with new calls. Since offer returned Boolean, its equivalent replacement is channel.trySend("Element").isSuccess.
Likewise, the poll function returned a nullable element, so its replacement becomes channel.tryReceive().getOrNull().
If the result of the invocation wasn’t used, you can replace them directly with new calls.
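A sketch of these simplified replacements wrapped in two hypothetical extension functions, offerLike and pollLike, so that the old and new shapes can be compared side by side:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel

// Hypothetical wrapper: Boolean result, like the old offer.
fun <T> SendChannel<T>.offerLike(element: T): Boolean =
    trySend(element).isSuccess

// Hypothetical wrapper: nullable result, like the old poll.
fun <T> ReceiveChannel<T>.pollLike(): T? =
    tryReceive().getOrNull()

fun main() {
    val channel = Channel<String>(capacity = 1) // buffer for a single element
    println(channel.offerLike("a")) // true: the buffer has room
    println(channel.offerLike("b")) // false: the buffer is full
    println(channel.pollLike())     // a
    println(channel.pollLike())     // null: the channel is empty again
}
```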
The behavior towards handling exceptions is now different, so you will need to make the necessary updates manually. If your code relies on ‘offer’ and ‘poll’ methods throwing exceptions on a closed channel, you’ll need to use the following replacements.
The equivalent replacement for channel.offer("Element") should throw an exception if the channel was closed, even if it was closed normally:
The equivalent replacement for channel.poll() throws an exception if the channel was closed with an error and returns null if it was closed normally:
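A sketch of both exception-preserving replacements, written as hypothetical extension functions offerOrThrow and pollOrThrow. The message passed to ClosedSendChannelException is an illustrative choice:

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedSendChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.SendChannel
import kotlinx.coroutines.channels.onClosed

// Throws on any closed channel, as the old offer did.
fun <T> SendChannel<T>.offerOrThrow(element: T): Boolean =
    trySend(element)
        .onClosed { throw it ?: ClosedSendChannelException("Channel was closed normally") }
        .isSuccess

// Throws only if the channel was closed with an error, as the old poll did.
fun <T> ReceiveChannel<T>.pollOrThrow(): T? =
    tryReceive()
        .onClosed { if (it != null) throw it }
        .getOrNull()

fun main() {
    val channel = Channel<String>()
    channel.close() // closed normally, without a cause

    try {
        channel.offerOrThrow("Element")
    } catch (e: ClosedSendChannelException) {
        println("offerOrThrow threw on the closed channel")
    }
    println(channel.pollOrThrow()) // null: closed normally, so nothing is thrown
}
```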
Such changes reflect the old behavior of offer and poll functions.
We assume that in most cases, your code didn’t rely on these subtleties of behavior on a closed channel, but rather that it was a source of bugs. That’s why the automatic replacements provided by the IDE simplify the semantics. If this is not true for you, please review and update your usages manually and consider rewriting them completely to handle the cases of closed channels differently, without throwing exceptions.
Reactive integrations on the road to stability
Version 1.5 of Kotlin Coroutines promotes most of the functions responsible for integrations with reactive frameworks to stable API.
In the JVM ecosystem, there are a few frameworks that deal with asynchronous stream processing conforming to the Reactive Streams standard. For instance, Project Reactor and RxJava are the two popular Java frameworks in this area.
While Kotlin Flows are different and the types are not compatible with the ones specified by the standard, they are conceptually still streams. It is possible to convert Flow to the reactive (specification and TCK compliant) Publisher and vice versa. Such converters are provided by kotlinx.coroutines out-of-the-box and can be found in the corresponding reactive modules.
For instance, if you need interoperability with the Project Reactor types, you should add the following dependencies to your project:
Then you will be able to use Flow.asPublisher() if you want to use the Reactive Streams types, or Flow.asFlux() if you need to use Project Reactor types directly.
This is a very condensed view of the subject at hand. If you are interested in learning more, consider reading Roman Elizarov’s article about Reactive Streams and Kotlin Flows.
The integrations with reactive libraries are working toward API stabilization. Technically, the goal is to remove the @ExperimentalCoroutinesApi annotation and to finish the remaining work on various topics.
Improved integration with Reactive Streams
Compatibility with Reactive Streams specifications is important in order to ensure interoperability between 3rd-party frameworks and Kotlin Coroutines. It helps to adopt Kotlin Coroutines in legacy projects without the need to rewrite all of the code.
There is a long list of functions that we managed to promote to stable status this time. It is now possible to convert a type from any Reactive Streams implementation to Flow and back. For instance, the new code can be written with Coroutines, but integrated with the old reactive codebase via the opposite converters:
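A sketch of a round trip between Flow and Reactor's Flux, assuming the kotlinx-coroutines-reactive and kotlinx-coroutines-reactor modules plus Project Reactor are on the classpath. The function legacyNumbers is a hypothetical stand-in for an API that must keep returning a reactive type:

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactor.asFlux
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Flux

// Hypothetical legacy-facing function: implemented with Flow,
// exposed as a Flux for existing reactive callers.
fun legacyNumbers(): Flux<Int> =
    flow<Int> {
        for (i in 1..3) emit(i)
    }
        .map { it * 10 } // ordinary Flow operators
        .asFlux()

fun main() = runBlocking {
    // The opposite direction: consume any Publisher as a Flow.
    val values = legacyNumbers().asFlow().toList()
    println(values) // [10, 20, 30]
}
```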
Also, numerous improvements were made to ReactorContext, which wraps Reactor’s Context into CoroutineContext for seamless integration between Project Reactor and Kotlin Coroutines. With this integration, it is possible to propagate the information about Reactor’s Context through coroutines.
The context is implicitly propagated through the subscribers’ context by all Reactive integrations, such as Mono, Flux, Publisher.asFlow, Flow.asPublisher and Flow.asFlux. Here’s a simple example of propagating the subscriber’s Context to ReactorContext:
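A sketch of such propagation, assuming kotlinx-coroutines-reactor and a Reactor version that provides contextWrite (3.4 or later). The key answer and the value 42 are arbitrary illustration data:

```kotlin
import kotlinx.coroutines.currentCoroutineContext
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.reactor.ReactorContext
import kotlinx.coroutines.reactor.asFlux

fun main() {
    val flow = flow<Int> {
        // Read the Reactor context, if any, from the coroutine context.
        println(
            "Reactor context in Flow: " +
                currentCoroutineContext()[ReactorContext]?.context
        )
    }

    // No subscriber context is supplied here.
    flow.asFlux()
        .subscribe()

    // The subscriber's context is populated before subscribing.
    flow.asFlux()
        .contextWrite { ctx -> ctx.put("answer", 42) }
        .subscribe()
}
```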
In the example above, we construct a Flow instance which is then converted to Reactor’s Flux instance, with no context. The effect of calling the subscribe() method without an argument is to request the publisher to send all data. As a result, the program prints the phrase “Reactor context in Flow: null”.
The next call chain also converts the Flow to Flux, but then adds a key-value pair, answer=42, to the Reactor’s context for this chain. The call to subscribe() triggers the chain. In this case, since the context is populated, the program prints “Reactor context in Flow: Context1{answer=42}”.
New convenience functions
When working with reactive types like Mono in the Coroutines context, there are a few convenience functions that allow retrieval without blocking the thread. In this release, we deprecated awaitSingleOr* functions on arbitrary Publishers, and specialized some await* functions for Mono and Maybe.
Mono produces at most one value, so the last element is the same as the first. In this case, the semantics of dropping the remaining elements isn’t useful either. Therefore, Mono.awaitFirst() and Mono.awaitLast() are deprecated in favour of Mono.awaitSingle().
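A minimal sketch of awaiting a Mono without blocking, assuming kotlinx-coroutines-reactor 1.5 or later and Project Reactor on the classpath:

```kotlin
import kotlinx.coroutines.reactor.awaitSingle
import kotlinx.coroutines.runBlocking
import reactor.core.publisher.Mono

fun main() = runBlocking {
    // Suspends until the Mono emits its single value.
    val answer: Int = Mono.just(42).awaitSingle()
    println(answer) // 42
}
```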
Start using kotlinx.coroutines 1.5.0!
The new release features an impressive list of changes. The new naming scheme developed while refining the Channels API is a notable achievement by the team. Meanwhile, there’s a great focus on making the Coroutines API as simple and intuitive as possible.
To start using the new version of Kotlin Coroutines, just update the content of your build.gradle.kts file. First, make sure that you have the latest version of the Kotlin Gradle plugin:
And then update the versions of the dependencies, including the libraries with specific integrations for the Reactive Streams.
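A sketch of the relevant parts of build.gradle.kts. The plugin version, the coroutinesVersion variable name, and the selection of modules are assumptions; keep only the integrations your project actually uses:

```kotlin
// build.gradle.kts
plugins {
    kotlin("jvm") version "1.5.0" // assumed: the Kotlin release current at the time
}

repositories {
    mavenCentral()
}

val coroutinesVersion = "1.5.0"

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")
    // Optional integrations for Reactive Streams and Project Reactor.
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactive:$coroutinesVersion")
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-reactor:$coroutinesVersion")
}
```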