Sometimes you may want to send data as a stream. A stream is a sequence of data events. Data can be pushed to a stream, while one or more listeners subscribe to the stream and are notified every time new data is added (a stream created with the default StreamController constructor accepts a single listener, while a broadcast stream accepts several). In this tutorial, I'm going to show you how to send data using a stream, including how to create a stream controller, send data to the stream, and create a listener with functions for handling events such as data received, error occurred, and task finished. This tutorial is for Flutter, but it applies to any framework that uses the Dart language.
Dependencies
Stream is part of Dart's built-in dart:async library, so we don't need to install any dependency. To use streams, import the async library.
import 'dart:async';
Code
Below is the code structure for this tutorial. We need to store a variable streamController, which will be explained later.
import 'dart:async';

import 'package:flutter/material.dart';

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Stream Example',
      home: StreamExample(),
    );
  }
}

class StreamExampleState extends State<StreamExample> {
  // Assigned in initState; `late` is needed with null safety (Dart 2.12+).
  late StreamController<String> streamController;

  @override
  void initState() {
    super.initState();
    // Implement stream here
  }

  @override
  Widget build(BuildContext context) {
    print("build");
    return Scaffold(
      appBar: AppBar(
        title: Text('Stream Example'),
      ),
      body: Center(
        child: Text("Woolha.com"),
      ),
    );
  }

  Future<String> getData() async {
    await Future.delayed(Duration(seconds: 5)); // Mock delay
    print("Fetched Data");
    return "This a test data";
  }
}

class StreamExample extends StatefulWidget {
  @override
  StreamExampleState createState() => StreamExampleState();
}
Create a StreamController
To create a data stream, we can use StreamController. The constructor supports the following parameters.
Parameter | Type |
---|---|
onListen | Function |
onPause | Function |
onResume | Function |
onCancel | Function |
sync | bool (default: false) |
Below is the constructor example.
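class StreamExampleState extends State<StreamExample> {
  // Assigned in initState; `late` is needed with null safety (Dart 2.12+).
  late StreamController<String> streamController;

  // Called when the listener pauses its subscription.
  void onPauseHandler() {
    print('on Pause');
  }

  @override
  void initState() {
    super.initState();
    streamController = StreamController<String>(
      onPause: onPauseHandler,
    );
  }
}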
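To see when each constructor callback runs, here is a minimal sketch in plain Dart (no Flutter needed). The function name runLifecycle and the log list are my own additions for illustration and are not part of the tutorial code.

```dart
import 'dart:async';

/// Runs one subscription through its lifecycle and returns the names of
/// the controller callbacks that fired, in order.
/// (runLifecycle and log are hypothetical names used only for this sketch.)
Future<List<String>> runLifecycle() async {
  final log = <String>[];

  final controller = StreamController<String>(
    onListen: () => log.add('onListen'),
    onPause: () => log.add('onPause'),
    onResume: () => log.add('onResume'),
    onCancel: () => log.add('onCancel'),
  );

  // Subscribing, pausing, resuming and cancelling each notify the controller.
  final subscription = controller.stream.listen((_) {});
  subscription.pause();
  subscription.resume();
  await subscription.cancel();
  await controller.close();

  return log;
}

Future<void> main() async {
  (await runLifecycle()).forEach(print);
}
```

Each callback is recorded as the subscription is created, paused, resumed, and cancelled, which is a handy way to check that a handler is wired to the right parameter.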
To send data, use the add method with the data as the argument.
streamController.add("This a test data");
If an error occurs, you can send the error instead of data by using the addError method.
streamController.addError(new Exception('An exception'));
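As a rough illustration of how add and addError reach a listener, the sketch below collects everything a listener receives into a list. It uses the listen method, which is explained in the next section. The names collectEvents and received are assumptions made for this example.

```dart
import 'dart:async';

/// Sends two data events and one error, then closes the stream.
/// Returns what the listener saw, in arrival order.
/// (collectEvents and received are hypothetical names for this sketch.)
Future<List<String>> collectEvents() async {
  final received = <String>[];
  final controller = StreamController<String>();

  controller.stream.listen(
    (data) => received.add('data: $data'),
    onError: (Object error) => received.add('error: $error'),
    onDone: () => received.add('done'),
  );

  controller.add('first');
  controller.addError(Exception('An exception'));
  controller.add('second');

  // close() sends the done event; its future completes once that is delivered.
  await controller.close();
  return received;
}

Future<void> main() async {
  (await collectEvents()).forEach(print);
}
```

Note that an error does not end the stream by default, so the data added after addError is still delivered.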
Create a StreamSubscription
After creating a StreamController, we need something that can subscribe to the stream. We can create a StreamSubscription<T>, where T is the data type (String in this example). To create a StreamSubscription, use streamController.stream.listen.
StreamSubscription<String> subscription;
subscription = streamController.stream.listen((data) {
  print("DataReceived: " + data);
}, onDone: () {
  print("Task Done");
}, onError: (error) {
  print("Some Error");
});
The first positional parameter is the onData callback, which is executed every time data is received. There are three optional named parameters: onError, onDone, and cancelOnError. Below is the list of supported parameters.
Parameter | Type |
---|---|
onData (first positional parameter) | Function |
onDone (Optional) | Function |
onError (Optional) | Function |
cancelOnError (Optional) | bool |
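The cancelOnError parameter is the least obvious of the four, so here is a small sketch of what it does, under the assumption that all events are queued before the listener runs. The names listenUntilFirstError and received are made up for this example.

```dart
import 'dart:async';

/// Listens with cancelOnError set to true and returns what was received.
/// (listenUntilFirstError and received are hypothetical names.)
Future<List<String>> listenUntilFirstError() async {
  final received = <String>[];
  final controller = StreamController<String>();

  controller.stream.listen(
    (data) => received.add('data: $data'),
    onError: (Object error) => received.add('error: $error'),
    onDone: () => received.add('done'),
    // The subscription is cancelled as soon as the first error arrives.
    cancelOnError: true,
  );

  controller.add('first');
  controller.addError(Exception('boom'));
  controller.add('second'); // Not delivered: the subscription is gone by then.

  await controller.close();
  return received;
}

Future<void> main() async {
  (await listenUntilFirstError()).forEach(print);
}
```

With cancelOnError left at its default, the listener would keep receiving events after the error.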
Below is the full code.
import 'dart:async';

import 'package:flutter/material.dart';

void main() => runApp(MyApp());

class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    return MaterialApp(
      title: 'Stream Example',
      home: StreamExample(),
    );
  }
}

class StreamExampleState extends State<StreamExample> {
  // Assigned in initState; `late` is needed with null safety (Dart 2.12+).
  late StreamController<String> streamController;

  void onPauseHandler() {
    print('on Pause');
  }

  @override
  void initState() {
    super.initState();
    streamController = StreamController<String>(
      onPause: onPauseHandler,
    );

    // Declared before listen() so the callback can refer to it.
    late StreamSubscription<String> subscription;
    subscription = streamController.stream.listen((data) {
      print("DataReceived: " + data);
      // Pause for 5 seconds after each data event.
      // Pausing triggers the onPause function passed to the StreamController constructor.
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }, onDone: () {
      print("Task Done");
    }, onError: (error) {
      print("Some Error");
    });

    streamController.add("This a test data");
    streamController.addError(Exception('An exception'));
    streamController.add("This a test data 2");
    streamController.close(); // Streams must be closed when no longer needed
    // Adding an event after close() throws a StateError, so this call is disabled:
    // streamController.add("This a test data 3");
  }

  @override
  void dispose() {
    streamController.close();
    super.dispose();
  }

  @override
  Widget build(BuildContext context) {
    print("build");
    return Scaffold(
      appBar: AppBar(
        title: Text('Stream Example'),
      ),
      body: Center(
        child: Text("Woolha.com"),
      ),
    );
  }
}

class StreamExample extends StatefulWidget {
  @override
  StreamExampleState createState() => StreamExampleState();
}
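If you want to try the pause behaviour without running a Flutter app, the following plain Dart sketch does roughly the same thing with a much shorter delay. The function name runPausedListener, the log list, and the 50 millisecond delay are my own choices for illustration.

```dart
import 'dart:async';

/// Pauses the subscription after every data event, as in the full code,
/// and returns the events in the order the listener received them.
/// (runPausedListener and log are hypothetical names for this sketch.)
Future<List<String>> runPausedListener() async {
  final log = <String>[];
  final controller = StreamController<String>();

  // Declared first so the callback below can refer to the subscription.
  late StreamSubscription<String> subscription;
  subscription = controller.stream.listen(
    (data) {
      log.add('data: $data');
      // Hold back further events until the delay completes.
      subscription.pause(
        Future<void>.delayed(const Duration(milliseconds: 50)),
      );
    },
    onDone: () => log.add('done'),
  );

  controller.add('first');
  controller.add('second');

  // Events added while paused are buffered and delivered after resuming.
  await controller.close();
  return log;
}

Future<void> main() async {
  (await runPausedListener()).forEach(print);
}
```

Pausing does not drop events. They are buffered and delivered in order once the subscription resumes.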
That's how to use StreamController along with StreamSubscription.
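One caveat about multiple listeners: a stream from the default StreamController constructor accepts only one listener, and calling listen a second time throws a StateError. If you need several listeners, a sketch using the StreamController.broadcast constructor could look like this. The names broadcastToTwoListeners and received are assumptions for the example.

```dart
import 'dart:async';

/// Attaches two listeners to a broadcast stream and returns what each one got.
/// (broadcastToTwoListeners and received are hypothetical names.)
Future<List<String>> broadcastToTwoListeners() async {
  final received = <String>[];
  final controller = StreamController<String>.broadcast();

  // A broadcast stream may be listened to more than once.
  controller.stream.listen((data) => received.add('A: $data'));
  controller.stream.listen((data) => received.add('B: $data'));

  controller.add('hello');
  await controller.close();
  return received;
}

Future<void> main() async {
  (await broadcastToTwoListeners()).forEach(print);
}
```

Keep in mind that a broadcast stream does not buffer events for listeners that subscribe later, so each listener only sees events added after it started listening.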