Rxjava 2 android для новичков

RxJava

Совсем недавно RxJava была на пике популярности, очень многие восторгались новым способом написания приложений, которое называют реактивным программированием. С появлением корутин в Kotlin популярность стала угасать. Тем не менее остались сторонники реактивного программирования, которые продолжают использовать RxJava в своих проектах, в том числе и на языке Kotlin.

Общая страница по реактивному программированию — http://reactivex.io/. Документация по классам здесь.

Переход на RxJava 2/RxJava 3

С некоторых пор произошло разделение версии на две ветки: 1.x и 2.x.

Ветка 1.x была заморожена 1 июня 2017 (только исправления багов). 31 марта 2018 года ветку закрыли. Я начинал изучать тему на основе 1.x-ветки, поэтому не удивляйтесь, если будут попадаться старые примеры для первой версии. Постараюсь явно предупреждать о подобных случаях, так как различия довольны значительны.

Ветку 2.x заморозили в феврале 2021 года. Теперь следует использовать новую ветку 3.х.

Большинство примеров писалось под версию 2.0, поэтому в статьях возможны ошибки. Решил заново пройтись по статьям и поправить косяки.

Разница между двумя ветками 1 и 2 описана на сайте документации. Общие фундаментальные понятия остались прежними. Разница между 2 и 3 описана в другом документе.

Были переименованы или удалены некоторые виды классов Action и Function.

  • Action0 -> Action
  • Action1 -> Consumer
  • Action2 -> BiConsumer
  • ActionN -> Consumer
  • Action3 — Action9 -> удалены
  • Func0 -> Callable
  • Func1 -> Function
  • Func2 -> BiFunction
  • Func3 — Func9 -> Function3 — Function9

Subscriber переименован в Disposable. А также CompositeSubscription в CompositeDisposable.

Классы

В RxJava огромное количество страшных слов, которые следует выучить.

  • Observable
  • Observer
  • Subject, а также PublishSubject, AsyncSubject, BehaviorSubject, ReplaySubject
  • Processor — подвид Subject с поддержкой BackPressure. AsyncProcessor, BehaviorProcessor, PublishProcessor, ReplayProcessor, UnicastProcessor.
  • Future
  • Single — ленивый эквивалент Future.
  • Maybe
  • Completable
  • Consumer
  • Disposable — бывший Subscription из RxJava 1.x
  • Scheduler
  • Flowable

Observable и Observer

Существуют две сущности: производители и потребители (Observables и Observers). Общая идея состоит в реализации трёх задач — создать поток данных, привести данные в нужный вид, получить исправленные данные.

Операторы

В RxJava есть специальные операторы, с помощью которых можно создавать новые Observable или менять уже существующие. В Java операторы реализованы в виде методов. Как правило, названия оператора совпадает с именем метода или незначительно отличается наличием дополнительных префиксов или суффиксов.

Полная страница операторов в алфавитном порядке представлена в документации. Желательно самостоятельно пройтись по всем операторам, чтобы знать их возможности. Вдруг вам пригодится какой-то конкретный оператор для ваших задач. Мы рассмотрим только часть популярных операторов.

Также имеется интерактивная схема RxMarbles всех операторов, позволяющая двигать данные, чтобы увидеть работу оператора в действии. Схему ещё называет камешковыми диаграммами. Камешковые диаграммы иллюстрируют работу операторов. Как правило, диаграмма содержит две горизонтальные временные оси, направленные слева направо. Фигурки на диаграммах («камешки») служат для визуализации событий. Есть три типа камушек — круг, пятиугольник и треугольник.

Если поток обработан успешно, то справа рисуется вертикальная чёрточка. В случае ошибки рисуется крестик. Если поток никогда не завершается, то ничего не рисуется на временной шкале.

Между осями располагается оператор, который изменяет последовательность событий, поступающих от исходного Observable и передаваемых в результирующий.

В большинстве случаев операторы выполняются синхронно. Вы создаёте цепочку из операторов и они последовательно выполняются.

