Subject no subject android

RxRelay – это магия? Subject vs RxRelay

В Android-комьюнити я встречал три типа разработчиков, которые сталкивались с RxRelay:

  1. Те, кто не понимают зачем RxRelay используется в их проекте, зачем он нужен и чем отличается от Subject
  2. Те, кто думают, что RxRelay «проглатывает» ошибки или «после того, как произошла ошибка RxRelay, продолжит работать, а Subject — нет» (та самая магия)
  3. Те, кто действительно знает, что такое RxRelay.

Пока первые два типа встречаются чаще, я решил написать статью, которая поможет разобраться в том, как работает RxRelay и проверить его «магические» свойства.

Если вы используете RxJava, то вероятно вы пользуетесь Subject или RxRelay, чтобы прокидывать события из одной сущности в другую или делать из императивного кода реактивный.

Давайте проверим пункт №2 и посмотрим, в чем разница между RxRelay и Subject. Итак, у нас есть две подписки на один relay, при клике на кнопку мы пушим единицу в этот relay.

Три раза подряд кликаем на кнопку и видим вот такой лог.
D/test: Цепочка с ошибкой: onNext
D/test: Цепочка без ошибки: onNext

D/test: Цепочка с ошибкой: onError
D/test: Цепочка без ошибки: onNext

D/test: Цепочка без ошибки: onNext

Если заменить переменную RxRelay на PublishSubject, лог не изменится. И вот почему:

При первом клике мы пушим в наш relay данные. Оба подписчика срабатывают.

При втором клике в цепочке у первого подписчика (disposable1) возникает ошибка.

При третьем клике первый disposable1 уже не срабатывает, так как он получил терминальное состояние onError. Дальше будет работать только второй disposable2.

Так будет и с Subject, и с RxRelay. Напомню, что в rx ошибки идут вниз по цепочке к подписчику (downstream) и выше места, где они возникли, не попадают. В итоге мы проверили, что цепочка на основе RxRelay не может работать после того, как возникла ошибка.

Так если разницы в поведении Subject и RxRelay нет, то в чем их отличие?

Вот что пишет сам разработчик в README на гитхабе:
“Basically: A Subject except without the ability to call onComplete or onError.”

То есть это просто Subject без методов onComplete и onError, даже исходный код классов почти одинаковый. Если мы вызовем на Subject эти методы, то он перестанет работать, так как получит терминальное состояние. Поэтому автор библиотеки решил, что стоит убрать эти методы, потому что те разработчики, которые не знают об этом свойстве Subject могут случайно вызвать их.

Вывод: единственное отличие RxRelay от Subject — это отсутствие двух методов onComplete и onError, чтобы разработчик не мог вызвать терминальный стейт.

Источник

Основы RxJava Subject — Publish, Replay, Behavior и Async Subject

Mar 1, 2017 · 3 min read

Эта статья про все Subject которые есть в RxJava.

  • Publish Subject
  • Replay Subject
  • Behavior Subject
  • Async Subject
Читайте также:  Что делать при ошибка при синтаксическом анализе пакета андроид

У нас уже есть простой проект на RxJava2 для обучения RxJava(многие разработчики учатся с помощью этого проекта), и я уже добавил в него примеры по Subject. Форкаем,клонируем,билдим,запускаем и учим RxJava. Смотрим проект тут.

GitHub — amitshekhariitbhu/RxJava2-Android-Samples: RxJava 2 Android Examples — Migration From…

RxJava2-Android-Samples — RxJava 2 Android Examples — Migration From RxJava 1 to RxJava 2 — How to use RxJava 2 in…

И так, давайте изучать Subject.

Что такое Subject?

Subject — это своего рода мост или прокси, доступный в некоторых реализациях ReactiveX, который действует как наблюдатель(Observer) и наблюдаемый(Observable).Так как он является наблюдателем(observer), он может подписаться на один и более наблюдаемых(Observables), и потому как он наблюдатель(Observer), он может пройти через все элементы, за которыми он наблюдает, повторно передав их, а также может излучать(emit) новые элементы.

Я верю: обучение с помощью примеров лучший способ учиться.

Observable: Предположим, что профессор является наблюдаемым(observable). Профессор учит какой-то теме.

Observer: Предположим, что студент наблюдатель(observer). Студент слушает(или наблюдает) тему, которую преподает профессор.

Publish Subject

Излучает(emit) все последующие элементы наблюдаемого источника в момент подписки.

Здесь, есл и студент вошел в аудиторию, он просто хочет слушать с того момента, когда он вошел в аудиторию. И так, Publish будет лучшим выбором для использования.

Смотрите пример ниже:

Replay Subject

