Реактивное программирование - Потоки - BLoC

Реактивное программирование - Потоки - BLoC

Что такое потоки? Давайте представим трубу, обычную трубу с двумя концами, но только один из них позволяет что-то вставить в него. Когда мы вставляем что-то в этот конец, поток внутри трубы направляется наружу через другой конец.

Во Flutter

  • трубы (pipe) называются Потоки (streams)
  • для управления потоками обычно используются StreamController
  • Для добавление чего-то в поток StreamController выставляет "вход" называемый StreamSink, он доступен через свойство sink
  • Для получения из потока, StreamController выставляет свойство stream  

Что можно передавать в потоках?

Все что угодно. Начиная со значений, событий, объектов, коллекций, сетов, ошибок или других потоков, любые типы данных могут передаваться через потоки.

Как узнать что что-то передается потоком?

Когда нужно получить уведомление, что что-то передается потоком, достаточно "слушать" stream свойство StreamController.

Когда будет определен слушатель, то мы получим объект StreamSubscription. Через этот объект мы и будем получать уведомления, когда что-то будет происходить в потоке.

Когда появляется хотя бы один слушатель потока, поток начинает генерировать события и уведомлять активные StreamSubscription объекты каждый раз когда:

  • какие либо данные выходят из потока
  • когда какие либо ошибки отправлены в поток
  • когда поток закрылся

Также StreamSubscription позволяет

  • остановить прослушивание
  • остановиться на время (пауза)
  • возобновить прослушивание

Так потоки это простые трубопроводы (pipes)?

Нет, потоки также позволяют обрабатывать данные при их перемещении по потоку перед их выходом наружу.

Для трансформации данных внутри потока мы используем StreamTransformer, которые не делает ничего кроме:

  • функция которая захватывает данные перемещаемые внутри потока
  • делает что-то с данными
  • на выходе отдает трансформированные данные, которые также являются потоком

Сразу становиться понятно, что можно использовать несколько StreamTransformers последовательно.

Например:

  • фильтрация: фильтрация на основе определенного условия
  • группировка: перегруппировка данных
  • модификация: различные типы модификаций
  • включения данных в другие потоки
  • буферизация
  • обработка: любые операции с данными
  • ...

Типы потоков

2 типа потоков

Поток с одной подпиской

Этот тип потока допускает только один слушатель на протяжении всего жизненного цикла этого потока.

нельзя прослушать поток дважды, даже после отмены первой подписки.

Широковещательный поток

Второй тип потока позволяет иметь любое количество подписчиков.

можно добавить слушателя (подписаться) на поток в любой момент. Новый подписчик будет получать новые события с момента подписки.  

Примеры

Любой тип данных

import 'dart:async';


// поток с одной подпиской
void main() {
  final StreamController ctrl = StreamController();

// слушатель
final StreamSubscription sub = ctrl.stream.listen((data) => print('$data'));

// добавление данных
ctrl.sink.add('name');
ctrl.sink.add(123);
ctrl.sink.add({'a': 'A'});
ctrl.sink.add(1.23);

// закрытие потока
ctrl.close();
}

Трансформация потока

Используем широковещательный поток который передают целые числа и печатает только четные. Используем фильтрацию и позволяем пройти дальше только четным числам

import 'dart:async';

void main() {
  // здесь мы указываем и тип
  final SreamController<int> ctrl =  StreamController<int>.broadcast();
      
  final StreamSubscription sub = ctrl.stream
      .where((x) => (x % 2 == 0))
      .listen((x) => print('$x'));
  // добавим данные
  for (int i = 1; i <11; i++) {
    ctrl.sink.add(i);
  }
                         
  ctrl.close();
}
      
      

RxDart

в наши дни (2019) описание Потоков не будет  полным если мы не упомянуть RxDart.

RxDart это реализация ReactiveX API для Dart, которая расширяет оригинальные потоки Dart для соответствия стандартам ReactiveX.

Так как это изначально не было сделано Google, то поставляется отдельной библиотекой.

Соответствие понятий

Dart RxDart
Stream Observable
StreamController Subject

RxDart расширяет оригинальный Dart Streams API и предлагает 3 основных варианта StreamController:

PublishSubject

PublishSubject - это обычный широковещательный StreamController с одним исключением: поток возвращает Observable вместо Stream.

PublishSubject отправляет подписчику только те события которые были добавлены в поток после его подписки.

BehaviorSubject

Также широковещательный StreamController который возвращает Observable вместо Stream

Основное отличие от PublishSubject то, что  BehaviorSubject также отправляет самое последнее событие произошедшее перед подпиской.

ReplaySubject

ReplaySubject также широковещательный StreamController, который возвращает Observable вместо Stream

По умолчанию ReplaySubject отправляет все события которые уже были отправлены потоком всем новым подписчикам как новые события.

Важные замечания о Resources

Очень важно и является хорошей практикой всегда освобождать  все ресурсы если больше в них нет необходимости.
  • StreamSubscripton - когда больше не нужно прослушивать поток, отмените подписку
  • StreamController - когда  StreamController больше не нужен, закройте его.
  • то же относиться и к RxDart Subjects, если больше не нужны, закройте их.

Как создать виджет основанный на данных поступающих из потока?

Flutter предлагает удобный StatefulWidget - StreamBuilder

StreamBuilder слушает поток и каждый раз когда какие-то данные поступаю из потока, автоматические перестраивается, вызывая builder