- Lessons learnt using Coroutines Flow in the Android Dev Summit 2019 app
- 1. Prefer exposing streams as Flows (not Channels)
- 2. How to use Flow in your Android app architecture
- UseCase and Repository
- ViewModel
- 3. When to use a BroadcastChannel or Flow as an implementation detail
- When to use Flow
- When to use BroadcastChannel
- Disclaimer
- 4. Convert data streams callback-based APIs to Coroutines
- Flow implementation
- BroadcastChannel implementation
- 5. Testing tips
- Современный подход к конкурентности в Android: корутины в Kotlin
Lessons learnt using Coroutines Flow in the Android Dev Summit 2019 app
This article is about the best practices we found when using Flow in the Android Dev Summit (ADS) 2019 app; which has just been open sourced. Keep reading to find out how each layer of our app handles data streams.
The ADS app architecture follows the recommended app architecture guide, with the addition of a domain layer (of UseCases) which help separate concerns, keeping classes small, focused, reusable and testable:
Like many Android apps the ADS app lazily loads data from the network or a cache; we found this to be a perfect use case for Flow . For one shot operations, suspend functions were a better fit. There are two main commits that refactor the app to use Coroutines. The first commit migrates one-shot operations, and the second one migrates to data streams.
In this article, you can find the principles we followed to refactor the app from using LiveData in all the layers of the architecture to just use LiveData for communication between View and ViewModel, and Coroutines for the UseCase and lower layers of our architecture.
1. Prefer exposing streams as Flows (not Channels)
There are two ways you can deal with streams of data in coroutines: the Flow API and the Channel API. Channels are a synchronisation primitive whereas Flow is built to model streams of data: it’s a factory for subscriptions to streams of data. Channels can however be used to back a Flow , as we’ll see later.
Prefer exposing Flow since it gives you more flexibility, more explicit contracts and operators than Channel
Flows automatically close the stream of data due to the nature of the terminal operators which trigger the execution of the stream of data and complete successfully or exceptionally depending on all the flow operations in the producer side. Therefore, you can’t (nearly as easily) leak resources on the producer side. This is easier to do with Channels: the producer might not clean up heavy resources if the Channel is not closed properly.
The data layer of an app is responsible for providing data usually by reading from a database or fetching from the Internet. For example here’s a DataSource interface that exposes a stream of user event data:
2. How to use Flow in your Android app architecture
UseCase and Repository
The layers in-between View/ViewModel and the DataSource (i.e. UseCase and Repository in our case) often need to combine data from multiple queries or transform the data before it can be used by the ViewModel layer. Just like Kotlin sequences, Flow supports a large set of operators to transform your data. There are a wealth of operators already available, or you can create your own transformation (e.g. using the transform operator). However, Flow exposes suspend lambdas on many of the operators, there’s often no need to make a custom transform to accomplish complex tasks, just call suspend functions from inside your Flow .
In our ADS example, we want to combine the UserEventResult with session data in the Repository layer. We use the map operator to apply a suspend lambda to each value of the Flow retrieved from DataSource:
ViewModel
When performing UI ↔ ViewModel communication with LiveData , the ViewModel layer should consume the stream of data coming from the data layer using a terminal operator (e.g. collect , first or toList ).
If you’re converting a Flow to a LiveData , you can use the Flow.asLiveData() extension function from the androidX lifecycle LiveData ktx library. This is very convenient since it will share a single underlying subscription to the Flow and will manage the subscription based on the observers’ lifecycles. Moreover, LiveData also keeps the most recent value for late-coming observers and the subscription active across configuration changes. Check this simpler code that showcases how you can use the extension function:
Disclaimer: The code snippet above is not part of the app; it’s a simplified version of the code that showcases how you can use Flow.asLiveData() .
3. When to use a BroadcastChannel or Flow as an implementation detail
Back to the DataSource implementation, how can we implement the getObservableUserEvent function we exposed above? The team considered two alternatives implementations: the flow builder or the BroadcastChannel API. Each serve different use cases.
When to use Flow
Flow is a cold stream. A cold stream is a data source whose producer will execute for each listener that starts consuming events, resulting in a new stream of data being created on each subscription. Once the consumer stops listening or the producer block finishes, the stream of data will be closed automatically.
Flow is a great fit when the production of data needs to start/stop to match the observer
You can emit a limited or unlimited number of elements using the flow builder.
Flow tends to be used for expensive tasks as it provides automatic cleanup via coroutine cancellation. Notice that this cancellation is cooperative, a flow that never suspends can never be cancelled: in our example, since delay is a suspend function that checks for cancellation, when the subscriber stops listening, the Flow will stop and cleanup resources.
When to use BroadcastChannel
A Channel is a concurrency primitive for communicating between coroutines. A BroadcastChannel is an implementation of Channel with multicast capabilities.
There are some cases where you might want to use an implementation of BroadcastChannel in your DataSource layer:
Use BroadcastChannel when the producer(s) and consumer(s) have different lifetimes or operate completely independently of each other
The BroadcastChannel API is the perfect fit when you want the producer to follow a different lifecycle and broadcast the current result to anyone who’s listening. In this way, the producer doesn’t need to start every time a new listener starts consuming events.
You can still expose a Flow to the caller, they don’t need to know about how this is implemented. You can use the extension function BroadcastChannel.asFlow() to expose a BroadcastChannel as a Flow .
However, closing that Flow won’t cancel the subscription. When using BroadcastChannel , you have to take care of its lifecycle. They don’t know if there are listeners or not, and will keep resources alive until the BroadcastChannel is cancelled or closed. Make sure to close the BroadcastChannel when it’s no longer needed. Also, remember that a closed channel cannot be active again, you’d need to create a new instance.
An example of how to use the BroadcastChannel API can be found in the next section.
Disclaimer
Parts of the Flow and Channel APIs are still in experimental, they’re likely to change. There are some situations where you would currently use Channels but the recommendation in the future may change to use Flow . Specifically, the StateFlow and Flow’s share operator proposals may reduce the usage of Channel in the future.
4. Convert data streams callback-based APIs to Coroutines
Multiple libraries already support coroutines for data streams operations, including Room. For those that don’t, you can convert any callback-based API to Coroutines.
Flow implementation
If you want to convert a stream callback-based API to use Flow , you can use the channelFlow function (also callbackFlow , which shares the same implementation). channelFlow creates an instance of a Flow whose elements are sent to a Channel . This allows us to provide elements running in a different context or concurrently.
In the following sample, we want to emit the elements that we get from a callback into a Flow :
- Create a flow with the channelFlow builder that registers a callback to a third party library.
- Emit all items received from the callback to the Flow .
- When the subscriber stops listening, we unregister the subscription to the API using the suspend funawaitClose .
BroadcastChannel implementation
For our stream of data that tracks user authentication with Firestore, we used the BroadcastChannel API as we want to register one Authentication listener that follows a different lifecycle and broadcasts the current result to anyone who’s listening.
To convert a callback API to BroadcastChannel you need a bit more code than with Flow . You can create a class where the instance of the BroadcastChannel can be kept in a variable. During initialisation, register the callback that sends elements to the BroadcastChannel as before:
5. Testing tips
To test Flow transformations (as we do in the UseCase and Repository layers), you can use the flow builder to return fake data. For example:
To test implementations of Flow successfully, a good idea is to use the take operator to get some items from the Flow and the toList operator as the terminal operator to get the results in a list. See an example of this in the following test:
The take operator is a great fit to close the Flow after you get the items. Not closing a started Flow (or BroadcastChannel ) after each test will leak memory and creates a flaky and inconsistent test suite.
Note: If the implementation of the DataSource is done with a BroadcastChannel , the code above is not enough. You have to manage its lifecycle by making sure you start the BroadcastChannel before the test and close it after the test finishes. If not, you’ll leak memory. You can see a test like this in this other Flow sample.
Testing Coroutines best practices also apply here. If you create a new coroutine in code under test, you might want to execute it in your test thread for a deterministic execution of your test. Check out more about this in the Testing Coroutines ADS 2019 talk.
Источник
Современный подход к конкурентности в Android: корутины в Kotlin
Напоминаем, что у нас уже открыт предзаказ на долгожданную книгу о языке Kotlin из знаменитой серии Big Nerd Ranch Guides. Сегодня мы решили предложить вашему вниманию перевод статьи, рассказывающей о корутинах Kotlin и о правильной работе с потоками в Android. Тема обсуждается очень активно, поэтому для полноты картины также рекомендуем посмотреть эту статью с Хабра и этот подробный пост из блога компании Axmor Software.
Современный фреймворк для обеспечения конкурентности в Java/Android учиняет ад обратных вызовов и приводит к блокирующим состояниям, так как в Android нет достаточно простого способа гарантировать потокобезопасность.
Корутины Kotlin – это очень эффективный и полный инструментарий, позволяющий гораздо проще и производительнее управлять конкурентностью.
Приостановка и блокирование: в чем разница
Корутины не заменяют потоков, а скорее дают фреймворк для управления ими. Философия корутин заключается в определении контекста, позволяющего ожидать, пока завершатся фоновые операции, не блокируя при этом основного потока.
Цель корутин в данном случае – обойтись без обратных вызовов и упростить конкуренцию.
Для начала возьмем самый простой пример: запустим корутину в контексте Main (главный поток). В нем мы извлечем изображение из потока IO и отправим это изображение на обработку обратно в Main .
Код прост как однопоточная функция. Причем, пока getImage выполняется в выделенном пуле потоков IO , главный поток свободен и может взяться за любую другую задачу! Функция withContext приостанавливает текущую корутину, пока работает ее действие ( getImage() ). Как только getImage() возвратится и looper из главного потока станет доступен, корутина возобновит работу в главном потоке и вызовет imageView.setImageBitmap(image) .
Второй пример: теперь нам требуется, чтобы были выполнены 2 фоновые задачи, чтобы ими можно было воспользоваться. Мы применим дуэт async/await, чтобы две эти задачи выполнялись параллельно, и воспользуемся их результатом в главном потоке, как только обе задачи будут готовы:
async подобен launch , но возвращает deferred (сущность Kotlin, эквивалентная Future ), поэтому ее результат можно получить при помощи await() . При вызове без параметров она работает в контексте, задаваемом по умолчанию для текущей области видимости.
Опять же, главный поток остается свободен, пока мы дожидаемся наших 2 значений.
Как видите, функция launch возвращает Job , который можно использовать для ожидания, пока операция завершится – это делается при помощи функции join() . Она работает как и в любом другом языке, с той оговоркой, что просто приостанавливает корутину, а не блокирует поток.
Диспетчеризация – ключевая концепция при работе с корутинами. Это действие, позволяющее «перепрыгнуть» от одного потока к другому.
Рассмотрим, как в java выглядит эквивалент для диспетчеризации в Main , то есть,
Реализация контекста Main для Android – это диспетчер на основе Handler . Итак, это действительно очень подходящая реализация:
launch(Dispatchers.Main) посылает Runnable в Handler , так что его код выполняется не сразу.
launch(Dispatchers.Main, CoroutineStart.UNDISPATCHED) немедленно выполнит свое лямбда-выражение в текущем потоке.
Dispatchers.Main гарантирует, что когда корутина возобновит работу, она будет направлена в главный поток; кроме того, Handler используется здесь как нативная реализация Android для отправки в цикл событий приложения.
Точная реализация выглядит так:
Вот хорошая статья помогающая разобраться в тонкостях диспетчеризации в Android:
Understanding Android Core: Looper, Handler, and HandlerThread.
Контекст корутины (он же – диспетчер корутины) определяет, в каком потоке будет выполняться ее код, что делать, если будет выброшено исключение, и обращается к родительскому контексту для распространения отмены.
job.cancel() отменит все корутины, родителем которых является job . A exceptionHandler получит все исключения, выброшенные в этих корутинах.
Интерфейс coroutineScope упрощает обработку ошибок:
Если откажет какая-либо из его дочерних корутин, то откажет и вся область видимости, и все дочерние корутины будут отменены.
В примере async , если извлечь значение не удалось, а другая задача при этом продолжила работу – у нас возникает поврежденное состояние, и с этим надо что-то делать.
При работе с coroutineScope функция useValues будет вызываться лишь в случае, если извлечение обоих значений прошло успешно. Также, если deferred2 откажет, deferred1 будет отменена.
Также можно “поместить в область видимости” целый класс, чтобы задать для него контекст CoroutineContext по умолчанию и использовать его.
Пример класса, реализующего интерфейс CoroutineScope :
Запуск корутин в CoroutineScope :
Диспетчер launch или async , задаваемый по умолчанию, теперь становится диспетчером актуальной области видимости.
Автономный запуск корутины (вне какого-либо CoroutineScope):
Можно даже определить область видимости для приложения, задав диспетчер Main по умолчанию:
- Корутины ограничивают интероперабельность с Java
- Ограничивают изменяемость во избежание блокировок
- Корутины предназначены для ожидания, а не для организации потоков
- Избегайте I/O в Dispatchers.Default (и Main …) — для этого предназначен Dispatchers.IO
- Потоки ресурсозатратны, поэтому используются однопоточные контексты
- Dispatchers.Default основан на ForkJoinPool , появившемся в Android 5+
- Корутины можно использовать посредством каналов
Избавляемся от блокировок и обратных вызовов при помощи каналов
Определение канала из документации JetBrains:
Канал Channel концептуально очень похож на BlockingQueue . Ключевое отличие заключается в том, что он не блокирует операцию put, он предусматривает приостанавливающий send (или неблокирующий offer ), а вместо блокирования операции take предусматривает приостанавливающий receive .
Рассмотрим простой инструмент для работы с каналами: Actor .
Actor , опять же, очень похож на Handler : мы определяем контекст корутины (то, есть, поток, в котором собираемся выполнять действия) и работаем с ним в последовательном порядке.
Разница, конечно же, заключается в том, что здесь используются корутины; можно указать мощность, а выполняемый код – приостановить.
В принципе, actor будет переадресовывать любую команду каналу корутины. Он гарантирует выполнение команды и ограничивает операции в ее контексте. Такой подход отлично помогает избавиться от вызовов synchronize и держать все потоки свободными!
В данном примере мы пользуемся запечатанными классами Kotlin, выбирая, какое именно действие выполнить.
Причем, все эти действия будут поставлены в очередь, параллельно выполняться они никогда не будут. Это удобный способ добиться ограничения изменяемости.
Жизненный цикл Android + корутины
Акторы могут очень пригодиться и для управления пользовательским интерфейсом Android, упрощают отмену задач и предотвращают перегрузку главного потока.
Давайте это реализуем и вызовем job.cancel() при уничтожении активности.
Класс SupervisorJob похож на обычный Job с тем единственным исключением, что отмена распространяется только в нисходящем направлении.
Поэтому мы не отменяем всех корутин в Activity , когда одна из них отказывает.
Чуть лучше дела обстоят с функцией расширения, позволяющей открыть доступ к этому CoroutineContext из любого View в CoroutineScope .
Теперь мы можем все это скомбинировать, функция setOnClick создает объединенный actor для управления ее действиями onClick . В случае множественных нажатий промежуточные действия будут игнорироваться, исключая таким образом ошибки ANR (приложение не отвечает), и эти действия будут выполняться в области видимости Activity . Поэтому при уничтожении активности все это будет отменено.
В данном примере мы задаем для Channel значение Conflated , чтобы он игнорировал часть событий, если их будет слишком много. Можно заменить его на Channel.UNLIMITED , если вы предпочитаете ставить события в очередь, не теряя ни одного из них, но все равно хотите защитить приложение от ошибки ANR.
Также можно комбинировать корутины и фреймворки Lifecycle, чтобы автоматизировать отмену задач, связанных с пользовательским интерфейсом:
Упрощаем ситуацию с обратными вызовами (часть 1)
Вот как можно преобразить использование API, основанного на обратных вызовах, благодаря Channel .
API работает вот так:
- requestBrowsing(url, listener) инициирует синтаксический разбор папки, расположенной по адресу url .
- Слушатель listener получает onMediaAdded(media: Media) для любого медиа-файла, обнаруженного в этой папке.
- listener.onBrowseEnd() вызывается по завершении синтаксического разбора папки
Вот старая функция refresh в поставщике контента для обозревателя VLC:
Создаем канал, который будет запускаться в refresh . Теперь обратные вызовы обозревателя будут лишь направлять медиа в этот канал, а затем закрывать его.
Теперь функция refresh стала понятнее. Она создает канал, вызывает обозреватель VLC, затем формирует список медиа-файлов и обрабатывает его.
Вместо функций select или consumeEach можно использовать for для ожидания медиа, и этот цикл будет разрываться, как только канал browserChannel закроется.
Упрощаем ситуацию с обратными вызовами (часть 2): Retrofit
Второй подход: мы вообще не используем корутины kotlinx, зато применяем корутинный core-фреймворк.
Смотрите, как на самом деле работают корутины!
Функция retrofitSuspendCall оборачивает запрос на вызов Retrofit Call , чтобы сделать из него функцию suspend .
При помощи suspendCoroutine мы вызываем метод Call.enqueue и приостанавливаем корутину. Предоставленный таким образом обратный вызов обратится к continuation.resume(response) , чтобы возобновить корутину откликом от сервера, как только тот будет получен.
Далее нам остается просто объединить наши функции Retrofit в retrofitSuspendCall , чтобы с их помощью возвращать результаты запросов.
Таким образом, вызов, блокирующий сеть, делается в выделенном потоке Retrofit, корутина находится здесь, ожидая отклика от сервера, а использовать ее в приложении – проще некуда!
Такая реализация вдохновлена библиотекой gildor/kotlin-coroutines-retrofit.
Также имеется JakeWharton/retrofit2-kotlin-coroutines-adapter с другой реализацией, дающей аналогичный результат.
Channel можно использовать и многими другими способами; посмотрите в BroadcastChannel более мощные реализации, которые могут вам пригодиться.
Также можно создавать каналы при помощи функции Produce.
Наконец, при помощи каналов удобно организовать коммуникацию между компонентами UI: адаптер может передавать события нажатий в свой фрагмент/активность через Channel или, например, через Actor .
Источник