В этом и заключается их мощь, когда мы начинаем их комбинировать. Сцепление нескольких операторов, разветвление потока на несколько подпотоков и последующее их слияние – вам нужно выучить некоторых из популярных операторов, чтобы их правильно использовать.

Источник

Русские Блоги

Подробный Android RxJava 2

Эта статья основана на речи Джейка Уортона на GOTO CopenHagen 2016.

Следующая версия (2.0) RxJava все еще находится в разработке. Хотя наблюдаемое, управление подпиской и противодавление были полностью переписаны, в операторе практически нет изменений. В этой статье вы узнаете, как перенести свои библиотеки и приложения на RxJava 2 и как работать с двумя версиями RxJava.

Зачем использовать реактивное программирование

Почему реактивное программирование внезапно стало популярным? Если вы не можете с самого начала определить приложение как синхронный режим, пока существует асинхронный процесс, это нарушит традиционный метод программирования, к которому вы привыкли. «Сломанный» не означает, что ваше приложение не может работать, но вы столкнетесь с повышенной сложностью, вызванной асинхронностью.

Следующий пример хорошо иллюстрирует эту проблему:

Этот простой пример содержит объект пользователя, и для его изменения можно вызвать установщик. Если мы используем только синхронную однопоточную обработку, мы можем полагаться на желаемые результаты: создать экземпляр, распечатать пользователя, изменить его атрибуты и снова распечатать пользователя.

Когда его нужно обрабатывать асинхронно. Например, мы хотим отобразить модификацию пользователя на стороне сервера. Последние два метода в приведенном выше примере необходимо обрабатывать асинхронно. Как мы можем изменить код, чтобы адаптироваться к этому изменению?

Все, что вы можете сделать, это ничего не делать. Вы можете предположить, что модификация асинхронного вызова на стороне сервера прошла успешно, и вы можете изменить пользователя локально. Это распечатает модификацию на стороне сервера. Очевидно, такая модификация — не лучшая идея. Сеть нестабильна, и сервер может возвращать ошибку. В настоящее время вам необходимо решить эти проблемы локально.

С этой проблемой можно справиться очень просто. Например, вызовите runnable после успешного асинхронного запроса на стороне сервера. Теперь это реактивное программирование.

Однако мы не рассмотрели возможные проблемы, например, сбой сетевого запроса. Возможно, мы создадим собственный слушатель, чтобы при возникновении ошибки мы могли ее обработать.

Мы можем уведомить пользователя об ошибке. Мы можем повторить попытку автоматически. Все эти методы доступны, и именно так большинство людей решают асинхронные проблемы.

Читайте также:  Как перепрошить андроид через рекавери пошаговая инструкция

Проблема возникает в том случае, если вам приходится сталкиваться с ней чаще. Вам необходимо поддерживать множественные вызовы: например, при заполнении формы в приложении вам нужно изменить несколько атрибутов пользователя. Или есть несколько асинхронных вызовов, например, когда один асинхронный вызов успешен, а другой необходимо вызвать.

Следует отметить, что мы обсуждаем проблемы, связанные с Android. Так что есть много других вопросов, которые нужно учитывать. Например, в нашем success В обратном вызове нам также нужно передать данные в пользовательский интерфейс. Но Android Activity имеет жизненный цикл и может быть переработан в любое время. Если пользовательский интерфейс был разрушен при возврате этого асинхронного вызова, вы столкнетесь с проблемами.

Есть и другие способы решения вышеуказанных проблем. Мы можем проверить состояние представления перед его изменением. Мы также можем создать анонимный тип, хотя это вызовет кратковременную утечку памяти, поскольку он содержит ссылку на Activity. Если действие исчезло, этот обратный вызов все равно будет выполняться в фоновом режиме.

И последнее: мы еще не определили, в каком потоке выполняются эти обратные вызовы. Возможно, наш обратный вызов выполняется в дочернем потоке, тогда мы отвечаем за изменение пользовательского интерфейса в основном потоке.

Мы добавили много другого кода в действие, которое не связано с первоначальным намерением этого кода, например, запуск асинхронной задачи и обработка результата этого обратного вызова. Мы еще не обработали ввод данных пользователем, обработку нажатий кнопок и ввод данных из нескольких полей. Исходный код — это просто описание ситуации, когда вы сталкиваетесь с реальным приложением, все эти проблемы накладываются друг на друга.

