RxJava
Совсем недавно RxJava была на пике популярности, очень многие восторгались новым способом написания приложений, которое называют реактивным программированием. С появлением корутин в Kotlin популярность стала угасать. Тем не менее остались сторонники реактивного программирования, которые продолжают использовать RxJava в своих проектах, в том числе и на языке Kotlin.
Общая страница по реактивному программированию — http://reactivex.io/. Документация по классам здесь.
Переход на RxJava 2/RxJava 3
С некоторых пор произошло разделение версии на две ветки: 1.x и 2.x.
Ветка 1.x была заморожена 1 июня 2017 (только исправления багов). 31 марта 2018 года ветку закрыли. Я начинал изучать тему на основе 1.x-ветки, поэтому не удивляйтесь, если будут попадаться старые примеры для первой версии. Постараюсь явно предупреждать о подобных случаях, так как различия довольны значительны.
Ветку 2.x заморозили в феврале 2021 года. Теперь следует использовать новую ветку 3.х.
Большинство примеров писалось под версию 2.0, поэтому в статьях возможны ошибки. Решил заново пройтись по статьям и поправить косяки.
Разница между двумя ветками 1 и 2 описана на сайте документации. Общие фундаментальные понятия остались прежними. Разница между 2 и 3 описана в другом документе.
Были переименованы или удалены некоторые виды классов Action и Function.
- Action0 -> Action
- Action1 -> Consumer
- Action2 -> BiConsumer
- ActionN -> Consumer
- Action3 — Action9 -> удалены
- Func0 -> Callable
- Func1 -> Function
- Func2 -> BiFunction
- Func3 — Func9 -> Function3 — Function9
Subscriber переименован в Disposable. А также CompositeSubscription в CompositeDisposable.
Классы
В RxJava огромное количество страшных слов, которые следует выучить.
- Observable
- Observer
- Subject, а также PublishSubject, AsyncSubject, BehaviorSubject, ReplaySubject
- Processor — подвид Subject с поддержкой BackPressure. AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor, UnicastProcessor.
- Future
- Single — ленивый эквивалент Future.
- Maybe
- Completable
- Consumer
- Disposable — бывший Subscription из RxJava 1.x
- Scheduler
- Flowable
Observable и Observer
Существуют две сущности: производители и потребители (Observables и Observers). Общая идея состоит в реализации трёх задач — создать поток данных, привести данные в нужный вид, получить исправленные данные.
Операторы
В RxJava есть специальные операторы, с помощью которых можно создавать новые Observable или менять уже существующие. В Java операторы реализованы в виде методов. Как правило, названия оператора совпадает с именем метода или незначительно отличается наличием дополнительных префиксов или суффиксов.
Полная страница операторов в алфавитном порядке представлена в документации. Желательно самостоятельно пройтись по всем операторам, чтобы знать их возможности. Вдруг вам пригодится какой-то конкретный оператор для ваших задач. Мы рассмотрим только часть популярных операторов.
Также имеется интерактивная схема RxMarbles всех операторов, позволяющая двигать данные, чтобы увидеть работу оператора в действии. Схему ещё называет камешковыми диаграммами. Камешковые диаграммы иллюстрируют работу операторов. Как правило, диаграмма содержит две горизонтальные временные оси, направленные слева направо. Фигурки на диаграммах («камешки») служат для визуализации событий. Есть три типа камушек — круг, пятиугольник и треугольник.
Если поток обработан успешно, то справа рисуется вертикальная чёрточка. В случае ошибки рисуется крестик. Если поток никогда не завершается, то ничего не рисуется на временной шкале.
Между осями располагается оператор, который изменяет последовательность событий, поступающих от исходного Observable и передаваемых в результирующий.
В большинстве случаев операторы выполняются синхронно. Вы создаёте цепочку из операторов и они последовательно выполняются.
В этом и заключается их мощь, когда мы начинаем их комбинировать. Сцепление нескольких операторов, разветвление потока на несколько подпотоков и последующее их слияние – вам нужно выучить некоторых из популярных операторов, чтобы их правильно использовать.
Источник
Урок 9. Room. RxJava
В этом уроке рассмотрим возможность совместного использования RxJava и Room. Как получать данные в Flowable, Single и Maybe.
Полный список уроков курса:
Если вы еще не знакомы с RxJava, то посмотрите соответствующий курс.
Подключение к проекту
В build.gradle модуля добавляйте dependencies
Flowable
В Dao указываем для метода выходной тип Flowable
В коде подписываемся и получаем данные
subscribeOn в случае с Flowable не нужен. Запрос в базу будет выполнен не в UI потоке. А вот, чтобы результат пришел в UI поток, используем observeOn
Теперь при любом изменении данных в базе, мы будем получать свежие данные в методе accept. И нам не надо будет каждый раз их снова запрашивать самим.
Если при запросе нескольких записей, вместо Flowable
> использовать Flowable :
то мы получим только первую запись из всего результата
Если же мы составляем запрос для получения только одной записи, то Flowable вполне подойдет. Давайте рассмотрим этот пример подробнее.
В коде подписываемся на Flowable
Итак, мы запрашиваем из базы запись по id. И тут возможны варианты.
Если запись есть в базе, то она придет в accept сразу же после подписки. И при каждом последующем обновлении этой записи в базе данных, она также будет приходить в accept.
Если записи нет, то сразу после подписки ничего не придет. А вот если она позже появится, то она придет в accept.
У вышеописанного примера есть минус. Если записи нет в базе, то Flowable вообще ничего нам не пришлет. Т.е. это будет выглядеть так, как будто он все еще выполняет запрос.
Это можно исправить следующим образом:
Хоть мы и ожидаем всего одну запись, но используем не Flowable , а Flowable
>. И если записи нет, то мы хотя бы получим пустой лист вместо полной тишины.
Single
Рассмотрим тот же пример с запросом одной записи, но с использованием Single. Напомню, что в Single может прийти только один onNext, либо OnError. После этого Single считается завершенным.
В коде подписываемся
В отличие от Flowable, с Single необходимо использовать onSubscribe, чтобы задать поток для выполнения запроса. Иначе в onError придет ошибка: java.lang.IllegalStateException: Cannot access database on the main thread since it may potentially lock the UI for a long period of time.
Снова рассматриваем варианты наличия требуемой записи в базе.
Если такая запись в базе есть, то она придет в onSuccess. После этого Single будет считаться завершенным и при последующих обновлениях этой записи ничего приходить уже не будет.
Если такой записи в базе нет, то мы в onError получим ошибку: android.arch.persistence.room.EmptyResultSetException: Query returned empty result set: SELECT * FROM employee WHERE
После этого Single будет считаться завершенным, и даже, если такая запись появится в базе, то нам ничего приходить уже не будет.
Maybe
Рассмотрим тот же пример с запросом одной записи, но с использованием Maybe. Напомню, что в Maybe может прийти либо один onNext, либо onComplete, либо OnError. После этого Maybe считается завершенным.
В коде подписываемся
С Maybe также необходимо использовать onSubscribe, чтобы задать поток для выполнения запроса.
Рассматриваем варианты наличия требуемой записи в базе.
Если такая запись в базе есть, то она придет в onSuccess. После этого Maybe будет считаться завершенным и при последующих обновлениях этой записи ничего приходить уже не будет.
Если такой записи в базе нет, то мы получим onComplete. После этого Maybe будет считаться завершенным, и даже, если такая запись появится в базе, то нам ничего приходить уже не будет.
В каком случае что лучше использовать?
Flowable подходит, если вы запрашиваете данные и далее планируете автоматически получать их обновления.
Single и Maybe подходят для однократного получения данных. Разница между ними в том, что Single логичнее использовать, если запись должна быть в базе. Если ее нет, вам придет ошибка. А Maybe допускает, что записи может и не быть.
Присоединяйтесь к нам в Telegram:
— в канале StartAndroid публикуются ссылки на новые статьи с сайта startandroid.ru и интересные материалы с хабра, medium.com и т.п.
— в чатах решаем возникающие вопросы и проблемы по различным темам: Android, Kotlin, RxJava, Dagger, Тестирование
— ну и если просто хочется поговорить с коллегами по разработке, то есть чат Флудильня
— новый чат Performance для обсуждения проблем производительности и для ваших пожеланий по содержанию курса по этой теме
Источник
Урок 1. Основы RxJava. Observable и Observer.
Этот урок начнем с паттерна Наблюдатель и разберемся, как он используется в RxJava. Рассмотрим основные понятия: Observable и Observer, и какие типы событий они используют. Далее разберем один теоретический и один практический примеры.
Версии RxJava
На момент создания этого курса, вторая версия RxJava была в статусе Release Candidate. Поэтому начало курса описывает первую версию. Но, во-первых, вторая версия очень похожа на первую, и уроки актуальны для обоих версий. А, во-вторых, если вы новичок в теме Rx, то вам будет сложно сразу понять главное новшество второй версии.
Поэтому предлагаю вам спокойно читать уроки, написанные по первой версии, т.к. все эти знания будут актуальны и для второй. А начиная с 11 урока мы просто перейдем на вторую версию. К тому моменту вы уже без проблем сможете понять главные отличия между ними.
Теория
Прежде чем начать обсуждать механизмы RxJava, давайте вспомним паттерн Наблюдатель. В нем есть объект, который генерирует какие-то события, и есть объект или объекты, которые подписываются и получают эти события. Я думаю, что в работе вы постоянно используете этот паттерн. Самый простой пример — обработчик нажатия кнопки.
В Java даже есть инструменты для этого паттерна — класс Observable и интерфейс Observer. Реализация интерфейса Observer — это объект, который ожидает событие. Observable — это класс, который всем переданным ему Observer-объектам сообщит о том, что событие наступило.
Эти же названия используются и в RxJava. И смысл их остался тем же: Observable генерирует событие, а Observer получает его. Но было значительно расширено само понятие «событие». В RxJava события, которые Observable передает в Observer, можно рассматривать как поток данных. И события в этом потоке имеют три типа:
1) Next — очередная порция данных
2) Error — произошла ошибка
3) Completed — поток завершен и данных больше не будет.
В качестве примера давайте рассмотрим поиск авиарейса. Пусть у нас есть метод, который умеет бегать по сайтам авиакомпаний, искать там по заданным критериям необходимый рейс и все полученные рейсы возвращать нам. Мы задаем ему пункт назначения, пункт отправления, дату — и он начинает работу.
По мере того, как он один за другим обрабатывает сайты, он генерирует события Next, в которые передает результат. Т.е. каждый найденный рейс — новое событие Next.
Если в работе метода произошла какая то серьезная ошибка и продолжение работы невозможно, метод отправит событие Error.
Если метод успешно обработал все известные ему сайты и закончил работу, он отправит нам событие Completed.
Какие места занимают во всей этой схеме Observable и Observer? Метод поиска при вызове возвращает нам объект Observable. А мы создаем Observer, в котором пишем код для обработки полученных событий, и подписываемся на этот Observable. По мере работы метода, Observable будет генерировать события, которые мы будем получать в нашем созданном Observer:
— если пришло событие Next с очередным рейсом, то берем рейс и, например, добавляем его в адаптер списка. Таким образом рейсы один за другим будут появляться в списке по мере их нахождения.
— если пришло событие Completed, значит мы можем выключить ProgressBar и уведомить пользователя, что поиск завершен
— если пришло событие Error, то уведомляем пользователя, что поиск был прерван с ошибкой
В этом примере я описал все три типа событий, чтобы было понятно, зачем они нужны. Но, вовсе необязательно в каждом потоке данных вы встретите все эти типы. Например, Observable, который сообщает нам о нажатиях на кнопку. В этом случае мы будем получать только событие Next при каждом нажатии. Событие Completed нам не придет, потому что пользователь может сколько угодно раз нажимать эту кнопку и никакого последнего нажатия там не будет. Ну и ошибку тут мы вряд ли получим.
Надеюсь, после этого введения и примеров у вас появилось понимание роли объектов Observable и Observer. Самое сложное поначалу — это просто отличать их друг от друга )
Практика
Давайте рассмотрим простейший пример Observable.
Создание Observable выглядит так:
Observable — это описание означает, что Observable будет предоставлять данные типа String, т.е. каждое событие Next, которое он будет генерировать, будет приходить с объектом типа String. Метод Observable.from создает для нас Observable, который возьмет данные из указанного String массива и передаст их получателям
Создаем получателя, т.е. Observer:
Observer — получатель данных типа String. Напомню, что он от Observable ожидает получения событий трех типов Next, Error и Completed. И под каждый тип у Observer есть свой одноименный метод:
onNext(String s) — в этот метод будут приходить данные
onError(Throwable e) — будет вызван в случае какой-либо ошибки и на вход получит данные об ошибке
onCompleted() — уведомление о том, что все данные переданы
Оба объекта созданы, осталось подписать Observer на Observable методом subscribe:
Сразу после подписки Observable передаст в Observer все данные (в метод onNext) и сигнал о том, что передача завершена (метод onCompleted).
Этот простой пример призван показать взаимодействие между Observable и Observer. Мы использовали в нем Observable, который умеет передавать данные из предоставленного ему массива. Но это только один из видов Observable. Дальше мы научимся создавать различные Observable.
Что дальше
Когда я только начинал изучать RxJava, у меня после рассмотрения таких примеров возникали вопросы типа:
— В какой момент Observable начал генерировать события: после создания или после подписки на него Observer-а?
— Что будет если подписать несколько Observer-ов: каждый получит свои данные или те, кто подписался позже, не получит ничего?
— Как создать свой Observable, который будет отправлять результаты работы моего кода?
— Как сделать, чтобы работа в Observable выполнялась в одном потоке (в смысле Thread), а результаты приходили в другом?
По мере дальнейшего изучения и экспериментов, эти вопросы были успешно решены, и обо всем этом я расскажу в следующих уроках этого курса.
Кроме того, мы рассмотрим, какие возможности работы с потоками данных предоставляет RxJava. Например, вы можете взять поток данных и выполнять над ним различные преобразования: фильтровать данные или конвертировать данные из одного типа в другой. Можете объединять данные из разных потоков данных последовательно, параллельно или попарно.
Также в RxJava присутствует отличный инструментарий для работы с потоками (речь уже не о потоках данных, а потоках, которые Thread). Вы можете указать один поток для генерации данных в Observable, другой поток для выполнения каких либо операций преобразования над этими данными и третий поток, в котором данные будут приходить в Observer.
В общем, тема очень интересная и полезная, и этот курс поможет вам в ней разобраться.
Источник