Sunday, May 26, 2013

Java Concurrency Part 5

This is the fifth part of the tutorial; the previous post is Java Concurrency Part 4.

Executor Framework

When a program runs many concurrent tasks, all the thread-related code has to be written by hand: creating a thread object per task, starting the thread, obtaining its results, and so on.
This can lead to problems such as inefficient use of the computer's resources, which hurts the performance of the application.
For large applications a better approach is needed, and the executor framework can help with this.

The Executor Framework separates the tasks themselves from thread creation, execution and management. It encapsulates that functionality and improves performance by reusing a pool of threads.

The way the executor framework works is really simple: it only requires instances of Runnable or Callable objects, and it takes care of the rest.

java.util.concurrent.Executors is a utility class for creating thread pools, thread factories and executor services for the executor framework.

Example:
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

public class Server {
    private ThreadPoolExecutor executor;

    public Server() {
       // Creates the executor object, backed by a cached thread pool
       executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
    }
    public void executeTask(Task task) {
       executor.execute(task); // Sends the task to the pool for execution
    }
    public void endServer() {
       executor.shutdown(); // Shuts down the executor once the submitted tasks finish
    }
}
  
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class Task implements Runnable {
   private String name;

   public Task(String name) {
     this.name = name;
   }
   public void run() {
     System.out.println(Thread.currentThread().getName() + ": " + name + ", created on: " + new Date());
     try {
       // Simulates work by sleeping for up to 10 seconds
       TimeUnit.SECONDS.sleep((long) (Math.random() * 10));
     } catch (InterruptedException e) {
       e.printStackTrace();
     }
     System.out.println(Thread.currentThread().getName() + ": " + name + ", finished on: " + new Date());
   }
 }

 ...........  
 Server server = new Server();  
 for (int i = 0; i < 10; i++) {  
   Task task = new Task("Task " + i);  
   server.executeTask(task);  
 }  
 server.endServer();  
 ...........  

In the previous example the Executors class created a java.util.concurrent.ThreadPoolExecutor object. This class is an implementation of the java.util.concurrent.ExecutorService interface. Although a ThreadPoolExecutor can be created directly using its constructors, it is recommended to use the Executors class.
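
For comparison, this is roughly what creating the pool through a constructor looks like. It is only a sketch: the class name CustomPoolFactory, the 30 second keep-alive and the queue capacity of 100 are assumptions chosen for illustration, not values from the example above.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of the original Server example.
class CustomPoolFactory {

    // Builds a pool with explicit sizing instead of using an Executors factory method.
    static ThreadPoolExecutor create(int coreThreads, int maxThreads) {
        return new ThreadPoolExecutor(
                coreThreads,            // threads kept in the pool even when idle
                maxThreads,             // upper bound on the pool size
                30, TimeUnit.SECONDS,   // idle time before extra threads are removed
                new LinkedBlockingQueue<Runnable>(100)); // bounded queue of waiting tasks
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = create(2, 4);
        executor.execute(() ->
                System.out.println("running on " + Thread.currentThread().getName()));
        executor.shutdown();
    }
}
```

The constructor exposes every tuning parameter, which is why the factory methods of Executors are usually the more comfortable starting point.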

The ThreadPoolExecutor uses the execute() method to run a Runnable (Callable objects are sent with submit(), described later in this post). It has other methods, such as getPoolSize() and getCompletedTaskCount(), to get the state of the pool.
The ThreadPoolExecutor has to be terminated explicitly by calling its shutdown() method (wrapped by endServer() in the example). Otherwise its idle worker threads stay alive and can keep the program from finishing.
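
A minimal sketch of reading the pool state and shutting the pool down could look like this. The class PoolStateDemo and its method are made up for illustration, and the tasks are trivial stand-ins for real work.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class: runs a few tasks and reads the pool's counters.
class PoolStateDemo {

    // Runs taskCount trivial tasks and returns the completed-task count after termination.
    static long runAndCount(int taskCount) {
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> Thread.yield()); // trivial stand-in for real work
        }
        System.out.println("pool size right after submitting: " + executor.getPoolSize());
        executor.shutdown(); // no new tasks are accepted; submitted ones still run
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return executor.getCompletedTaskCount();
    }

    public static void main(String[] args) {
        System.out.println("completed tasks: " + runAndCount(5));
    }
}
```

While tasks are still running the counters are only approximations; once the pool has terminated, the completed-task count matches the number of tasks that ran.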

To avoid overloading the application and degrading its performance, the Executors class has the method newFixedThreadPool(int nThreads), which creates a fixed-size thread pool executor. This executor has a maximum number of threads given by the parameter nThreads, and as the Java API says: “At any point, at most nThreads threads will be active processing tasks. If additional tasks are submitted when all threads are active, they will wait in the queue until a thread is available. If any thread terminates due to a failure during execution prior to shutdown, a new one will take its place if needed to execute subsequent tasks”.

Example:
   ......  
   public Server() {      
     executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(5);  
   }  
   ......  

In the example the executor is created with a maximum of 5 threads. This means that if more than 5 tasks are sent for execution, only 5 will run at a time and the remaining ones will wait in the queue until a thread is free to process them.
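
The queueing behaviour can be observed with a small experiment. The sketch below is an illustration under assumptions: FixedPoolQueueDemo is a hypothetical class, and a CountDownLatch keeps every worker busy so that the snapshot is taken while the extra tasks are still waiting.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical demo class: shows that extra tasks wait in the queue of a fixed pool.
class FixedPoolQueueDemo {

    // Returns {pool size, queued tasks} observed while every worker is busy.
    static int[] observe(int nThreads, int taskCount) {
        ThreadPoolExecutor executor =
                (ThreadPoolExecutor) Executors.newFixedThreadPool(nThreads);
        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < taskCount; i++) {
            executor.execute(() -> {
                try {
                    release.await(); // keeps the worker busy until released
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        int[] snapshot = { executor.getPoolSize(), executor.getQueue().size() };
        release.countDown();  // lets every task finish
        executor.shutdown();
        return snapshot;
    }

    public static void main(String[] args) {
        int[] s = observe(5, 12);
        System.out.println("threads: " + s[0] + ", waiting in queue: " + s[1]);
    }
}
```

With 5 threads and 12 tasks, 5 tasks occupy the threads and the other 7 sit in the queue until a thread becomes free.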

Tasks that return a value

The executor framework can run tasks that return a value, which is another advantage of using it. This mechanism uses the java.util.concurrent.Callable interface: instead of a run() method it offers a call() method, which returns an object of the type specified by the generic parameter:

                   public interface Callable<V> {
                         V call() throws Exception;
                   }

The ExecutorService has the submit() method, which accepts Callable objects and executes them. This method returns an object of type java.util.concurrent.Future; the Future interface has methods to obtain the result generated by the Callable object.
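
As a first small sketch of submit() and Future, the following hypothetical class (FutureBasicsDemo is not part of the tutorial code) sends one Callable to a single-thread executor and waits for its result with a timeout. The 5 second limit is an arbitrary assumption.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class: one Callable, one Future.
class FutureBasicsDemo {

    // Submits a Callable and waits at most five seconds for its result.
    static int square(int n) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            Callable<Integer> task = () -> n * n;
            Future<Integer> future = executor.submit(task);
            return future.get(5, TimeUnit.SECONDS); // blocks until done or timed out
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException | TimeoutException e) {
            // ExecutionException wraps whatever exception call() threw
            throw new IllegalStateException("task did not produce a result", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("7 squared is " + square(7));
    }
}
```

Besides get(), the Future interface also offers isDone(), cancel() and isCancelled() to inspect or abort the task.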

Example:
 public class MultiplyCalculator implements Callable<Integer> {  
   private int operator1;  
   private int operator2;  
   public MultiplyCalculator(int operator1, int operator2) {  
     this.operator1 = operator1;  
     this.operator2 = operator2;  
   }  
   public Integer call() throws Exception {  
     return operator1 * operator2;  
   }  
 }  

 ..........  
 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(2);//Maximum 2 threads at a time  
  List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();  
  for (int i = 0; i < 10; i++) {  
       MultiplyCalculator calculator = new MultiplyCalculator((int)(Math.random()*10), (int)(Math.random()*10));  
       Future<Integer> result = executor.submit(calculator);  
       resultList.add(result);  
     }  
     while (executor.getCompletedTaskCount() < resultList.size()) {  
         try {  
         Thread.sleep(50);  
       } catch (InterruptedException e) {  
         e.printStackTrace();  
       }  
     }//Waits for the tasks to complete  
     for (int i = 0; i < resultList.size(); i++) {  
       Future<Integer> result = resultList.get(i);  
       try {        
         System.out.println("The result from the task "+i+ " is:" + result.get());  
       } catch (Exception e) {  
         e.printStackTrace();  
       }   
     }  
     executor.shutdown();// Lets the pool threads end so the program can finish  
 ..........

In the previous example, the Callable objects that perform a multiplication are sent to the executor using the submit() method. The program waits until all the tasks are finished by polling the getCompletedTaskCount() method of the executor. Once they are done, the results of the operations are obtained with the get() method of each Future object.
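
Polling is one way to wait. Because Future.get() itself blocks until the result is available, the waiting loop can also be left out. The sketch below shows that variant; BlockingGetDemo is a hypothetical class, and it uses inline Callable lambdas instead of MultiplyCalculator so that it is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: waits for results with get() instead of polling.
class BlockingGetDemo {

    // Multiplies each i in [0, count) by factor on a two-thread pool.
    static List<Integer> multiplyAll(int count, int factor) {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        List<Future<Integer>> futures = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            final int operand = i;
            futures.add(executor.submit(() -> operand * factor));
        }
        executor.shutdown(); // already submitted tasks still run
        List<Integer> results = new ArrayList<>();
        for (Future<Integer> future : futures) {
            try {
                results.add(future.get()); // waits for this task if necessary
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting", e);
            } catch (ExecutionException e) {
                throw new IllegalStateException("task failed", e);
            }
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(multiplyAll(4, 3));
    }
}
```

The results come back in submission order because the futures are read in the order they were stored, regardless of which task finished first.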

The ExecutorService has the invokeAny(tasks) method, which receives a Collection of tasks, executes them and returns the result of the first task that finishes without throwing an Exception. Tasks that have not completed are cancelled.

An example where this method could be used is a lookup service: an application wants to look up a database connection on different servers, the first task that finds the service available is the one that is used, and the other tasks are ignored.
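
The lookup scenario could be sketched like this. Everything here is hypothetical: the class LookupDemo, the server names, and the rule that a server whose name starts with "down" is unavailable are all stand-ins for a real connection attempt.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical demo class: picks the first server whose lookup succeeds.
class LookupDemo {

    // Simulated lookup: servers whose name starts with "down" fail.
    static Callable<String> lookup(String server) {
        return () -> {
            if (server.startsWith("down")) {
                throw new IllegalStateException(server + " is unavailable");
            }
            return server;
        };
    }

    // Returns the name of the first server whose lookup completes successfully.
    static String firstAvailable(List<String> servers) {
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            List<Callable<String>> tasks = new ArrayList<>();
            for (String server : servers) {
                tasks.add(lookup(server));
            }
            return executor.invokeAny(tasks); // failed tasks are skipped
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            // Thrown by invokeAny when no task completes successfully
            throw new IllegalStateException("no server available", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(firstAvailable(Arrays.asList("down-1", "up-1", "down-2")));
    }
}
```

Only "up-1" can succeed in this run, so it is the value returned; with several available servers the winner would depend on timing.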

The multiplication example implemented with the invokeAny() method would look like this:
 .................  
     ExecutorService executor = (ExecutorService) Executors.newCachedThreadPool();  
     List<MultiplyCalculator> taskList = new ArrayList<MultiplyCalculator>();  
     for (int i = 0; i < 10; i++) {  
       MultiplyCalculator calculator = new MultiplyCalculator((int)(Math.random()*10), (int)(Math.random()*10));  
       taskList.add(calculator);  
     }  
     try {  
        Integer result = executor.invokeAny(taskList);  
        System.out.println("The result from the first task to finish is:" + result);  
     } catch (Exception e) {e.printStackTrace();}  
     // Shutdown the Executor  
     executor.shutdown();  
  .............  

The ExecutorService has another mechanism for running multiple tasks and processing the results of all of them: the invokeAll(tasks) method receives a Collection of tasks, executes them and returns a List of Future objects.

Example:
     ExecutorService executor = (ExecutorService) Executors.newCachedThreadPool();  
     List<MultiplyCalculator> taskList = new ArrayList<MultiplyCalculator>();  
     for (int i = 0; i < 10; i++) {  
       MultiplyCalculator calculator = new MultiplyCalculator((int)(Math.random()*10), (int)(Math.random()*10));  
       taskList.add(calculator);  
     }      
     List<Future<Integer>> resultList = null;      
     try {  
        resultList = executor.invokeAll(taskList);         
     } catch (Exception e) {e.printStackTrace();}      
     executor.shutdown();  
     for (int i = 0; i < resultList.size(); i++) {  
       Future<Integer> result = resultList.get(i);  
       try {        
         System.out.println("The result from the task "+i+ " is:" + result.get());  
       } catch (Exception e) {  
         e.printStackTrace();  
       }   
     }  

In the example, instead of sending each task to the executor with the submit() method, all the tasks are grouped in a list and sent for execution through the invokeAll() method.
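
Two properties of invokeAll() are worth seeing in a compact form: it returns only when every task has completed, and the returned futures follow the order of the task list. The following sketch assumes a hypothetical class InvokeAllDemo with tasks that square a number.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class: runs a batch of tasks with invokeAll().
class InvokeAllDemo {

    // Squares 1..count on a three-thread pool and returns the results in task order.
    static List<Integer> squares(int count) {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        try {
            List<Callable<Integer>> tasks = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                final int n = i;
                tasks.add(() -> n * n);
            }
            List<Integer> results = new ArrayList<>();
            for (Future<Integer> future : executor.invokeAll(tasks)) {
                results.add(future.get()); // does not block: the task is already done
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(squares(4));
    }
}
```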

Schedule Tasks

The Executors utility class can create a pool that schedules tasks after a given delay, or executes them periodically. This pool implements the java.util.concurrent.ScheduledExecutorService interface.

Example:
 ScheduledExecutorService executor =
       Executors.newScheduledThreadPool(1);  
 List<Future<Integer>> resultList = new ArrayList<Future<Integer>>();  
 for (int i = 0; i < 5; i++) {  
      MultiplyCalculator calculator = new MultiplyCalculator((int)(Math.random()*10), (int)(Math.random()*10));  
      // Runs the task once, i+1 seconds from now  
      Future<Integer> future = executor.schedule(calculator, i + 1, TimeUnit.SECONDS);  
      resultList.add(future);  
 }  
 executor.shutdown();  
 // Waits for the finalization of the executor  
 try {  
      executor.awaitTermination(1, TimeUnit.DAYS);  
 } catch (InterruptedException e) {  
      e.printStackTrace();  
 }  
 ..............  

In the example a scheduled pool is created with a pool size of 1, then each task is scheduled using the schedule() method. This method receives as parameters the task to execute, the delay to wait before the execution and the unit of time for that delay.
After shutdown(), the program calls awaitTermination(), which blocks until all tasks have completed or the timeout occurs.
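
The example above covers delayed execution only. For periodic execution, ScheduledExecutorService offers scheduleAtFixedRate(). The sketch below is an assumption-laden illustration: PeriodicTaskDemo is a hypothetical class, and the task simply counts its own runs until the caller cancels it.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: runs a task periodically and then cancels it.
class PeriodicTaskDemo {

    // Repeats a task every periodMillis until it has run at least `runs` times.
    // Returns how many runs were counted (at least `runs`).
    static int runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
        AtomicInteger counter = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        ScheduledFuture<?> handle = executor.scheduleAtFixedRate(() -> {
            counter.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // waits until the task has run `runs` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        handle.cancel(false); // stops further executions of the periodic task
        executor.shutdown();
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println("runs counted: " + runPeriodically(3, 100));
    }
}
```

A periodic task never finishes on its own, so it has to be cancelled through its ScheduledFuture or stopped by shutting down the executor.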

Rejected tasks

If a task is sent to the executor after shutdown() has been called, the task is rejected. The executor provides a mechanism to manage this: it just requires an instance of an object that implements the java.util.concurrent.RejectedExecutionHandler interface.

Example:

 public class RejectedTaskController implements RejectedExecutionHandler {  
   public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
     System.out.println("The task has been rejected");     
   }  
 }  
 ........  
 RejectedTaskController controller = new RejectedTaskController();  
 ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();  
 executor.setRejectedExecutionHandler(controller);  
 ........  

When a task is rejected, the rejectedExecution() method of the RejectedExecutionHandler instance is called.
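
To see the handler actually being called, tasks have to be sent after shutdown(). A minimal sketch, assuming a hypothetical class RejectionDemo whose handler only counts the rejected tasks:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: triggers the rejection handler on purpose.
class RejectionDemo {

    // Sends lateTasks tasks after shutdown() and returns how many the handler saw.
    static int rejectedAfterShutdown(int lateTasks) {
        AtomicInteger rejected = new AtomicInteger();
        RejectedExecutionHandler handler = (task, pool) -> rejected.incrementAndGet();
        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        executor.setRejectedExecutionHandler(handler);
        executor.shutdown(); // from here on, new tasks are rejected
        for (int i = 0; i < lateTasks; i++) {
            executor.execute(() -> System.out.println("this task never runs"));
        }
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks: " + rejectedAfterShutdown(3));
    }
}
```

Without a custom handler, ThreadPoolExecutor uses its default AbortPolicy, which throws a RejectedExecutionException from execute() instead.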

Go to part 6