Отзывчивое мышление

В некотором смысле все в приложении асинхронно. У нас есть сетевой запрос, отправляем запрос, и он будет долго возвращаться. Из-за этого мы не можем заблокировать поток пользовательского интерфейса. Следовательно, его можно обрабатывать только в фоновом потоке. У нас есть файловая система, включая чтение и запись базы данных, и даже SharedPreferences Он также блокирует основной поток.

Пользователь также представляет собой асинхронные данные. Мы отображаем данные в пользовательском интерфейсе, и пользовательский интерфейс будет реагировать, щелкнув кнопку или изменив поле ввода. Все это происходит асинхронно. Мы не можем извлекать пользовательские данные синхронно, мы можем только ждать их возврата.

Многие думают, что можно написать однопоточное приложение, и любая задача выполняется в основном потоке. Однако сам пользовательский интерфейс асинхронен. Обработка асинхронных данных, получаемых время от времени, и одновременная обработка различных состояний приложения. Вы не можете заблокировать основной поток при этом. Это корень сложности приложения.

Сетевой запрос

Без сетевых запросов сложно найти приложение. Вы также должны иметь дело с файлами и базами данных, которые также являются асинхронными. Наконец, пользовательский интерфейс также стал источником асинхронности. Следовательно, все аспекты Android являются асинхронными, иначе использование предыдущего метода программирования с однопоточной синхронизацией в конечном итоге навредит вам.

Мы должны рассмотреть модель, в которой наш код находится на среднем уровне, выступая в качестве арбитра различных состояний. Вместо того, чтобы исправлять всю асинхронную обработку. Мы можем позволить пользовательскому интерфейсу напрямую подписаться на базу данных и вносить соответствующие изменения при изменении данных. Мы можем изменить сетевой запрос или базу данных при нажатии кнопки, вместо того, чтобы распространять событие щелчка после получения нажатия кнопки.

Точно так же, когда сетевой запрос вернется, было бы лучше, если бы данные могли быть обновлены. Мы знаем, что при обновлении данных пользовательский интерфейс также автоматически обновляется, так что код для пассивного обновления интерфейса может быть удален. Если Android выполняет некоторые асинхронные задачи, такие как вращение, было бы хорошо, если бы пользовательский интерфейс мог автоматически обновляться, и лучше автоматически запускать некоторые фоновые задачи.

Наконец, мы можем удалить много кода, который поддерживает различные состояния.

RxJava

Вот почему мы хотим разработать RxJava. И это также настоящая адаптивная библиотека для Android. Это также первая библиотека на Android, которую можно использовать для реактивного программирования. RxJava 2 поддерживает все версии java, поддерживаемые Android.

В основном он выполняет три функции:

  • Представляет коллекцию классов источников данных
  • Коллекция классов для мониторинга источников данных
  • Набор методов для изменения и организации данных

Эти источники данных начинают выполнение только тогда, когда вы начинаете мониторинг. Если вы отмените мониторинг, выполняемая задача будет отменена одновременно. Сетевой запрос может быть абстрагирован как отдельная задача, но поток нажатия кнопки может быть неограниченным, пока существует ваш пользовательский интерфейс, даже если вы подписаны только на событие нажатия кнопки. Более того, поток кликов по кнопке также может быть пустым или никогда не может остановиться. Все это повлияло на традиционную модель наблюдателя. У нас есть сгенерированные данные, и существует соглашение о том, как они определяются. Все, что мы хотим делать, это отслеживать их. Мы хотим добавить слушателя, который будет получать уведомления об изменении данных.

От текучего к наблюдаемому

Оба этих основных типа появятся в RxJava 2. Оба типа используются для моделирования одного и того же типа (который может содержать от 0 до n элементов). Зачем вам нужны два типа для одного и того же типа данных?

