Wednesday, 4 January 2017

Parallel Stream in Java Stream API

You can execute streams in serial or in parallel. When a stream executes in parallel, the Java runtime partitions the stream into multiple sub-streams. Aggregate operations iterate over and process these sub-streams in parallel and then combine the results.

Running each sub-stream in a separate thread can improve performance. Parts of the data (sub-streams) are processed in parallel on different cores of a multi-core processor, and the partial results are then combined into the final result. As a result, bulk operations on large data sets may complete in less time.

Since partial results are computed by separate threads and then combined, it is important to keep the following points in mind -

  1. Is a separate combiner needed to combine the partial results, or can the aggregate function itself also work as the combiner?
  2. Since multiple threads are involved, no operation in a parallel stream should update a shared variable.
  3. Most of the time a collection is used as the data source of the stream, and most collections are not thread safe. This means that multiple threads cannot manipulate such a collection without introducing thread interference errors.
  4. Parallel streams use the common fork-join thread pool (ForkJoinPool.commonPool()). Thus, running a long-running, performance-intensive task in a parallel stream may occupy all threads in the pool, which may block all other tasks that use parallel streams, as no threads will be available for them.
  5. Note that parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores. While aggregate operations enable you to more easily implement parallelism, it is still your responsibility to determine if your application is suitable for parallelism.
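Point 1 can be illustrated with the three-argument reduce(identity, accumulator, combiner) method of Stream. In this minimal sketch, assuming we want the total length of a list of names, the accumulator adds a String's length to an int, which is a different type from the stream elements, so a separate combiner (Integer::sum) is needed to merge the partial sums computed by different threads. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: shows why reduce() may need a separate combiner
class CombinerDemo {

    // Sums the lengths of all strings using a parallel stream.
    // accumulator: (partial int, String) -> int  adds one element to a partial result
    // combiner:    (int, int) -> int             merges partial results from different threads
    static int totalLength(List<String> words) {
        return words.parallelStream()
                .reduce(0,
                        (partial, word) -> partial + word.length(),
                        Integer::sum);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("Ram", "Sheila", "Mukesh", "Rani");
        System.out.println("Total length " + totalLength(words)); // Total length 19
    }
}
```

When the element type and the result type are the same, as in mapToInt(...).sum() or reduce(0, Integer::sum), the same function can serve as both accumulator and combiner.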

How to get a parallel stream

Collection has methods Collection.stream() and Collection.parallelStream(), which produce sequential and parallel streams respectively.

You can also call parallel() method on a sequential stream to get a parallel stream. The parallel() method is defined in the BaseStream interface.
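A minimal sketch of switching between modes, using only the standard BaseStream methods parallel(), sequential() and isParallel(); the class and method names are made up for illustration. The mode applies to the whole pipeline, so whichever of parallel() or sequential() is called last decides how the stream runs.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical demo class: switching a stream between sequential and parallel mode
class ParallelModeDemo {

    static boolean isParallelFromCollection(List<Integer> list) {
        // parallelStream() returns a parallel stream directly
        return list.parallelStream().isParallel();
    }

    static boolean isParallelAfterParallel(List<Integer> list) {
        // stream() is sequential; parallel() switches it to parallel mode
        return list.stream().parallel().isParallel();
    }

    static boolean isParallelAfterSequential(List<Integer> list) {
        // the last call to parallel()/sequential() wins for the whole pipeline
        return list.stream().parallel().sequential().isParallel();
    }

    public static void main(String[] args) {
        List<Integer> list = Arrays.asList(1, 2, 3);
        System.out.println(list.stream().isParallel());       // false
        System.out.println(isParallelFromCollection(list));   // true
        System.out.println(isParallelAfterParallel(list));    // true
        System.out.println(isParallelAfterSequential(list));  // false
    }
}
```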

For example, if you have an Employee class and, for some testing, you want to create 1000 Employee objects, you can use the parallel() method with rangeClosed(). Note that Employee::new here requires Employee to have a constructor that takes an int argument -

List<Employee> empList = IntStream.rangeClosed(1, 1000).parallel().mapToObj(Employee::new).collect(Collectors.toList());
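Here is a self-contained sketch of the line above, assuming a hypothetical minimal Employee class whose only field is an int id set by the constructor. It also shows that collect(Collectors.toList()) keeps the encounter order of the range even though the objects are created by several threads.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class CreateEmployeesDemo {

    // Hypothetical minimal Employee; the int constructor matches Employee::new in mapToObj
    static class Employee {
        private final int id;
        Employee(int id) { this.id = id; }
        int getId() { return id; }
    }

    static List<Employee> createEmployees(int count) {
        return IntStream.rangeClosed(1, count)
                .parallel()                     // objects may be created by several threads
                .mapToObj(Employee::new)        // calls Employee(int)
                .collect(Collectors.toList());  // list still follows the range order 1..count
    }

    public static void main(String[] args) {
        List<Employee> empList = createEmployees(1000);
        // Created 1000 employees, first id 1, last id 1000
        System.out.println("Created " + empList.size() + " employees, first id "
                + empList.get(0).getId() + ", last id " + empList.get(999).getId());
    }
}
```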

Examples using parallel stream

Let’s first see a simple example using a parallel stream where you need to find the maximum salary paid to an employee.

Employee class

public class Employee {
    private String lastName;
    private String firstName;
    private String empId;
    private int age;
    private int salary;
    public String getLastName() {
        return lastName;
    }
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
    public String getFirstName() {
        return firstName;
    }
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
    public String getEmpId() {
        return empId;
    }
    public void setEmpId(String empId) {
        this.empId = empId;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }

    public String getFullName(){
        return this.firstName + " " + this.lastName;
    }

    public int getSalary() {
        return salary;
    }
    public void setSalary(int salary) {
        this.salary = salary;
    }
}

Using parallel stream

import java.util.ArrayList;
import java.util.List;
import java.util.OptionalInt;

public class ParallelDemo1 {

    public static void main(String[] args) {
        // getting list of employee 
        List<Employee> empList = createList();
        OptionalInt maxSalary = empList.parallelStream().mapToInt(e -> e.getSalary()).max();
        if(maxSalary.isPresent()){
            System.out.println("Max Salary " + maxSalary.getAsInt());
        }
    }
    
    // Stub method to create list of employee objects
    private static List<Employee> createList(){
        List<Employee> empList = new ArrayList<Employee>();
        Employee emp = new Employee();
        emp.setEmpId("E001");
        emp.setAge(40);
        emp.setFirstName("Ram");
        emp.setLastName("Chandra");
        emp.setSalary(5000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E002");
        emp.setAge(35);
        emp.setFirstName("Sheila");
        emp.setLastName("Baijal");
        emp.setSalary(7000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E003");
        emp.setAge(24);
        emp.setFirstName("Mukesh");
        emp.setLastName("Rishi");
        emp.setSalary(9000);
        empList.add(emp);
        emp = new Employee();
        emp.setEmpId("E004");
        emp.setAge(37);
        emp.setFirstName("Rani");
        emp.setLastName("Mukherjee");
        emp.setSalary(10000);
        empList.add(emp);
        return empList;
    }
}

Output

Max Salary 10000
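The program above returns only the maximum salary value. If you need the Employee object with the maximum salary, one option is Stream.max() with a Comparator. The sketch below assumes a hypothetical trimmed-down Employee with just a first name and a salary so that it compiles on its own.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class MaxSalaryEmployeeDemo {

    // Hypothetical trimmed-down Employee with only the fields needed here
    static class Employee {
        private final String firstName;
        private final int salary;
        Employee(String firstName, int salary) {
            this.firstName = firstName;
            this.salary = salary;
        }
        String getFirstName() { return firstName; }
        int getSalary() { return salary; }
    }

    // max() is a reduction: each thread finds the maximum of its part of the data,
    // then the partial maxima are compared to get the overall maximum
    static Optional<Employee> highestPaid(List<Employee> empList) {
        return empList.parallelStream()
                .max(Comparator.comparingInt(Employee::getSalary));
    }

    public static void main(String[] args) {
        List<Employee> empList = Arrays.asList(
                new Employee("Ram", 5000),
                new Employee("Sheila", 7000),
                new Employee("Mukesh", 9000),
                new Employee("Rani", 10000));
        // Max Salary 10000 paid to Rani
        highestPaid(empList).ifPresent(e ->
                System.out.println("Max Salary " + e.getSalary() + " paid to " + e.getFirstName()));
    }
}
```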

Mistake of updating a shared variable

As already stated above, updating shared state from a parallel stream may cause problems because multiple threads are involved. Let’s see it with an example.

There is an Employee class, and 500 objects of the Employee class are stored in a list. Then, using a parallel stream, you try to get the total salary paid to all employees.

Employee class

The Employee class is the same as used above with one difference: there is now a constructor with an int argument, which is used to set the salary property. Using the rangeClosed() method, 500 objects are created with salaries set to 1 through 500.

public class Employee {
    private String lastName;
    private String firstName;
    private String empId;
    private int age;
    private int salary;
    
    Employee(int salary){
        this.salary = salary;
    }
    
    public String getLastName() {
        return lastName;
    }
    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
    public String getFirstName() {
        return firstName;
    }
    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }
    public String getEmpId() {
        return empId;
    }
    public void setEmpId(String empId) {
        this.empId = empId;
    }
    public int getAge() {
        return age;
    }
    public void setAge(int age) {
        this.age = age;
    }

    public String getFullName(){
        return this.firstName + " " + this.lastName;
    }

    public int getSalary() {
        return salary;
    }
}

Total Salary Calculation

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDemo {

    public static void main(String[] args) {
        Salary sal = new Salary();
        List<Employee> empList = createList();
        empList.parallelStream().forEach(sal::doProcess);
        
        System.out.println("Total - " + sal.getTotalSalary());

    }
    // Stub method to create list of employee objects
    private static List<Employee> createList(){
        List<Employee> empList = IntStream.rangeClosed(1, 500).mapToObj(Employee::new).collect(Collectors.toList());
        return empList;
    }
}

class Salary{
    private int total = 0;
    
    public void doProcess(Employee emp){
        addSalary(emp.getSalary());
    }
    
    public void addSalary(int salary){
        total = total + salary;
    }
    public int getTotalSalary(){
        return total;
    }
}

Output

Total - 113359
Total - 125250
Total - 120901
Total - 123835
Total - 125250

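I got these 5 outputs on executing it 5 times. You can see that the output differs between runs (the correct total is 125250, by the way). This is because total is a shared variable that is updated from the parallel stream. The statement total = total + salary is not atomic: two threads can read the same old value, and when both write back their own sum, one of the updates is lost.

If you have gone through the first parallel stream example, you may already have an idea of a better way to do it.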
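If shared state really has to be updated from a parallel stream, the update itself must be thread safe. A minimal sketch, swapping in java.util.concurrent.atomic.AtomicInteger (this is not the approach the post recommends); SafeSalary is a hypothetical name, and a plain List<Integer> of salaries is used instead of Employee objects to keep it self-contained.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class AtomicSalaryDemo {

    // Thread-safe variant of the Salary class: addAndGet performs the
    // read-modify-write as one atomic step, so no update is lost
    static class SafeSalary {
        private final AtomicInteger total = new AtomicInteger();

        void addSalary(int salary) {
            total.addAndGet(salary);
        }

        int getTotalSalary() {
            return total.get();
        }
    }

    static int totalWithSharedCounter(List<Integer> salaries) {
        SafeSalary sal = new SafeSalary();
        // still updates shared state from forEach, but the update is now atomic
        salaries.parallelStream().forEach(sal::addSalary);
        return sal.getTotalSalary();
    }

    public static void main(String[] args) {
        List<Integer> salaries = IntStream.rangeClosed(1, 500).boxed().collect(Collectors.toList());
        System.out.println("Total - " + totalWithSharedCounter(salaries)); // Total - 125250
    }
}
```

This gives the correct total on every run, but every thread still contends for one counter, so a reduction such as sum() is usually the cleaner choice.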

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ParallelDemo {

    public static void main(String[] args) {
        Salary sal = new Salary();
        List<Employee> empList = createList();
        //empList.parallelStream().forEach(sal::doProcess);
        int totalSalary = empList.parallelStream().mapToInt(e -> e.getSalary()).sum();
        sal.addSalary(totalSalary);
        System.out.println("Total - " + sal.getTotalSalary());

    }
    // Stub method to create list of employee objects
    private static List<Employee> createList(){
        List<Employee> empList = IntStream.rangeClosed(1, 500).mapToObj(Employee::new).collect(Collectors.toList());
        return empList;
    }
}

Note that here you are not changing a shared variable inside the stream. You are getting the salaries of the employees and summing them. That way there is no problem even with a parallel stream, because different threads add up different parts of the data and those partial results are then combined to get the total salary.
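The same total can be computed with other reductions that are safe on parallel streams. A minimal sketch comparing mapToInt().sum(), reduce(0, Integer::sum) and Collectors.summingInt(); the trimmed-down Employee class with an int constructor is a hypothetical stand-in for the one above.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class SalaryReductionDemo {

    // Hypothetical minimal Employee with the int constructor used in this post
    static class Employee {
        private final int salary;
        Employee(int salary) { this.salary = salary; }
        int getSalary() { return salary; }
    }

    static List<Employee> createList() {
        return IntStream.rangeClosed(1, 500).mapToObj(Employee::new).collect(Collectors.toList());
    }

    // 1. specialized IntStream reduction, as used above
    static int bySum(List<Employee> empList) {
        return empList.parallelStream().mapToInt(Employee::getSalary).sum();
    }

    // 2. general reduce with identity 0 and an associative accumulator
    static int byReduce(List<Employee> empList) {
        return empList.parallelStream().map(Employee::getSalary).reduce(0, Integer::sum);
    }

    // 3. reduction through a collector
    static int byCollector(List<Employee> empList) {
        return empList.parallelStream().collect(Collectors.summingInt(Employee::getSalary));
    }

    public static void main(String[] args) {
        List<Employee> empList = createList();
        // 125250 125250 125250
        System.out.println(bySum(empList) + " " + byReduce(empList) + " " + byCollector(empList));
    }
}
```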

Using Collectors.groupingByConcurrent

The groupingBy operation performs poorly with parallel streams, because it works by merging two maps by key, which is computationally expensive. With a parallel stream you should use groupingByConcurrent instead of groupingBy; it returns an instance of ConcurrentMap instead of Map.

For example, if you have an Employee class and you want to group employees on the basis of sex, it can be done as follows -

import java.util.Arrays;
import java.util.List;

import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

public class ParallelDemo1 {

    public static void main(String[] args) {
        ParallelDemo1 pd = new ParallelDemo1();
        // getting list of employee 
        List<Employee> empList = pd.createList();
        
        ConcurrentMap<Character, List<Employee>> bySex = empList.parallelStream().collect(Collectors.groupingByConcurrent(e -> e.sex));
        bySex.forEach((key, value)->{
            System.out.println("Key- " + key + " Value ");
            value.forEach(v->System.out.println(v.name));
        });
        
    }
    
    
    // Stub method to create list of employee objects
    private List<Employee> createList(){
        List<Employee> empList = Arrays.asList(new Employee("E001", 40, "Ram", 'M', 5000), 
        new Employee("E002", 35, "Sheila", 'F', 7000), 
        new Employee("E003", 24, "Mukesh", 'M', 9000), 
        new Employee("E004", 37, "Rani", 'F', 10000));
        
        return empList;
    }

    class Employee {
        private String empId;
        private int age;
        private String name;
        private char sex;
        private int salary;
        Employee(String empId, int age, String name, char sex, int salary){
            this.empId = empId;
            this.age = age;
            this.name = name;
            this.sex = sex;
            this.salary = salary;
        }
        
    }
}

Output

Key- F Value 
Sheila
Rani
Key- M Value 
Mukesh
Ram
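groupingByConcurrent also accepts a downstream collector. As a sketch, assuming you only need the number of employees of each sex, Collectors.counting() can be passed as the second argument; the small Employee class here is a hypothetical stand-in for the nested class above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;

class CountBySexDemo {

    // Hypothetical minimal Employee with just a name and the grouping key
    static class Employee {
        private final String name;
        private final char sex;
        Employee(String name, char sex) {
            this.name = name;
            this.sex = sex;
        }
        String getName() { return name; }
        char getSex() { return sex; }
    }

    // All threads insert into one shared ConcurrentMap instead of building
    // separate maps that have to be merged by key afterwards
    static ConcurrentMap<Character, Long> countBySex(List<Employee> empList) {
        return empList.parallelStream()
                .collect(Collectors.groupingByConcurrent(Employee::getSex, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<Employee> empList = Arrays.asList(
                new Employee("Ram", 'M'), new Employee("Sheila", 'F'),
                new Employee("Mukesh", 'M'), new Employee("Rani", 'F'));
        countBySex(empList).forEach((sex, count) -> System.out.println(sex + " - " + count));
    }
}
```

groupingByConcurrent is an unordered collector, so in the list-based version above the order of names within each key (for example Mukesh before Ram) may change between runs.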

Using forEachOrdered

The order in which a pipeline processes the elements of a stream depends on whether the stream is executed in serial or in parallel.

For example:

Integer[] intArray = {1, 2, 3, 4, 5, 6, 7, 8 };
List<Integer> listOfIntegers = new ArrayList<>(Arrays.asList(intArray));
System.out.println("listOfIntegers:");
listOfIntegers.stream().forEach(e -> System.out.print(e + " "));

Output

listOfIntegers:
1 2 3 4 5 6 7 8

Here you can see that the pipeline prints the elements of the list listOfIntegers in the order that they were added to the list.

With a parallel stream -

System.out.println("Parallel stream:");
listOfIntegers.parallelStream().forEach(e -> System.out.print(e + " "));

Output

Parallel stream:
3 4 1 6 2 5 7 8

Here you can see that the pipeline prints the elements of the list in an apparently random order. When you execute a stream in parallel, the Java compiler and runtime determine the order in which to process the stream's elements to maximize the benefits of parallel computing unless otherwise specified by the stream operation.

Using forEachOrdered -

System.out.println("With forEachOrdered:");
listOfIntegers.parallelStream().forEachOrdered(e -> System.out.print(e + " "));

Output

With forEachOrdered:
1 2 3 4 5 6 7 8

The forEachOrdered method used here processes the elements of the stream in the order specified by its source, regardless of whether the stream is executed serially or in parallel. However, you may lose the benefits of parallelism if you use operations like forEachOrdered with parallel streams.
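A small sketch that captures what forEachOrdered produces instead of printing it; the class and method names are hypothetical. The Javadoc of forEachOrdered states that performing the action for one element happens-before performing it for subsequent elements, which is why a plain (non-thread-safe) StringBuilder is acceptable here; with forEach it would not be.

```java
import java.util.Arrays;
import java.util.List;

class ForEachOrderedDemo {

    // forEachOrdered applies the action in encounter order even on a parallel stream,
    // and each action happens-before the next, so appending to a StringBuilder is safe
    static String ordered(List<Integer> list) {
        StringBuilder sb = new StringBuilder();
        list.parallelStream().forEachOrdered(e -> sb.append(e).append(' '));
        return sb.toString().trim();
    }

    public static void main(String[] args) {
        List<Integer> listOfIntegers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
        System.out.println(ordered(listOfIntegers)); // 1 2 3 4 5 6 7 8
    }
}
```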

Points to note

  1. Parallelism is not automatically faster than performing operations serially, although it can be if you have enough data and processor cores.
  2. There is additional overhead of splitting the data, managing multiple threads and combining the partial results.
  3. Make sure that there is enough data to process, so that the time saved by processing it in parallel outweighs this additional overhead.
  4. Performance of parallel execution also depends upon the number of processors available.
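To see what your machine offers, you can check the number of available processors and the parallelism of the common fork-join pool that parallel streams run on. A minimal sketch; the class and method names are hypothetical. In the OpenJDK implementation the common pool parallelism defaults to one less than the number of available processors (at least 1), and it can be changed with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.concurrent.ForkJoinPool;

class PoolInfoDemo {

    // Number of processors the JVM can use
    static int processors() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool used by parallel streams
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Available processors: " + processors());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```

For example, starting the JVM with -Djava.util.concurrent.ForkJoinPool.common.parallelism=4 sets the common pool to 4 worker threads for every parallel stream in that JVM.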

That's all for this topic Parallel Stream in Java Stream API. If you have any doubt or any suggestions to make please drop a comment. Thanks!


Related Topics

  1. Stream API in Java 8
  2. Java Stream API Examples
  3. Reduction Operations in Java Stream API
  4. Lambda expressions in Java 8