Излучает(emit все элементы источника наблюдаемого(Observable), независимо от того, когда подписчик(subscriber) подписывается.

Здесь, если студент с опозданием вошел в аудиторию, он хочет послушать лекцию с самого начала. И так, для этого мы должны использовать Replay.

Смотрите пример ниже:

Behavior Subject

Он излучает(emit) совсем недавно созданый элемент и все последующие элементы наблюдаемого источника, когда наблюдатель(observer) присоединяется к нему.

Здесь, если студент вошел в аудиторию, он хочет слушать самые последние вещи(не с начала) преподаваемые профессором таким образом, что он получает идею контекста. Итак, здесь мы будем использовать Behavior.

Смотрите пример ниже:

Async Subject

Он выдает только последнее значение наблюдаемого источника(и только последнее).

Здесь, если студент пришел в любой момент времени в аудиторию, и он хочет слушать только о последней вещи(и только последней) которую учат. Итак, здесь мы будем использовать Async.

Смотрите пример ниже:

И так, всякий раз когда будете застревать в одном из вышесказанных случаев, RxJava Subject станет вашим лучшим другом.

Источник

Введение в RxJava: Ключевые типы

Содержание:
  • Часть первая – Вступление
    1. Почему Rx?
    2. Ключевые типы
    3. Жизненный цикл подписки
  • Часть вторая – Последовательности
    1. Создание последовательности
    2. Фильтрация последовательности
    3. Исследование
    4. Агрегация
    5. Трансформация последовательностей
  • Часть третья – Управление последовательностями
  • Часть четвертая – Параллельность

Ключевые типы

Rx базируется на двух фундаментальных типах, в то время, как некоторые другие расширяют их функциональность. Этими базовыми типами являются Observable и Observer, которые мы и рассмотрим в этом разделе. Мы также рассмотрим Subject’ы – они помогут в понимании основных концепций Rx.

Rx построена на паттерне Observer. В этом нет ничего нового, обработчики событий уже существуют в Java (например, JavaFX EventHandler[1]), однако они проигрывают в сравнении с Rx по следующим причинам:

  • Обработку событий в них сложно компоновать
  • Их вызов нельзя отложить
  • Могут привести к утечке памяти
  • Не существует простого способа сообщить об окончании потока событий
  • Требуют ручного управления многопоточностью.

Observable

Observable – первый базовый тип, который мы рассмотрим. Этот класс содержит в себе основную часть реализации Rx, включая все базовые операторы. Мы рассмотрим их позже, а пока нам следует понять принцип работы метода subscribe. Вот ключевая перегрузка [2]:

Читайте также:  Как убрать знак наушники с андроида

Метод subscribe используется для получения данных выдаваемых [3] observable. Эти данные передаются наблюдателю, который предполагает их обработку в зависимости от требований потребителя. Наблюдатель в этом случае является реализацией интерфейса Observer.

Observable сообщает три вида событий:

  • Данные
  • Сигнал о завершении последовательности [4] (что означает, что новых данных больше не будет)
  • Ошибку, если последовательность завершилась по причине исключительной ситуации (это событие так же предполагает завершение последовательности)

Observer

В Rx предусмотрена абстрактная реализация Observer, Subscriber. Subscriber реализует дополнительную функциональность и, как правило, именно его следует использовать для реализации Observer. Однако, для начала, рассмотрим только интерфейс:

Эти три метода являются поведением, которое описывает реакцию наблюдателя на сообщение от observable. onNext у наблюдателя будет вызван 0 или более раз, опционально сопровождаясь onCompleted или onError. После них вызовов больше не будет.

Разрабатывая код с помощью Rx, вы увидите много Observable, но намного меньше Observer. И хотя и необходимо понимать концепцию Observer, существуют способы не требующие непосредственного создания его экземпляра.

Реализация Observable и Observer

Вы можете вручную реализовать Observer и Observable. В реальности в этом, как правило, нет необходимости: Rx предоставляет готовые решения, чтобы упростить разработку. Это также может быть не совсем безопасно, поскольку взаимодействие между частями библиотеки Rx включает в себя принципы и внутреннюю инфраструктуру, которые могут быть не очевидны новичку. В любом случае, будет проще для начала использовать множество инструментов уже предоставленных библиотекой для создания необходимого нам функционала.

Чтобы подписаться на observable, совсем нет необходимости в реализации Observer. Существуют другие перегрузки метода subscribe, которые принимают в качестве аргументов соответствующие функции для onNext, onError и onSubscribe, инкапсулирующие создание экземпляра Observer. Предоставлять их всех тоже не обязательно, вы можете описать только часть из них, например, только onNext или только onNext и onError.

Лямбда-выражения в Java 1.8 делают эти перегрузки очень подходящими для использования в коротких примерах этой серии статей.

Subject

Subject’ы являются расширением Observable, одновременно реализуя интерфейс Observer. Идея может показаться странной, но в определенных случаях они делают некоторые вещи намного проще. Они могут принимать сообщения о событиях (как observer) и сообщать о них своим подписчикам (как observable). Это делает их идеальной отправной точкой для знакомства с Rx кодом: когда у вас есть данные, поступающие извне, вы можете передать их в Subject, превращая их таким образом в observable.

Существует несколько реализаций Subject. Сейчас мы рассмотрим самые важные из них.

PublishSubject

PublishSubject – самая простая реализация Subject. Когда данные передаются в PublishSubject, он выдает их всем подписчикам, которые подписаны на него в данный момент.

Как мы видим, 1 не была напечатана из-за того, что мы не были подписаны в момент когда она была передана. После того как мы подписались, мы начали получать все значения поступающие в subject.

Здесь мы впервые используем метод subscribe, так что стоит уделить этому внимание. В данном случае мы используем перегруженную версию, которая принимает один объект класса Function, отвечающий за onNext. Эта функция принимает значение типа Integer и ничего не возвращает. Функции, которые ничего не возвращают также называются actions. Мы можем передать эту функцию следующими способами:

  • Предоставить объект класса Action1
  • Неявно создать таковой используя лямбда-выражение
  • Передать ссылку на существующий метод с соответствующей сигнатурой. В данном случае, System.out::println имеет перегруженную версию, которая принимает Object, поэтому мы передаем ссылку на него. Таким образом, подписка позволяет нам печатать в основной поток вывода все поступающие в Subject числа.
Читайте также:  Пульт huayu xmrm 006 live d79c100004a17 для xiaomi android tv box с голосовым поиском

ReplaySubject

ReplaySubject имеет специальную возможность кэшировать все поступившие в него данные. Когда у него появляется новый подписчик, последовательность выдана ему начиная с начала. Все последующие поступившие данные будут выдаваться подписчикам как обычно.

Все значения были получены, не смотря на то, что один из подписчиков подписался позже другого. Обратите внимание, что до того как получить новое значение, подписчик получает все пропущенные. Таким образом, порядок последовательности для подписчика не нарушен.

Кэшировать всё подряд не всегда лучшая идея, так как последовательности могут быть длинными или даже бесконечными. Фабричный метод ReplaySubject.createWithSize ограничивает размер буфера, а ReplaySubject.createWithTime время, которое объекты будут оставаться в кеше.

Наш подписчик на этот раз пропустил первое значение, которое выпало из буфера размером 2. Таким же образом со временем из буфера выпадают объекты у
Subject созданного при помощи createWithTime.

Создание ReplaySubject с ограничением по времени требует объект планировщика (Scheduler), который является представлением времени в Rx. Мы обязательно вернемся к планировщикам в разделе про многопоточность.

ReplaySubject.createWithTimeAndSize ограничивает буфер по обоим параметрам.

BehaviorSubject

BehaviorSubject хранит только последнее значение. Это то же самое, что и ReplaySubject, но с буфером размером 1. Во время создания ему может быть присвоено начальное значение, таким образом гарантируя, что данные всегда будут доступны новым подписчикам.

Начальное значение предоставляется для того, чтобы быть доступным еще до поступления данных.

Так как роль BehaviorSubject – всегда иметь доступные данные, считается неправильным создавать его без начального значения, также как и завершать его.

AsyncSubject

AsyncSubject также хранит последнее значение. Разница в том, что он не выдает данных до тех пока не завершится последовательность. Его используют, когда нужно выдать единое значение и тут же завершиться.

Обратите внимание, что если бы мы не вызвали s.onCompleted(), этот код ничего бы не напечатал.

Неявная инфраструктура

Как мы уже упоминали, существуют принципы, которые могут быть не очевидны в коде. Один из важнейших заключается в том, что ни одно событие не будет выдано после того, как последовательность завершена (onError или onCompleted). Реализация subject’ уважает эти принципы:

Безопасность не может быть гарантирована везде, где используется Rx, поэтому вам лучше быть осведомленным и не нарушать этот принцип, так как это может привести к неопределенным последствиям.

В продолжении мы рассмотрим жизненный цикл Observable.

[1] Или знакомые всем Event Listeners. – Примеч. Автора
[2] Я, все-таки считаю, что ключевой перегрузкой тут является именно версия с Observer в качестве аргумента, в оригинале в качестве примера приводится версия subscribe(Subscriber subscriber) – Примеч. Автора
[3] Я буду использовать слово «выдавать» чтобы описать событие передачи данных от Observable Observer’у (to emit в ориг.). – Примеч. Автора
[4] Автор использует термин последовательность (sequence), чтобы обозначить множество всех данных, которые может выдать Observable. – Примеч. Автора

Источник

Оцените статью