Это из-за проблемы противодавления. «Я» (автор) не хочет слишком подробно рассказывать об обратном давлении, хотя это может замедлить передачу данных. Система, в которой мы находимся, имеет только ограниченные ресурсы, но если вы не можете быстро справиться с обратным давлением, это замедлит источник данных, который отправляет вам данные.

В RxJava 1 каждый тип системы имеет противодавление, но в RxJava 2 у нас есть два типа. Поскольку каждый тип имеет обратное давление, с которым необходимо иметь дело, но не все источники данных реализуют его, в конечном итоге вы столкнетесь с аварийным отказом. Это связано с тем, что необходимо иметь дело с противодавлением. В RxJava 2 вы можете узнать, поддерживает ли система типов, которую вы используете, обратное давление.

Читайте также:  Телевизор tcl андроид инструкция

Например, если у нас есть событие касания в качестве источника данных, это то, что мы не можем замедлить. Мы не можем сказать пользователю, чтобы он замедлился, зная, что мы щелкнем второй раз после рисования изменений, полученных из последнего события. на интерфейсе. Мы можем справиться с этим только другими способами, такими как кнопка отключения или отображение других интерфейсов, чтобы сделать его медленнее, но отправку события после щелчка нельзя замедлить.

Можно сравнить с базой. Например, если мы хотим получить большой набор результатов, мы можем захотеть получить только несколько строк этого набора результатов в какой-то момент. База данных может хорошо отражать проблему: в ней есть концепция курсоров. Но концепция потока событий касания и базы данных отличается. Потому что нет возможности замедлить работу пальцев пользователя. В RxJava 1 вы можете видеть, что оба этих типа Observable Используйте, поэтому вам придется иметь дело с проблемой обратного давления во время выполнения, и в конечном итоге вы получите исключение или ваше приложение выйдет из строя.

Из-за двух разных типов они имеют противодавление. Поскольку они моделировали один и тот же тип данных, они использовали тот же подход к проблеме передачи данных в обратный вызов. Интерфейсы прослушивания двух источников данных также кажутся очень близкими. Первый метод называется onNext , Этот метод также является методом передачи данных. так долго как Observable последний Flowable Если сгенерировано несколько данных, этот метод будет вызван один раз для генерации одних данных. Таким образом можно обработать каждый элемент.

Это может быть бесконечно. Расщепление, которое вы слышали при нажатии кнопки, затем onNext Метод будет вызываться каждый раз, когда пользователь щелкает мышью. Для ограниченных источников данных у нас будет два заключительных события, одно из которых complete ,один error , полное означает успех, ошибка означает ошибку. onComplete с onError Это называется финальным событием, потому что эти два метода вызываются onNext Метод больше не будет вызываться. Разница заключается в методе onSubscribe 。

Если вы знаете RxJava 1, то вам нужно обратить внимание на следующее: когда вы подписываетесь на Observable или Flowable, вы создаете ресурс. После использования ресурсы необходимо утилизировать. Вот этот onSubscribe Обратный вызов будет вызван, как только он начнет прослушивать Observable или Flowable, и вернет вам один из двух типов в зависимости от типа подписки.

Для Observable вы можете позвонить dispose Метод, то есть я израсходовал этот ресурс, мне не нужен обратный вызов. Что делать, если у нас есть сетевой запрос? Это отменит сетевой запрос. Если вы слушаете бесконечный поток нажатий кнопок, то звоните dispose Метод состоит в том, что вы больше не хотите принимать события щелчка. И воля onSet Слушатель представления. Это то же самое для типа Flowable. Хотя у него другое имя, использование такое же. Оно имеет cancel Методы и dispose Эффект от метода такой же. Есть еще одно отличие, которое называется request Method, и именно здесь в API появляется обратное давление. Этот метод запроса сообщает типу Flowable, что вам нужно больше элементов.

Поток ответа

. Это стандартная мера обработки асинхронного потока без обратного давления блокировки.

Я собираюсь поговорить о том, почему имена типов одноразового использования и подписки так различаются, почему в их методах один называется dispose, а другой — cancel, вместо того, чтобы один наследует другой и добавляет метод reqeust. Причина в том, что существует такое понятие, как реактивный поток. Это источник других второстепенных явлений.

