Aayush: weblog

Java Concurrency Utilities (Part-02): Rejected Execution Handlers, Thread Factories and Runnable Queues

Posted by Aayush Bhatnagar on September 25, 2011


In the first part of Java concurrency utilities, we discussed the Callable interface, Future interface and FutureTask class. The post can be found here.

In this post, we will introduce the Rejected Execution Handler utility of the Java concurrency package.

RejectedExecutionHandler is an interface that the application implements. It acts as a “callback”: the thread pool executor invokes its rejectedExecution method when it cannot accept a task, for example because the executor has been shut down or because its threads and queue slots are exhausted.

The application can then do some “housekeeping” work, which may include queuing the task for future execution.
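
As a minimal sketch of this callback (separate from the demo files below, and using the hypothetical names RejectionSketch and countRejected), the following program shuts a pool down and then hands it two tasks. Both go to the handler, which stores them in a queue for later:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical class name, used only for this sketch.
public class RejectionSketch {

    // Hands two tasks to a pool that is already shut down and returns
    // how many of them ended up in the handler's queue.
    public static int countRejected() {
        final BlockingQueue<Runnable> rejected = new LinkedBlockingQueue<Runnable>();

        RejectedExecutionHandler handler = new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
                // Housekeeping: keep the task so that it can be run later.
                rejected.add(task);
            }
        };

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(), handler);

        pool.shutdown(); // from here on, every new task is rejected

        pool.execute(new Runnable() { public void run() { } });
        pool.execute(new Runnable() { public void run() { } });

        return rejected.size();
    }

    public static void main(String[] args) {
        System.out.println("rejected tasks queued: " + countRejected()); // prints 2
    }
}
```

The handler runs on the thread that called execute, so the queue is already populated when execute returns.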

In this example, the thread pool is shut down on purpose, so the Runnable tasks submitted afterwards are rejected. Each rejected task is passed to the rejected execution handler, which queues it in a LinkedBlockingQueue.

There is another thread pool, whose custom thread factory creates deferred worker threads.

The deferred thread pool has all its worker threads “pre-started”, and they all poll the runnable queue. As Runnable tasks are rejected, they are added to the runnable queue and picked up by the deferred worker threads for execution.
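
The deferred workers in this post poll the queue in a loop. As an alternative sketch, and not the code used in this demo, a worker can sleep inside BlockingQueue.take() until a task arrives. The class name BlockingDrainSketch and the STOP marker task are assumptions made for this illustration:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical class name, used only for this sketch.
public class BlockingDrainSketch {

    // Hypothetical marker task that tells the worker to stop.
    static final Runnable STOP = new Runnable() { public void run() { } };

    // Starts one worker that blocks on the queue, feeds it three tasks
    // followed by STOP, and returns the task labels in execution order.
    public static List<String> drain() throws InterruptedException {
        final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        final List<String> log = Collections.synchronizedList(new ArrayList<String>());

        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        Runnable task = queue.take(); // sleeps until a task arrives
                        if (task == STOP) {
                            return;
                        }
                        task.run();
                    }
                } catch (InterruptedException e) {
                    // treat an interrupt as a request to stop
                    Thread.currentThread().interrupt();
                }
            }
        }, "deferred-blocking");
        worker.start();

        for (int i = 1; i <= 3; i++) {
            final int id = i;
            queue.put(new Runnable() {
                public void run() { log.add("task-" + id); }
            });
        }
        queue.put(STOP);
        worker.join();
        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(drain()); // prints [task-1, task-2, task-3]
    }
}
```

With a single worker and a FIFO queue the tasks run in submission order. With several workers, one STOP marker per worker would be needed.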

Code Files:

1. RejectedTasksDemo.java – where all the action starts

2. RejectedHandler.java – which implements the RejectedExecutionHandler interface

3. WorkerThread.java – the Runnable task which does all the hard work!

4. DeferredWorker.java – the worker thread which reads rejected Runnables from the runnable queue and executes them.

5. CustomThreadFactory.java – the thread factory for the deferred thread pool.

Code Snippets:

RejectedTasksDemo Class:

package org.demo.java.rejectedhandlers;

import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * A class demonstrating the rejected execution handler.
 *
 */
public class RejectedTasksDemo
{

    // Thread pool with the rejected execution handler configured
    public static ThreadPoolExecutor threadP = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

    // Pool whose pre-started threads poll the runnable queue
    public static ThreadPoolExecutor threadR = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

    // Queue for storing the runnable instances which cannot be executed by threadP
    public static Queue<Runnable> runnable_queue = new LinkedBlockingQueue<Runnable>();

    public static void main(String[] args) throws InterruptedException
    {
        threadR.setThreadFactory(new CustomThreadFactory());
        threadP.setRejectedExecutionHandler(new RejectedHandler());

        // pre-start all core threads of the deferred pool
        threadR.prestartAllCoreThreads();

        threadP.submit(new WorkerThread("runnable executed by the thread pool executor.."));
        Thread.sleep(300L);

        // shut down the main pool on purpose
        threadP.shutdownNow();

        // Now the rejected tasks handler comes into the picture:
        // each submit() below is routed to RejectedHandler
        threadP.submit(new WorkerThread("runnable which got rejected-1.."));
        threadP.submit(new WorkerThread("runnable which got rejected-2.."));
        threadP.submit(new WorkerThread("runnable which got rejected-3.."));

        // shut down the deferred thread pool
        threadR.shutdown();
    }

}

RejectedHandler.java


package org.demo.java.rejectedhandlers;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * The call back handler which is invoked when a task cannot be 
 * executed by the thread pool executor.
 *
 */
public class RejectedHandler implements RejectedExecutionHandler
{

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor)
    {
        System.out.println("oops..race condition. Somebody turned off the thread pool executor..");
        System.out.println("need to complete the rejected task..");
        // Implementations may even decide to queue the task for deferred execution.
        System.out.println("Adding the rejected task to runnable queue -- " + RejectedTasksDemo.runnable_queue.add(task));
    }

}

WorkerThread.java

package org.demo.java.rejectedhandlers;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * A worker task. Despite its name, this class is a Runnable, not a Thread.
 *
 */

public class WorkerThread implements Runnable
{
    private final String description;

    public WorkerThread(String description)
    {
        this.description = description;
    }

    public WorkerThread()
    {
        // default description, so that run() never prints "null"
        this("unnamed task");
    }

    @Override
    public void run()
    {
        System.out.println("Thread type -- " + description);
        System.out.println("Doing some work....");
        System.out.println("Work done..retiring for the day..\n");
    }
}

DeferredWorker.java


package org.demo.java.rejectedhandlers;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * Thread which polls the runnable queue and picks up tasks from it 
 * for deferred execution.
 *
 */
public class DeferredWorker extends Thread
{

    // true while this worker should keep polling the runnable queue
    private boolean running = true;

    public DeferredWorker()
    {

    }

    @Override
    public void run()
    {
        System.out.println("Deferred thread started -- " + this.getName());

        while (running && !isInterrupted())
        {
            // poll() returns null on an empty queue, so no separate peek() is needed
            Runnable task = RejectedTasksDemo.runnable_queue.poll();
            if (task == null)
            {
                // nothing queued yet; a worker that never receives a task
                // keeps polling until it is interrupted
                Thread.yield();
                continue;
            }

            System.out.println("Iterating over the runnable queue == " + this.getName());
            task.run();

            // stop once the queue has been drained
            if (RejectedTasksDemo.runnable_queue.isEmpty())
                running = false;
        }
    }

}

CustomThreadFactory.java

package org.demo.java.rejectedhandlers;

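import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * The custom thread creation factory
 *
 */
public class CustomThreadFactory implements ThreadFactory {

    // gives every deferred worker a distinct name in the output
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable task)
    {
        // Note: the Runnable supplied by the pool is not used here;
        // a DeferredWorker runs its own polling loop instead.
        Thread t = new DeferredWorker();
        t.setName("deferred-" + counter.getAndIncrement());
        return t;
    }

}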
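
CustomThreadFactory returns a DeferredWorker and does not use the Runnable it receives. For comparison, here is a sketch of a more conventional factory that wraps the Runnable and numbers its threads, so that the pool's own tasks run on the threads it creates. The names NamedFactorySketch, NamedThreadFactory and nameOfPoolThread are assumptions made for this illustration and are not part of the demo:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical class name, used only for this sketch.
public class NamedFactorySketch {

    // Hypothetical factory: wraps the Runnable it is given and numbers the threads.
    static class NamedThreadFactory implements ThreadFactory {
        private final String prefix;
        private final AtomicInteger counter = new AtomicInteger(1);

        NamedThreadFactory(String prefix) {
            this.prefix = prefix;
        }

        @Override
        public Thread newThread(Runnable task) {
            return new Thread(task, prefix + "-" + counter.getAndIncrement());
        }
    }

    // Runs one task on a single-thread pool and returns the name of the
    // thread that executed it.
    public static String nameOfPoolThread() throws Exception {
        ExecutorService pool =
                Executors.newFixedThreadPool(1, new NamedThreadFactory("deferred"));
        try {
            Future<String> name = pool.submit(new Callable<String>() {
                public String call() {
                    return Thread.currentThread().getName();
                }
            });
            return name.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(nameOfPoolThread()); // prints deferred-1
    }
}
```

With a factory like this, the rejected handler could submit rejected tasks straight to the second pool instead of using a shared queue. That is a different design from the one shown in this post.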


One Response to “Java Concurrency Utilities (Part-02): Rejected Execution Handlers, Thread Factories and Runnable Queues”

  1. Rivu Chakraborty said

    It is really a nice example for RejectedExecutionHandler.
    But I think it would be even better if you included the output as well.

    Thanks.
