This tutorial explains what a ConnectableObservable is in RxDart, how to use it, and what its variations are.
A ConnectableObservable is a kind of Observable that can be listened to multiple times. While an ordinary Observable usually begins to emit items as soon as it is listened to, a ConnectableObservable begins to emit items only when its .connect() method is called. Because of that behavior, you should wait until all intended Observers are listening before calling .connect(), which starts the item emission. An Observer that subscribes later misses the items emitted before it subscribed, although some variations replay emitted items to new Observers.
In other words, a ConnectableObservable broadcasts a single-subscription stream: it lets all intended Observers listen to the Observable first, and items start to be emitted only after the .connect() method is called.
Below are the steps for using a ConnectableObservable:
- Convert an ordinary Observable into a ConnectableObservable using the .publish() method.
- Create some Observers that listen to the Observable.
- Call the .connect() method.
The code below shows an example.
import 'package:rxdart/rxdart.dart';

void main(List<String> arguments) {
  // publish() turns the Observable into a ConnectableObservable.
  ConnectableObservable<int> numbers = Observable.range(1, 5).publish();

  // Register every Observer before connecting.
  numbers.listen((value) { print('Observer 1: $value'); });
  numbers.listen((value) { print('Observer 2: $value'); });

  // Emission starts only now.
  numbers.connect();
}
Output:
Observer 1: 1
Observer 2: 1
Observer 1: 2
Observer 2: 2
Observer 1: 3
Observer 2: 3
Observer 1: 4
Observer 2: 4
Observer 1: 5
Observer 2: 5
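To see why the Observers should be listening before .connect() is called, here is a minimal sketch in which one Observer subscribes late. It assumes the same legacy RxDart API as the rest of this tutorial (the Observable class). The periodic source, the delays, and the function name lateSubscriberDemo are only chosen for illustration. It also uses the StreamSubscription returned by .connect() to disconnect from the source.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Runs the demo and returns every line it logged.
Future<List<String>> lateSubscriberDemo() async {
  final log = <String>[];

  // Illustrative source: emits 0, 1, 2, ... every 100 ms.
  final ConnectableObservable<int> ticks = Observable<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count,
  ).publish();

  // Observer 1 listens before connecting.
  ticks.listen((value) => log.add('Observer 1: $value'));

  // Emission starts here; connect() returns the subscription to the source.
  final StreamSubscription<int> connection = ticks.connect();

  // Observer 2 subscribes late, after some items have already been emitted.
  await Future.delayed(const Duration(milliseconds: 250));
  ticks.listen((value) => log.add('Observer 2: $value'));

  await Future.delayed(const Duration(milliseconds: 200));

  // Cancelling the connection disconnects from the source.
  await connection.cancel();
  return log;
}

Future<void> main() async {
  (await lateSubscriberDemo()).forEach(print);
}
```

With a plain .publish(), Observer 2 should only see the values emitted after it subscribed. The exact values depend on timing.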
Using .autoConnect()
autoConnect converts the ConnectableObservable into an Observable that automatically connects when the first Observer subscribes to it. It connects at most once. If you need to disconnect, provide a connection callback. It receives the subscription to the source, which you can cancel later.
Signature:
Observable<T> autoConnect({
void Function(StreamSubscription<T> subscription) connection,
});
Example:
Observable<int> numbers = Observable.range(1, 5).publish().autoConnect();
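The following is a minimal sketch of how the connection callback might be used to disconnect. It assumes the signature shown above. The periodic source, the delay, and the name autoConnectDemo are illustrative only.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Returns the items received before the connection was cancelled.
Future<List<int>> autoConnectDemo() async {
  final received = <int>[];

  // Collects the subscription handed to the connection callback.
  final connections = <StreamSubscription<int>>[];

  final Observable<int> ticks = Observable<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count,
  ).publish().autoConnect(connection: connections.add);

  // No explicit connect() call: the first listener triggers the connection.
  ticks.listen(received.add);

  await Future.delayed(const Duration(milliseconds: 350));

  // Cancel the captured subscription to disconnect from the source.
  for (final connection in connections) {
    await connection.cancel();
  }
  return received;
}

Future<void> main() async {
  print(await autoConnectDemo());
}
```

Without the callback there is no handle to the source subscription, so the Observable stays connected until the source completes.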
Using refCount
refCount converts the ConnectableObservable into an Observable that stays connected as long as at least one subscription exists.
Signature:
Observable<T> refCount();
Example:
Observable<int> numbers = Observable.range(1, 5).publish().refCount();
Alternatively, you can use the shorthand .share():
Observable<int> numbers = Observable.range(1, 5).share();
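As a rough sketch of the reference counting, the example below logs when the source is listened to and when it is cancelled. It assumes RxDart's doOnListen and doOnCancel operators are available in the version you use. The periodic source, the delays, and the name refCountDemo are illustrative.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Returns a log of source and Observer events.
Future<List<String>> refCountDemo() async {
  final log = <String>[];

  final Observable<int> ticks = Observable<int>.periodic(
    const Duration(milliseconds: 100),
    (count) => count,
  )
      .doOnListen(() => log.add('source listened'))
      .doOnCancel(() => log.add('source cancelled'))
      .share(); // shorthand for .publish().refCount()

  // The first subscription connects to the source.
  final first = ticks.listen((value) => log.add('Observer 1: $value'));
  final second = ticks.listen((value) => log.add('Observer 2: $value'));

  await Future.delayed(const Duration(milliseconds: 250));
  await first.cancel(); // one subscription is still active

  await Future.delayed(const Duration(milliseconds: 100));
  await second.cancel(); // the last subscription is gone

  // Give the cancellation a moment to propagate to the source.
  await Future.delayed(const Duration(milliseconds: 50));
  return log;
}

Future<void> main() async {
  (await refCountDemo()).forEach(print);
}
```

The source should be cancelled only after the second subscription is cancelled, not when the first one goes away.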
ConnectableObservable Variations
PublishConnectableObservable
A ConnectableObservable that converts a single-subscription Stream into a broadcast Stream. This is the one used in the first example above.
Observable's method for creating a PublishConnectableObservable:
ConnectableObservable<T> publish() => PublishConnectableObservable<T>(this);
ReplayConnectableObservable
A ConnectableObservable that converts a single-subscription Stream into a broadcast Stream which replays all emitted items to new listeners. It also provides synchronous access to the list of emitted items.
Observable's method for creating a ReplayConnectableObservable:
ReplayConnectableObservable<T> publishReplay({int maxSize}) =>
ReplayConnectableObservable<T>(this, maxSize: maxSize);
Example:
ConnectableObservable<int> numbers = Observable.range(1, 5).publishReplay();
numbers.listen((value) { print('Observer 1: $value'); });
numbers.connect();
await Future.delayed(const Duration(seconds: 1));
numbers.listen((value) { print('Observer 2: $value'); });
Output:
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 1
Observer 2: 2
Observer 2: 3
Observer 2: 4
Observer 2: 5
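The maxSize parameter in the signature above limits how many items are kept for replay. The sketch below is a variation of the previous example with maxSize: 2. The function name replayWindowDemo and the delays are illustrative.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Returns every line logged by the two Observers.
Future<List<String>> replayWindowDemo() async {
  final log = <String>[];

  // Keep at most the two most recent items for replay.
  final ConnectableObservable<int> numbers =
      Observable.range(1, 5).publishReplay(maxSize: 2);

  numbers.listen((value) => log.add('Observer 1: $value'));
  numbers.connect();

  // Subscribe again after all items have been emitted.
  await Future.delayed(const Duration(seconds: 1));
  numbers.listen((value) => log.add('Observer 2: $value'));

  // Let the replayed items reach Observer 2.
  await Future.delayed(const Duration(milliseconds: 100));
  return log;
}

Future<void> main() async {
  (await replayWindowDemo()).forEach(print);
}
```

Here Observer 1 still receives every item, while Observer 2 should receive only the last two.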
ValueConnectableObservable
A ConnectableObservable that converts a single-subscription Stream into a broadcast Stream which replays only the latest value to new listeners. It also provides synchronous access to the latest value.
Observable's methods for creating a ValueConnectableObservable:
ValueConnectableObservable<T> publishValue() =>
ValueConnectableObservable<T>(this);
ValueConnectableObservable<T> publishValueSeeded(T seedValue) =>
ValueConnectableObservable<T>.seeded(this, seedValue);
Example:
ConnectableObservable<int> numbers = Observable.range(1, 5).publishValue();
numbers.listen((value) { print('Observer 1: $value'); });
numbers.connect();
await Future.delayed(const Duration(seconds: 1));
numbers.listen((value) { print('Observer 2: $value'); });
Output:
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
The second example uses a seed value, which is emitted to listeners before the first item from the source.
Example:
ConnectableObservable<int> numbers = Observable.range(1, 5).publishValueSeeded(0);
numbers.listen((value) { print('Observer 1: $value'); });
numbers.connect();
await Future.delayed(const Duration(seconds: 1));
numbers.listen((value) { print('Observer 2: $value'); });
Output:
Observer 1: 0
Observer 1: 1
Observer 1: 2
Observer 1: 3
Observer 1: 4
Observer 1: 5
Observer 2: 5
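The synchronous access to the latest value mentioned above can be sketched as follows. This assumes the latest value is exposed through a value getter on ValueConnectableObservable. The function name latestValueDemo and the delay are illustrative.

```dart
import 'dart:async';

import 'package:rxdart/rxdart.dart';

/// Reads the latest value before connecting and after the source has finished.
Future<List<int>> latestValueDemo() async {
  final ValueConnectableObservable<int> numbers =
      Observable.range(1, 5).publishValueSeeded(0);

  // Before connecting, the latest value is the seed.
  final seen = <int>[numbers.value];

  numbers.connect();
  await Future.delayed(const Duration(seconds: 1));

  // After the source has emitted, the latest value is its last item.
  seen.add(numbers.value);
  return seen;
}

Future<void> main() async {
  print(await latestValueDemo());
}
```

Reading value does not require a listener, which can be handy when you only need the current state.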