Мы будем использовать тип подписчика и тип подписки в качестве посредника. На самом деле они являются частью потока ответов, и поэтому имеют разные имена.

Теперь давайте посмотрим на пример, в котором есть два определения диспетчера пользователей. До того, как мы получили пользователя из класса и отобразили его в интерфейсе, это уже было некорректно. Теперь мы рассматриваем эту модель как наблюдателя пользователя, источник данных для пользовательского объекта. Пока пользователь изменяется, будет отправляться уведомление, после чего вы сможете отреагировать на изменение и отобразить измененные данные. В настоящее время нет необходимости учитывать, когда лучше всего отображать изменения пользователя среди различных событий, происходящих в системе.

В RxJava есть три подмножества наблюдаемых. Один из них — Single, этот тип содержит элемент или содержит ошибку. Это не похоже на поток, это больше похоже на один асинхронный источник. И это не включает обратное давление. Например, вы вызываете метод, возвращаете экземпляр типа или генерируете исключение. Сингл — это та же концепция. Вы подписываетесь на сингл, либо возвращаете данные, либо получаете сообщение об ошибке. Разница в том, что он отзывчивый.

Второй — Completable. Это похоже на метод, возвращающий void. Он завершится успешно или выдаст исключение. Это похоже на отзывчивый Runnable. Completable содержит набор кода, который может быть запущен как успешно, так и безуспешно.

Последний — Может быть, этот тип доступен только в RxJava 2. У него либо есть предмет, ошибки, либо его может не быть. Его можно считать необязательным. Метод, который возвращает необязательное значение, если он не генерирует исключение, он всегда что-то возвращает. Но возвращаемое необязательное значение может иметь значение, а может и не иметь.

в случае setName Методы и setAge Вызов методов асинхронный, они либо успешны, либо терпят неудачу, и на самом деле они не возвращают данные. Так что комплектный тип им больше всего подходит.

Создать источник (Источник)

Этот пример используется, чтобы показать, как создавать источники и как интегрировать существующие адаптивные источники вместе.

У этих типов есть статические методы, которые можно использовать для их создания. Вы также можете создать из одного значения, из массива или любого проходимого типа. Но есть два метода, которые считаются наиболее часто используемыми: синхронный или асинхронный.

Читайте также:  Довнлоад хелпер для андроид

первый fromCallable Этот метод используется для обработки синхронного поведения, которое возвращает только значение. Большим преимуществом этого метода является то, что он позволяет вам генерировать исключение из вызываемого объекта. Если есть сетевой запрос, он выдаст исключение ввода-вывода. Если это вызывает исключение, мы можем поймать ошибку. Если этот запрос будет успешным, мы можем начать с onNext Способ получения желаемого результата.

Может быть вызван на все пять типов fromCallable метод.

в Maybe Тип и Completable Есть два других метода для часто используемых типов. Ни один из типов не возвращает значения. Это просто запускаемый, но он отзывчивый.

Лучший способ создать наблюдаемое — create . Мы передадим обратный вызов, который будет вызываться при появлении нового подписчика.

Отличается от fromCallable , create Метод можно вызывать несколько раз onNext . Еще одно преимущество состоит в том, что теперь мы можем моделировать асинхронные данные. Если я сделаю http-запрос, я могу позвонить onNext Метод завершения асинхронного вызова. Еще одно преимущество заключается в том, что вы можете справиться с ситуациями отказа от подписки.

Если вы перестанете прислушиваться к HTTP-запросам, нет причин для продолжения выполнения. Мы можем добавить операцию отмены, чтобы отменить HTTP-запрос и освободить ресурсы.

Источник монитора

Когда вы подписываетесь на Observable, вы не используете эти интерфейсы напрямую. subscribe Начните прослушивание после вызова метода. Так как же отказаться от подписки?

У нас есть тип под названием DisposableObserver , Этот тип автоматически обрабатывает множество вещей, вам нужно только позаботиться о самом Observable. Итак, как нам утилизировать?

