# Flow Class: Subscriber, Publisher, Subscription interfaces

In 
Flow
Published 2023-06-09

This tutorial explains to you how to use Flow.Subscriber, Flow.Publisher, Flow.Subscription interfaces in Java. We have an example for you as well. If you want to see how the Flow.Processor interface could be implemented, you can take a look at the article named Flow Class: Processor interface.

Reactive applications are the applications which react to data as they occur. The core principals of the reactive applications (or systems) were defined in Reactive Manifesto and they are:

  • responsive : the application respond quickly to input data;
  • resilient to failure
  • elastic : could handle easily different workloads
  • message-driven : the messages must be queued not to overload the consumer.

Starting from Java 9, an easy way of creating reactive application is by using Flow API. We can use Processor, Subscriber, Publisher, Subscription interfaces and SubmissionPublisher class (this class implements Flow.Publisher).

Please take a look at the following example and read carefully the comments. The code is self-explanatory.

From the base application downloaded from Spring Initializr, I updated the main class and I added some new classes as below:

Employee.java
package com.example.demo;

public class Employee {
    int id;
    String name;

    public Employee(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
MySubscriber1.java
package com.example.demo;

import java.util.concurrent.Flow;

public class MySubscriber1 implements Flow.Subscriber<Employee> {
    private Flow.Subscription subscription;

    // Do something when the Subscriber subscribes to a Publisher
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Requests 3 messages from the Publisher. Generally we use 1.
        subscription.request(3);
    }

    // Do something when the Subscriber receive a message from the Publisher
    @Override
    public void onNext(Employee emp) {
        System.out.println("//MySubscriber1// is processing \"" + emp.getName()+"\" data.");

        // After the 1st event, the subscription is cancelled and
        // MySubscriber1 will no longer process the messages published by the publisher
        subscription.cancel();
    }

    // Do something when an error occurs
    @Override
    public void onError(Throwable error) {
        System.out.println("Error occurred: " + error.getMessage());
    }

    // Do something when the Publisher stop publishing
    @Override
    public void onComplete() {
        System.out.println("onComplete() from MySubscriber1");
    }
}
MySubscriber2.java
package com.example.demo;

import java.util.concurrent.Flow;

public class MySubscriber2 implements Flow.Subscriber<Employee> {
    private Flow.Subscription subscription;

    // Do something when the Subscriber subscribes to a Publisher
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Requests 1 messages from the Publisher. This request(1) "starts" the Subscriber.
        subscription.request(1);
    }

    // Do something when the Subscriber receive a message from the Publisher
    @Override
    public void onNext(Employee emp) {
        System.out.println(emp.getName()+" data is processed by >>> MySubscriber2.");
        // Requests another 1 messages from the Publisher if any available
        subscription.request(1);
    }

    // Do something when an error occurs
    @Override
    public void onError(Throwable error) {
        System.out.println("Error occurred: " + error.getMessage());
    }

    // Do something when the Publisher stop publishing
    @Override
    public void onComplete() {
        System.out.println("onComplete() from MySubscriber2");
    }
}
DemoApplication.java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.SubmissionPublisher;

@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) throws InterruptedException {

		SpringApplication.run(DemoApplication.class, args);

		// Define a publisher which can "emit" Employee classes
		SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

		publisher.subscribe(new MySubscriber1());
		publisher.subscribe(new MySubscriber2());

		// Publish a list of employees to MySubscriber1 & MySubscriber2
		for (int i = 1; i <= 10; i++) {
			publisher.submit(new Employee(i, "EMP"+i));
		}

		System.out.println("HELLO from DemoApplication");
		Thread.sleep(5000);
		publisher.close();
		Thread.sleep(1000);
	}
}

When I run this code I get the following log:

HELLO from DemoApplication
EMP1 data is processed by >>> MySubscriber2.
EMP2 data is processed by >>> MySubscriber2.
EMP3 data is processed by >>> MySubscriber2.
EMP4 data is processed by >>> MySubscriber2.
EMP5 data is processed by >>> MySubscriber2.
EMP6 data is processed by >>> MySubscriber2.
EMP7 data is processed by >>> MySubscriber2.
//MySubscriber1// is processing "EMP1" data.
EMP8 data is processed by >>> MySubscriber2.
EMP9 data is processed by >>> MySubscriber2.
EMP10 data is processed by >>> MySubscriber2.
onComplete() from MySubscriber2

Process finished with exit code 0