In this article, we are going to learn about streams in Dart. First, we should know what a stream is. A stream is a sequence of asynchronous events delivered in order. There are data events, which are sometimes referred to as elements of the stream due to a stream’s similarity to a list, and there are error events, which are notifications of failure. Once all data events have been emitted, a special done event notifies any listeners that there are no more events.

A stream is like a pipe: you put a value in at one end, and if there’s a listener at the other end, that listener receives the value. We can process a stream using either await for or listen() from the Stream API.
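As a minimal sketch of the two ways to consume a stream, the example below sums the same values once with await for and once with listen(). The helper names countToThree, sumWithAwaitFor, and sumWithListen are made up for this illustration and are not part of the Stream API.

```dart
import 'dart:async';

// Hypothetical helper: a stream that emits 1, 2, 3 and then closes.
Stream<int> countToThree() => Stream.fromIterable([1, 2, 3]);

// Consumes the stream with `await for` and returns the sum of its values.
Future<int> sumWithAwaitFor(Stream<int> stream) async {
  var sum = 0;
  await for (final value in stream) {
    sum += value;
  }
  return sum;
}

// Consumes the stream with listen() and completes when the done event arrives.
Future<int> sumWithListen(Stream<int> stream) {
  final completer = Completer<int>();
  var sum = 0;
  stream.listen(
    (value) {
      sum += value;
    },
    onError: completer.completeError,
    onDone: () => completer.complete(sum),
    cancelOnError: true, // stop at the first error so the completer completes once
  );
  return completer.future;
}

Future<void> main() async {
  print(await sumWithAwaitFor(countToThree())); // 6
  print(await sumWithListen(countToThree())); // 6
}
```

await for reads like an ordinary loop and suits code that simply processes every event, while listen() returns a subscription and gives separate callbacks for data, errors, and completion.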

Advantage of Streams:

The primary advantage of using streams to communicate is that it keeps code loosely coupled. The owner of a stream can emit values as they become available, and it doesn’t need to know anything about who’s listening or why. Similarly, consumers of the data need only adhere to the stream interface, and the means by which the stream’s data is generated are entirely hidden.

There are two important terms to know:

  1. Sink: In Dart streams, a sink is the point where we add data into the stream pipe.
  2. Source: In Dart streams, a source is the point where we listen to the stream and receive the data that has been added to it.
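The sink and source ends can be sketched with a StreamController, which exposes both: controller.sink for adding data and controller.stream for listening. The function name sendThroughController is an assumption made for this example.

```dart
import 'dart:async';

// Hypothetical helper: pushes values into the sink and collects
// whatever comes out of the source end.
Future<List<String>> sendThroughController(List<String> values) async {
  final controller = StreamController<String>();
  final received = <String>[];

  // Source side: listen to the controller's stream.
  controller.stream.listen(received.add);

  // Sink side: add the values, then close the pipe.
  for (final value in values) {
    controller.sink.add(value);
  }
  // close() completes once the done event has been delivered to the listener.
  await controller.close();

  return received;
}

Future<void> main() async {
  print(await sendThroughController(['hello', 'world'])); // [hello, world]
}
```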

Types Of Stream

There are two types of streams:

  1. Single subscription streams
  2. Broadcast streams

Single Subscription streams:

The most common kind of stream contains a sequence of events that are parts of a larger whole. Events need to be delivered in the correct order and without missing any of them. This is the kind of stream we get when we read a file or receive a web request.

Such a stream can only be listened to once. Listening again later could mean missing out on initial events, and then the rest of the stream makes no sense. When we start listening, the data will be fetched and provided in chunks.

By default, a stream created with StreamController is a single-subscription stream. The controller holds on to its events until a listener subscribes, and the stream can only be listened to once. You will get an exception (a StateError) if you try to listen more than once. No event should be missed, and events must arrive in the correct order. The controller exposes a single stream, and only one subscriber can use it.
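Both behaviors described above can be checked with a short sketch: events added before anyone listens are held by the controller, and a second listen() call throws a StateError. The function names bufferedEvents and secondListenThrows are made up for this illustration.

```dart
import 'dart:async';

// Events added before a listener subscribes are buffered and delivered later.
Future<List<int>> bufferedEvents() {
  final controller = StreamController<int>();
  controller.add(1); // no listener yet
  controller.add(2);
  final result = controller.stream.toList(); // subscribes here
  controller.close();
  return result;
}

// Returns true if listening twice to a single-subscription stream throws.
bool secondListenThrows() {
  final controller = StreamController<int>();
  controller.stream.listen((_) {});
  try {
    controller.stream.listen((_) {});
    return false;
  } on StateError {
    return true;
  } finally {
    controller.close();
  }
}

Future<void> main() async {
  print(await bufferedEvents()); // [1, 2]
  print(secondListenThrows()); // true
}
```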

Broadcast streams

The other kind of stream is intended for individual messages that can be handled one at a time. This kind of stream can be used for mouse events in a browser, for example.

We can start listening to such a stream at any time, and we get the events that are fired while we listen. More than one listener can listen at the same time, and we can listen again later after canceling a previous subscription.

If multiple parts of your app need to access the same stream, use a broadcast stream instead. A broadcast stream allows any number of listeners, and it fires its events when they are ready, whether there are listeners or not. Unlike a single-subscription stream, it does not hold on to events for listeners that subscribe later. To create a broadcast stream, you can call asBroadcastStream() on an existing single-subscription stream or use the StreamController.broadcast() constructor. For example, think of a live video stream: you can start watching at any time, and more than one subscriber can watch simultaneously. Similarly, you can watch again after canceling a previous subscription.
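A small sketch can show that a listener only receives the events fired while it is subscribed. The function name broadcastDemo and the map keys are assumptions made for this example.

```dart
import 'dart:async';

// Hypothetical demo: the first listener subscribes before any event,
// the second subscribes after the first event has been delivered.
Future<Map<String, List<int>>> broadcastDemo() async {
  final controller = StreamController<int>.broadcast();
  final first = <int>[];
  final second = <int>[];

  controller.stream.listen(first.add);
  controller.add(1);

  // Give the first event time to be delivered before the second listener joins.
  await Future<void>.delayed(Duration.zero);

  controller.stream.listen(second.add);
  controller.add(2);
  await controller.close();

  return {'first': first, 'second': second};
}

Future<void> main() async {
  print(await broadcastDemo()); // {first: [1, 2], second: [2]}
}
```

The second listener never sees the value 1, because a broadcast stream does not replay earlier events to new subscribers.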

Syntax

StreamController<data_type> controller = StreamController<data_type>.broadcast();

How To Create a Stream

To create a stream, let’s create a StreamController first.

StreamController<data_type> controller = StreamController<data_type>();

Now we can access the controller’s stream through its stream property.

Stream<data_type> stream = controller.stream;

How To Subscribe to A Stream

Once you have the stream, you subscribe to it by calling its listen() method.

stream.listen((value) {
  print("Value from controller: $value");
});

How To Add Value To The Stream

We can add a value to the stream by calling the add() method on the controller. Let’s add a value to the stream.

controller.add(3);

How To Cancel A Stream

You can stop receiving events by calling the cancel() method on the StreamSubscription returned by listen().

streamSubscription.cancel();

How To Manage The Stream

To manage the stream, use the StreamSubscription returned by the listen() method. It lets you pause, resume, or cancel the subscription.

StreamSubscription<int> streamSubscription = stream.listen((value){
  print("Value from controller: $value");
});
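As a minimal sketch of managing a subscription, the example below cancels it from inside the data callback once a chosen value arrives, so later events are never delivered. The function name takeUntilCancelled and the stop value 2 are assumptions made for this illustration.

```dart
import 'dart:async';

// Hypothetical helper: collects values until the value 2 arrives,
// then cancels the subscription.
Future<List<int>> takeUntilCancelled() async {
  final controller = StreamController<int>();
  final received = <int>[];

  late final StreamSubscription<int> subscription;
  subscription = controller.stream.listen((value) {
    received.add(value);
    if (value == 2) {
      subscription.cancel(); // no further events reach this listener
    }
  });

  controller.add(1);
  controller.add(2);
  controller.add(3); // dropped, because the listener cancels at 2

  // Let the pending events be delivered, then close the controller.
  await Future<void>.delayed(Duration.zero);
  await controller.close();
  return received;
}

Future<void> main() async {
  print(await takeUntilCancelled()); // [1, 2]
}
```

StreamSubscription also provides pause() and resume() for temporarily suspending delivery without ending the subscription.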

Methods Used in Streams

StreamSubscription<T> listen(void Function(T event)? onData,
    {Function? onError, void Function()? onDone, bool? cancelOnError});

StreamSubscription<int> subscription = stream.listen(
  (data) { // executed when the listener receives a data event
    print('Data: $data');
  },
  onError: (err) { // executed when the listener receives an error event
    print('Error: $err');
  },
  cancelOnError: false, // decides whether the subscription is cancelled on the first error
  onDone: () { // executed when the listener receives the done event
    print('Done!');
  },
);

The stream in the example above is a single-subscription stream. This means that the stream has been set up for an individual listener to subscribe to once throughout the stream’s lifetime. So, a repeated subscription or multiple listeners will result in an error. By default, streams created with StreamController are single-subscription streams, and they start delivering events only once a listener has subscribed.

Example of Broadcast streams

import 'dart:async';

// Initializing a stream controller for a broadcast stream
StreamController<String> controller = StreamController<String>.broadcast();

// Creating a new broadcast stream through the controller
Stream<String> stream = controller.stream;

void main() {
	// Setting up a subscriber to listen for any events sent on the stream
	StreamSubscription<String> subscriber1 = stream.listen((String data) {
		print('Subscriber1: ${data}');
	},
	onError: (error) {
		print('Subscriber1: ${error}');
	},
	onDone: () {
		print('Subscriber1: Stream closed!');
	});

	// Setting up another subscriber to listen for any events sent on the stream
	StreamSubscription<String> subscriber2 = stream.listen((String data) {
		print('Subscriber2: ${data}');
	},
	onError: (error) {
		print('Subscriber2: ${error}');
	},
	onDone: () {
		print('Subscriber2: Stream closed!');
	});

	// Adding a data event to the stream with the controller
	controller.sink.add('Sagar Koju');

	// Adding an error event to the stream with the controller
	controller.addError('Error!');

	// Closing the stream with the controller
	controller.close();
}

Output

Subscriber1: Sagar Koju
Subscriber2: Sagar Koju
Subscriber1: Error!
Subscriber2: Error!
Subscriber1: Stream closed!
Subscriber2: Stream closed!

Let’s look at some of the useful methods used in the programs above:

  1. add() method: adds a data event to the stream through the sink.
  2. addError() method: adds an error event when something goes wrong and the stream’s listeners need to be informed.
  3. listen() method: subscribes to the stream to receive incoming events.

Keywords Used With Streams

  • async*: Marks a function as an asynchronous generator that returns a Stream, much as async marks a function that returns a Future.
  • yield: Emits a value from a generator, either async or sync. Each yield adds one value to the resulting Iterable or Stream.
  • yield*: Delegates to another Iterable or Stream and emits all of its values. It is often used to call a generator function recursively.
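The three keywords can be seen together in a short sketch. The generator names countUp and countUpThenDown are made up for this illustration.

```dart
import 'dart:async';

// async* marks an asynchronous generator; each yield emits one value.
Stream<int> countUp(int from, int to) async* {
  for (var i = from; i <= to; i++) {
    yield i;
  }
}

// yield* forwards every value of another stream before continuing.
Stream<int> countUpThenDown(int max) async* {
  yield* countUp(1, max);
  for (var i = max - 1; i >= 1; i--) {
    yield i;
  }
}

Future<void> main() async {
  print(await countUpThenDown(3).toList()); // [1, 2, 3, 2, 1]
}
```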

Thank you for reading this guide about streams in Dart.