Это достигнуто disposable , Таким образом, вы можете вызвать метод dispose, и он загрузит этот метод по цепочке вызовов. В RxJava 2 есть новый метод под названием subscribeWith . Он возвращает объект, и вы можете вызвать для него метод удаления.

Все вышеперечисленные четыре типа не имеют обратного давления. Итак, как бороться с типом противодавления:

Тип противодавления несколько иной. Вы можете использовать эту аналогию, например, вы не откроете файл без метода закрытия, но получите курсор без метода закрытия.

Оператор

Операторы делают три вещи:

  • Управляйте или комбинируйте данные
  • Рабочий поток
  • Оперативная передача данных

Отправьте строку и получите новую строку.

В реактивном мире у нас есть Observable и мы реализуем операцию через оператора. В этом примере map Это оператор, мы можем получить переданные данные и обработать их, чтобы создать новые данные.

Я хочу отслеживать передачу данных в другом потоке. Таким образом, пользовательские данные будут находиться в фоновом потоке, но пользовательские данные будут обрабатываться в основном потоке. потом observeOn Это тот оператор, который нам нужен. Поскольку мы изменили поток, порядок, в котором вы применяете эти операторы, имеет решающее значение.

Если мы выдадим сетевой запрос, и сетевой запрос будет выполнен синхронно. Что ж, мы определенно не хотим, чтобы это происходило в основном потоке. Мы можем использовать операторы для изменения того, где мы подписываемся на Observable, то есть где в конечном итоге выполняется запрос. Когда мы подпишемся на возврат в фоновом режиме, он перейдет к выполнению в фоновом потоке. Ввод-вывод — это просто пул потоков, который вы можете использовать, он будет выполняться в пуле потоков и, наконец, уведомит слушателя об изменениях.

Потому что мы observeOn Используется позже map Оператор, он будет выполняться в основном потоке Android. Мы не хотим обрабатывать возврат http в основном потоке, мы хотим обработать ответ http, а затем вернуться в основной поток.

Другие операторы

Есть и другие операторы, которые могут обрабатывать Observable и возвращать другой тип. Один оператор first() , Он вернет первый элемент списка, объект типа Single. В RxJava 1 этот метод вернет Observable, который испускает только один элемент. Если этот Observable пуст, будет возвращена ошибка. Потому что мы знаем, что в сингле есть элемент или ошибка.

Другие операторы, такие как firstElement() Вернет объект типа Maybe. Когда наблюдаемый объект пуст, объект типа Maybe может быть завершен без генерации исключения. Если вас интересует только успех или неудача, то объект типа Completable определенно то, что вам нужно. Эти текучие вещества также существуют.

Начните использовать реактивное программирование

Если мы хотим сделать наш первый пример адаптивным, мы можем подписаться на пользователя и сказать: «Мы хотим получать уведомления в основном потоке, а затем я хочу отображать его в пользовательском интерфейсе». Каждый раз, когда пользователь меняется, эта покупка прокси будет выполняться автоматически, и мы можем автоматически видеть обновление в интерфейсе. Нам не нужно беспокоиться об управлении собой.

Однако мы должны не забывать управлять возвращенными одноразовыми предметами, потому что мы находимся в мире Android и хотим, чтобы эти коды перестали работать, когда действие исчезает. в onDestroy В методе мы избавляемся от этих расходных материалов.

В заключение

RxJava 2 имеет дело с асинхронной проблемой в Android. Будь то сетевой запрос или сам Android, база данных или даже событие. И напишите код, который реагирует на изменения в этих источниках, а не код, который реагирует на изменения и управляет состоянием.

Если вы используете RxJava 1, естьобъект взаимодействияможно использовать. Вы можете переключаться между двумя версиями типа.

RxJava 2 — это не новое знание. Реактивное программирование не ново ни в каком измерении. Сам Android — это очень отзывчивый мир, и нас научили использовать некоторое последовательное выполнение, проще описать способ написания кода.

Реактивное программирование позволяет нам моделировать разработку под Android более разумным и асинхронным способом. Примите асинхронность источника вместо того, чтобы самостоятельно поддерживать различные состояния. Пусть разработка Android-приложения будет по-настоящему отзывчивой.

Источник

Оцените статью