# Flow Class: Processor interface

Published 2023-06-09

This tutorial explains how to use the Flow.Processor interface in Java and walks through a complete example.

Reactive applications are applications that react to data as it arrives. The core principles of reactive applications (or systems) are defined in the Reactive Manifesto:

  • responsive: the application responds quickly to input data;
  • resilient: the application stays responsive in the face of failure;
  • elastic: the application can easily handle varying workloads;
  • message-driven: messages are queued so that the consumer is not overloaded.

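Since Java 9, an easy way to create a reactive application is the Flow API. It provides the Processor, Subscriber, Publisher, and Subscription interfaces, plus the SubmissionPublisher class, which implements Flow.Publisher. A Processor is both a Subscriber and a Publisher, so it can sit between the two and transform the items that pass through it.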
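Before the full example, here is a minimal sketch of how a publisher and a subscriber fit together without a processor in between. The class name FlowSketch, the collect helper, and the CountDownLatch used to wait for completion are illustrative assumptions and are not part of the project shown later.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

public class FlowSketch {

    // Hypothetical helper: publishes the given items and returns what the subscriber received.
    public static List<String> collect(List<String> items) {
        List<String> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        // SubmissionPublisher is AutoCloseable; close() signals onComplete to its subscribers.
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription subscription) {
                    this.subscription = subscription;
                    subscription.request(1); // ask for the first item
                }

                @Override
                public void onNext(String item) {
                    received.add(item);
                    subscription.request(1); // ask for the next item only when ready
                }

                @Override
                public void onError(Throwable error) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            items.forEach(publisher::submit);
        }

        try {
            // Delivery is asynchronous, so wait until the subscriber has seen onComplete.
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c")));
    }
}
```

Requesting one item at a time is what keeps the consumer from being overloaded: the publisher buffers items until the subscriber asks for more.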

Please take a look at the following example and read the comments carefully; they explain each step.

Starting from the base application downloaded from Spring Initializr, I updated the main class and added the following classes:

Employee.java
package com.example.demo;

public class Employee {
    private int id;
    private String name;

    public Employee(int id, String name) {
        this.id = id;
        this.name = name;
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }
}
MyProcessor.java
package com.example.demo;

import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class MyProcessor extends SubmissionPublisher<Integer> implements Flow.Processor<Employee, Integer> {
    private Flow.Subscription subscription;

    // Called once when the Processor subscribes to a Publisher
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Request one message from the Publisher. This request(1) "starts" the Subscriber side of the Processor.
        subscription.request(1);
    }

    // Called each time the Processor receives a message from the Publisher
    @Override
    public void onNext(Employee emp) {
        var empId = emp.getId();
        System.out.println(emp.getName() + " ... data is processed by PROCESSOR >> empId=" + empId);

        // Emit the employee id downstream (it will be handled by MySubscriber)
        submit(empId);

        // Request one more message from the Publisher, if any is available
        subscription.request(1);
    }

    // Called when the upstream Publisher signals an error (the error is only logged, not forwarded downstream)
    @Override
    public void onError(Throwable error) {
        System.out.println("Error occurred: " + error.getMessage());
    }

    // Called when the Publisher stops publishing.
    // Note: close() is not called here, so MySubscriber never receives onComplete().
    @Override
    public void onComplete() {
        System.out.println("onComplete() from Processor");
    }
}
MySubscriber.java
package com.example.demo;

import java.util.concurrent.Flow;

public class MySubscriber implements Flow.Subscriber<Integer> {
    private Flow.Subscription subscription;

    // Called once when the Subscriber subscribes to a Publisher
    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        // Request one message from the Publisher (here, the Processor)
        subscription.request(1);
    }

    // Called each time the Subscriber receives a message from the Publisher (here, the Processor)
    @Override
    public void onNext(Integer nrReceived) {
        System.out.println("The result on the SUBSCRIBER for empId=" + nrReceived + " is " + nrReceived * 10);

        // Request one more message from the Publisher (here, the Processor)
        subscription.request(1);
    }

    // Do something when an error occurs
    @Override
    public void onError(Throwable error) {
        System.out.println("Error occurred: " + error.getMessage());
    }

    // Called when the Processor stops publishing
    @Override
    public void onComplete() {
        System.out.println("onComplete() from MySubscriber");
    }
}
DemoApplication.java
package com.example.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

import java.util.concurrent.SubmissionPublisher;

@SpringBootApplication
public class DemoApplication {

	public static void main(String[] args) throws InterruptedException {

		SpringApplication.run(DemoApplication.class, args);

		// Define a publisher which can "emit" Employee instances
		SubmissionPublisher<Employee> publisher = new SubmissionPublisher<>();

		// Create an instance of MyProcessor class
		MyProcessor processor = new MyProcessor();

		// Create an instance of MySubscriber class
		MySubscriber subscriber = new MySubscriber();

		// Create the processing chain
		publisher.subscribe(processor);
		processor.subscribe(subscriber);

		// Publish five employees; in this chain they are delivered to the MyProcessor instance
		for (int i = 1; i <= 5; i++) {
			publisher.submit(new Employee(i, "EMP"+i));
		}

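		System.out.println("HELLO from DemoApplication");
		// Items are delivered asynchronously, so give the chain time to process them
		Thread.sleep(5000);
		// Signal onComplete to the subscribers of this publisher (here, the processor)
		publisher.close();
		// Give the onComplete signal time to be delivered before main returns
		Thread.sleep(1000);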
	}
}

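When I run this code, I get the following log (Spring Boot startup output omitted). Items are delivered asynchronously, so the interleaving of the PROCESSOR and SUBSCRIBER lines can differ between runs: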

HELLO from DemoApplication
EMP1 ... data is processed by PROCESSOR >> empId=1
EMP2 ... data is processed by PROCESSOR >> empId=2
EMP3 ... data is processed by PROCESSOR >> empId=3
EMP4 ... data is processed by PROCESSOR >> empId=4
The result on the SUBSCRIBER for empId=1 is 10
EMP5 ... data is processed by PROCESSOR >> empId=5
The result on the SUBSCRIBER for empId=2 is 20
The result on the SUBSCRIBER for empId=3 is 30
The result on the SUBSCRIBER for empId=4 is 40
The result on the SUBSCRIBER for empId=5 is 50
onComplete() from Processor

Process finished with exit code 0
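
The log shows "onComplete() from Processor" but never "onComplete() from MySubscriber", because MyProcessor.onComplete() does not close the processor's own publishing side. One possible variant is sketched below: the processor calls close() and closeExceptionally() so that completion and errors reach the downstream subscriber, and the caller waits on a CountDownLatch instead of Thread.sleep. The names ForwardingSketch, MappingProcessor, and RecordingSubscriber are hypothetical, and plain Integer ids stand in for Employee to keep the sketch self-contained.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

public class ForwardingSketch {

    // Hypothetical processor: maps each item and forwards completion and errors downstream.
    static class MappingProcessor<T, R> extends SubmissionPublisher<R>
            implements Flow.Processor<T, R> {
        private final Function<? super T, ? extends R> mapper;
        private Flow.Subscription subscription;

        MappingProcessor(Function<? super T, ? extends R> mapper) {
            this.mapper = mapper;
        }

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(T item) {
            submit(mapper.apply(item));
            subscription.request(1);
        }

        @Override
        public void onError(Throwable error) {
            closeExceptionally(error); // downstream subscribers receive onError
        }

        @Override
        public void onComplete() {
            close(); // downstream subscribers receive onComplete
        }
    }

    // Hypothetical subscriber: records every signal so the result can be inspected.
    static class RecordingSubscriber implements Flow.Subscriber<Integer> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1);
        }

        @Override
        public void onNext(Integer value) {
            events.add(String.valueOf(value));
            subscription.request(1);
        }

        @Override
        public void onError(Throwable error) {
            events.add("error");
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("complete");
            done.countDown();
        }
    }

    public static List<String> run() {
        RecordingSubscriber subscriber = new RecordingSubscriber();
        MappingProcessor<Integer, Integer> processor = new MappingProcessor<>(id -> id * 10);

        // Closing the publisher at the end of the try block starts the completion chain.
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(processor);
            processor.subscribe(subscriber);
            for (int id = 1; id <= 5; id++) {
                publisher.submit(id);
            }
        }

        try {
            // Wait for the terminal signal instead of sleeping for a fixed time.
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With this change the subscriber's onComplete() runs after the last item has been delivered, so the main thread can stop waiting as soon as the chain has finished.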