Aayush: weblog

Java Concurrency Utilities (Part-02): Rejected Execution Handlers, Thread Factories and Runnable Queues

Posted by Aayush Bhatnagar on September 25, 2011


In the first part of Java concurrency utilities, we discussed the Callable interface, Future interface and FutureTask class. The post can be found here.

In this post, we will introduce the Rejected Execution Handler utility of the Java concurrency package.

RejectedExecutionHandler is an interface that the application implements. It acts as a “callback”: the ThreadPoolExecutor invokes it when it cannot accept a task, either because the executor has been shut down or because its threads and work queue are saturated.

The application can then do some “housekeeping” work – which may include the queuing of this task for future execution.
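Rejection is not limited to a pool that has been shut down. As a minimal sketch (the class name SaturationDemo and the pool sizes are assumptions made for illustration, and the lambdas need Java 8 or later), the pool below has one thread and a work queue of capacity one, so anything submitted while both are occupied goes to the handler:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; the name and the numbers are illustrative only.
class SaturationDemo {

    // Submits 'total' blocking tasks to a pool with one thread and a queue
    // of capacity one, and returns how many of them were rejected.
    static int countRejections(int total) {
        AtomicInteger rejected = new AtomicInteger();
        RejectedExecutionHandler handler =
                (task, executor) -> rejected.incrementAndGet();

        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<Runnable>(1), handler);

        CountDownLatch release = new CountDownLatch(1);
        for (int i = 0; i < total; i++) {
            pool.execute(() -> {
                try {
                    release.await();      // keep the single worker busy
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        release.countDown();              // let the accepted tasks finish
        pool.shutdown();
        return rejected.get();
    }

    public static void main(String[] args) {
        System.out.println("rejected = " + countRejections(5));
    }
}
```

With five submissions the method returns 3: the first task occupies the only thread, the second waits in the queue, and the remaining three are handed to the handler.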

In this example, the thread pool is shut down on purpose, so the Runnable tasks submitted afterwards are rejected. Each rejected task is redirected to the rejected execution handler, which queues it in a LinkedBlockingQueue.

A second thread pool uses a custom ThreadFactory that creates DeferredWorker threads.

The deferred thread pool has all its worker threads “pre-started”, and they all poll the runnable queue. As tasks are rejected, the handler adds them to the runnable queue, and the deferred worker threads pick them up for execution.

Code Files:

1. RejectedTasksDemo.java – where all the action starts

2. RejectedHandler.java – which implements the RejectedExecutionHandler interface

3. WorkerThread.java – the Runnable task which does all the hard work!

4. DeferredWorker.java – the worker thread which reads the rejected Runnables from the runnable queue and executes them.

5. CustomThreadFactory.java – the thread factory for the deferred thread pool.

Code Snippets:

RejectedTasksDemo Class:

package org.demo.java.rejectedhandlers;

import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * A class demonstrating the rejected execution handler.
 *
 */
public class RejectedTasksDemo 
{

    // Thread pool with the rejected execution handler configured
    public static ThreadPoolExecutor threadP = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

    // Deferred thread pool whose worker threads poll the runnable queue.
    public static ThreadPoolExecutor threadR = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

    // Queue for storing the runnable instances which cannot be executed by threadP
    public static Queue<Runnable> runnable_queue = new LinkedBlockingQueue<Runnable>();

    public static void main (String[] args) throws InterruptedException 
    {
        threadR.setThreadFactory(new CustomThreadFactory());
        threadP.setRejectedExecutionHandler(new RejectedHandler());

        // pre start all core threads..
        threadR.prestartAllCoreThreads();

        threadP.submit(new WorkerThread("runnable executed by the thread pool executor.."));
        Thread.sleep(300L);
        // shut down on purpose, so that later submissions are rejected..
        threadP.shutdownNow();
        // Now the rejected tasks handler comes into the picture..
        threadP.submit(new WorkerThread("runnable which got rejected-1.."));
        threadP.submit(new WorkerThread("runnable which got rejected-2.."));
        threadP.submit(new WorkerThread("runnable which got rejected-3.."));

        // shut down the deferred thread pool:
        threadR.shutdown();
    }

}

RejectedHandler.java


package org.demo.java.rejectedhandlers;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * The call back handler which is invoked when a task cannot be 
 * executed by the thread pool executor.
 *
 */
public class RejectedHandler implements RejectedExecutionHandler
{

    @Override
    public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) 
    {
        System.out.println("oops..race condition. Somebody turned off the thread pool executor..");
        System.out.println("need to complete the rejected task..");
        // Implementations may even decide to queue the task for deferred execution.
        System.out.println("Adding the rejected task to runnable queue -- "+RejectedTasksDemo.runnable_queue.add(task));
    }

}

WorkerThread.java

package org.demo.java.rejectedhandlers;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * A simple task. Despite its name it is a Runnable, not a Thread;
 * a pool thread (or a deferred worker) executes it.
 *
 */

public class WorkerThread implements Runnable
{
    private final String description;

    public WorkerThread(String description)
    {
        this.description = description;
    }

    public WorkerThread() {
        // Default description, so that run() never prints "null".
        this("unnamed task");
    }

    @Override
    public void run() {
        System.out.println("Thread type -- "+description);
        System.out.println("Doing some work....");
        System.out.println("Work done..retiring for the day..\n");
    }
}

DeferredWorker.java


package org.demo.java.rejectedhandlers;

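/**
 * 
 * @author aayush.bhatnagar
 * 
 * Thread which polls the runnable queue and picks up tasks from it 
 * for deferred execution.
 *
 */
public class DeferredWorker extends Thread
{

    // A worker gives up once the queue has stayed empty for this long (demo value).
    private static final long IDLE_LIMIT_MILLIS = 2000L;

    public DeferredWorker() 
    {

    }

    @Override
    public void run() {
        System.out.println("Deferred thread started -- "+ this.getName());

        long lastActive = System.currentTimeMillis();
        while(System.currentTimeMillis() - lastActive < IDLE_LIMIT_MILLIS)
        {
            // poll() returns null on an empty queue, so no separate peek() is needed.
            Runnable task = RejectedTasksDemo.runnable_queue.poll();
            if(task==null)
            {
                try {
                    Thread.sleep(10L); // back off instead of spinning at full speed
                } catch (InterruptedException e) {
                    return; // interrupted: stop this worker
                }
                continue;
            }
            System.out.println("Picked up a task from the runnable queue == "+this.getName());
            task.run();
            lastActive = System.currentTimeMillis();
        }
        // Every worker ends here, including one that never received a task.
    }

}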

CustomThreadFactory.java

package org.demo.java.rejectedhandlers;

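import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * The custom thread creation factory
 *
 */
public class CustomThreadFactory implements ThreadFactory {

    // Used to give every deferred worker a distinct name.
    private final AtomicInteger counter = new AtomicInteger(1);

    @Override
    public Thread newThread(Runnable task) 
    {
        // The Runnable handed in by the executor is deliberately ignored:
        // DeferredWorker overrides run() and drains the runnable queue instead.
        Thread t = new DeferredWorker();
        t.setName("deferred-" + counter.getAndIncrement());
        return t;
    }

}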
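The deferred workers above poll the queue. As an alternative sketch, and not the code from this post, a consumer can truly block on a BlockingQueue with take() and be stopped with a sentinel task. The class name BlockingDeferredDemo, the STOP sentinel and the single consumer thread are assumptions made for illustration; the lambdas need Java 8 or later.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical variant of the demo; names are illustrative only.
class BlockingDeferredDemo {

    // Sentinel ("poison pill") that tells the consumer to stop.
    private static final Runnable STOP = () -> { };

    // Rejects 'count' tasks on a shut-down pool and returns the labels
    // in the order in which the deferred consumer ran them.
    static List<String> runRejected(int count) {
        BlockingQueue<Runnable> deferred = new LinkedBlockingQueue<>();
        List<String> ran = new CopyOnWriteArrayList<>();

        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    Runnable task = deferred.take();   // blocks while the queue is empty
                    if (task == STOP) {
                        return;
                    }
                    task.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "deferred-consumer");
        consumer.start();

        // The handler simply moves every rejected task to the deferred queue.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(),
                (task, executor) -> deferred.add(task));
        pool.shutdown();                               // every execute() is now rejected

        for (int i = 1; i <= count; i++) {
            String label = "rejected-" + i;
            pool.execute(() -> ran.add(label));
        }
        deferred.add(STOP);                            // queued after all the tasks
        try {
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return ran;
    }
}
```

Because take() parks the thread until an element arrives, the consumer uses no CPU while idle, and the sentinel gives it a well-defined point at which to exit.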

Posted in Java, Uncategorized

Java Concurrency Utilities (Part-01): Callable, Future and FutureTask

Posted by Aayush Bhatnagar on September 25, 2011


In this post, I will demonstrate through a very simple program the usage of the Callable, Future and FutureTask utilities. These utilities are present in the java.util.concurrent package.

However, before we come to the actual code – it is important to understand the use-cases behind the need for these utilities.

The use-case for having a Callable interface:

Sometimes our worker threads need to return a value. This is not possible with a plain Runnable, because its run() method is declared void.

In such cases, we can create a class which implements the Callable “type” and pass an instance of that class to an ExecutorService, Java's built-in thread pool.

The class which implements Callable is executed in a thread inside the Executor, and the return value of its call() method is made available through a Future once the execution completes.

The use-case for Future and FutureTask:

When designing APIs that are exposed to 3rd party applications, developers often provide both a “synchronous” and an “asynchronous” variant of a call, as viewed from the 3rd party application.

The processing for each incoming API call may happen in a worker thread, which in turn exchanges data over the network (or does some file I/O etc), and then produces a return value (think Callable) that has to be sent back to the API caller.

Hence, for the API caller, the invocation seems synchronous. However, under the hood, the API hands the work to a thread pool, which executes it asynchronously in a worker thread and returns a value at some point in the “future”, when the processing is done.

In such cases, Future and FutureTask come in handy for developers.
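The synchronous and asynchronous variants described above can be sketched as follows. This is only an illustration: LookupApi, lookup and lookupAsync are hypothetical names, the string concatenation stands in for the real network or file I/O, and the lambda needs Java 8 or later.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical API facade; names are illustrative only.
class LookupApi {

    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    // Asynchronous variant: returns immediately with a Future.
    Future<String> lookupAsync(String key) {
        // The lambda is a Callable<String>; it stands in for slow I/O.
        return pool.submit(() -> "value-for-" + key);
    }

    // Synchronous variant: the same work, but the caller blocks until it is done.
    String lookup(String key) {
        try {
            return lookupAsync(key).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("lookup failed", e.getCause());
        }
    }

    // Releases the pool threads once the API is no longer needed.
    void close() {
        pool.shutdown();
    }
}
```

The synchronous method is just the asynchronous one followed by get(), so both variants share a single code path.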

One practical use of the Future utilities is in a protocol stack, where the client sends a message through the stack. The stack sends the message over the network in a request submitter thread, receives a response “in the future” in a response listener thread, and then returns this response to the caller thread.

The request submitter thread and the response receiver thread can communicate through the “Exchanger” concurrency utility of Java (http://download.oracle.com/javase/6/docs/api/java/util/concurrent/Exchanger.html). However, this is a story for another time.
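For a first taste only, here is a minimal sketch of two threads swapping one object each through an Exchanger. The class name ExchangerDemo and the two message strings are assumptions made for illustration; a real protocol stack would exchange request and response objects.

```java
import java.util.concurrent.Exchanger;

// Hypothetical demo class; names and messages are illustrative only.
class ExchangerDemo {

    // Swaps one message with a helper thread and returns what the caller received.
    static String exchangeOnce() {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread listener = new Thread(() -> {
            try {
                // Hands over "response" and receives the caller's object in return.
                exchanger.exchange("response");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "response-listener");
        listener.start();

        try {
            // Blocks until the listener thread also reaches exchange().
            String received = exchanger.exchange("request");
            listener.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted during exchange", e);
        }
    }

    public static void main(String[] args) {
        System.out.println("caller received: " + exchangeOnce());
    }
}
```

Each call to exchange() waits for its partner, so the two threads meet at a single synchronization point and leave with each other's object.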

The example below is not that complex! It only introduces these utilities.

Both these use-cases are illustrated below in the form of code snippets:

Code Snippet – Demonstrating Callable, Future and FutureTask:

The main class – where we do all the stuff:

package org.demo.java.future;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * Demo for Callable, Future and FutureTask.
 *
 */
public class FutureDemo 
{
    // This is our thread pool, which uses Java's internal thread pooling utilities.
    public static ExecutorService threadPool = Executors.newFixedThreadPool(3);

    public static void main (String[] args) throws InterruptedException, ExecutionException
    {
        // Demonstrating the Callable Interface usage.
        /*
         * The SomeTask.java class implements the Callable Interface, and is passed as an 
         * argument to the thread pool's submit method.
         * 
         * Please note, that the SomeTask class is not a Runnable type, but a Callable type.
         * 
         * Being a callable type means, that SomeTask can return a value once the task is executed by the
         * worker thread.
         */

        // submit() returns a Future, a handle to the value the Callable will produce.
        // In this particular example, we are expecting a String return type.
        Future<String> result = threadPool.submit(new SomeTask());
        // get() blocks until the task has finished and then returns its value.
        String res = result.get();

        System.out.println("Result --> "+res);

        /*
         * The FutureTask class implements both Future and Runnable, so it can be handed
         * to an executor directly. It also offers cancellation and timed waits.
         */
        // Created a new future task with a Callable argument.
        FutureTask<String> task = new FutureTask<String>(new SomeTask());
        // Handing it to the thread pool; execute() is enough, as the FutureTask itself holds the result.
        threadPool.execute(task);
        // Getting the result from the Future type (FutureTask in this case).
        String res1 = task.get();

        System.out.println("Result --> "+res1);

        // Shut the pool down; its non-daemon threads would otherwise keep the JVM alive.
        threadPool.shutdown();
    }

}

The SomeTask.java class which implements the Callable interface:

package org.demo.java.future;

import java.util.concurrent.Callable;

/**
 * 
 * @author aayush.bhatnagar
 * 
 * This class implements the Callable interface and
 * provides the implementation to the call ( ) method.
 *
 */
public class SomeTask implements Callable<String>
{

    @Override
    public String call() throws Exception 
    {
        // Here we do some dummy work.
        System.out.println("processing....");
        // call() may throw, so an InterruptedException is not swallowed here;
        // it surfaces from Future.get() wrapped in an ExecutionException.
        Thread.sleep(200L);
        System.out.println("processing complete..exiting..");

        return "processing is complete";
    }

}
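The “control mechanisms” of FutureTask mentioned in the main class include timed waits and cancellation. A minimal sketch, assuming a hypothetical class FutureControlDemo and a task that deliberately sleeps far longer than the caller is willing to wait (the lambda needs Java 8 or later):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; names and timings are illustrative only.
class FutureControlDemo {

    // Starts a 5 second task, waits at most 100 ms for it, then cancels it.
    // Returns whether the task ended up cancelled.
    static boolean timeOutAndCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        FutureTask<String> slow = new FutureTask<>(() -> {
            Thread.sleep(5000L);          // stand-in for slow work
            return "done";
        });
        pool.execute(slow);
        try {
            slow.get(100L, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            slow.cancel(true);            // true: interrupt the thread running the task
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            throw new IllegalStateException("task failed", e.getCause());
        } finally {
            pool.shutdown();
        }
        return slow.isCancelled();
    }

    public static void main(String[] args) {
        System.out.println("cancelled = " + timeOutAndCancel());
    }
}
```

The timed get() keeps the caller from waiting indefinitely, and cancel(true) interrupts the sleeping worker so that the pool thread is freed instead of finishing work nobody needs.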

Posted in Java, Uncategorized

TM Forum’s Frameworx Introduction

Posted by Aayush Bhatnagar on August 14, 2011


Introduction:

The TeleManagement Forum (www.tmforum.org) introduced the concept of an Integrated Business Architecture to capture the end-to-end view of the OSS/BSS ecosystem of a Service Provider’s network.

The Integrated Business Architecture is a convergence of the four major frameworks of the TM Forum standards.

These four major frameworks are collectively known as “Frameworx”, namely –

1. eTOM (Enhanced Telecom Operations Map) which is the Business Process Framework

2. SID (Shared Information and Data Model) which is the Information Framework

3. TAM (Telecom Application Map) which is the Applications Framework

4. TNA (Technology Neutral Architecture) which is the Integration Framework

Frameworx one by one – 

The Business Process Framework (eTOM) standardizes the business process view of a Service Provider’s business operations. It is a logical layout of horizontal and vertical constructs, where each horizontal concentrates on a particular domain of business operations, while the verticals are end-to-end views spanning these horizontals.

At the intersection of these horizontals and verticals, the business process framework defines certain process elements – which are a reusable set of business activities to be performed. The service provider can stitch these process elements together to realize end-to-end business process flows.

The Information Framework (SID) standardizes and defines a set of business entities – which represent the information (entities and their attributes) which flows across the organization. The Information framework defines an information model, which can be extended and realized to present an implementable data model.

These business entities are created, read, updated and deleted (CRUD) by the process elements defined in the business process framework. The unified modelling language (UML) is used for representing the business entities and for extending them for use.

The Application Framework (TAM) defines a set of applications which show how the business processes are realized in a software architecture. The TAM applications can be used as a reference for implementation by ISVs and as a common reference map for the procurement of OSS/BSS software and systems. The TAM also presents a non-exhaustive list of capabilities for each application, which is a useful tool for gauging compliance.

The TM Forum community is working on defining mappings between TAM applications, SID business entities and eTOM process elements for providing an end-to-end logical view and reference for the service provider and ISVs.

The Integration Framework defines a methodology and guidelines for integrating systems through standardized ‘Business Services’. Its role is to promote interoperability between systems and to define a common paradigm for systems integration across the industry, using a SOA-based realization of the integration framework such as an ESB.

In addition to the ESB capabilities (which are expected), the integration framework defines a capability-dependency model of business service design. Several business services come together to form software components, which are responsible for the flow of information between disparate systems.

The integration framework is supported by the TM Forum Interface standardization activities, which include the following interfaces –

– MTOSI (Multi-Technology Operations Systems Interface)

– MTNM (Multi-Technology Network Management)

– IPDR (Internet Protocol Detail Record)

– OSS/J (OSS using Java)

– ESC (Enterprise Security)

– SPM (Service Problem Management)

Moreover, work is ongoing to harmonize the TM Forum Interfaces with the 3GPP defined Integration Reference Points (IRPs). An old post detailing the IRPs can be found here – OAM Fundamentals for IMS

Summary:

The TM Forum Frameworx provides a complete end-to-end view of business operations for any Service Provider’s network. For interoperability and smooth operations, it is necessary that ISVs and SPs comply with the TM Forum standards.

Standards compliance is required for Greenfield operations, so that the OSS/BSS implementation is done correctly the first time and the SP achieves a fast Time to Market.

For Brownfield operations, standards conformance and alignment are required, so that future enhancements to the OSS/BSS stack can be done smoothly and the same stack can be re-used for multiple lines of business.

 

Posted in 3gpp, eTOM, frameworx, MTNM, MTOSI, NMS, OAM, SID, TAM, TM Forum

IP Multimedia Subsystem (IMS) Market Deployment Update

Posted by Aayush Bhatnagar on May 22, 2011


3GPP IMS has been buzzing in the past 18 months. I believe this is a good time to round up the IMS deployments around the world and try to summarize them in a short post.

The list below is not a complete list. Please feel free to add more names to the list by dropping a comment and I will update the post.

However, the list shows that IMS is no longer a pipe dream and has come of age. More and more carriers around the world have “silently” adopted IMS as the platform of choice for next generation communications and services.

IMS Deployments:

1. Vodafone in the Czech Republic have Ericsson’s IMS infrastructure and Atos Origin NGIN platform deployed for business customers.

2. T-Com Hungary has an IMS deployment.

3. China Mobile has deployed Huawei’s IMS infrastructure.

4. Verizon in the USA have a multi-vendor supplied IMS core.

5. AT&T deployed IMS long ago and also deployed services such as U-verse and Video Sharing.

6. Telekom Malaysia deployed Huawei’s IMS core network platform.

7. Alcatel-Lucent has deployed its end-to-end IP Multimedia Subsystem (IMS) solution with Vietnam Posts & Telecommunications Corporation Group (VNPT) and its subsidiary Vietnam Telecom National (VTN).

8. Atheeb was the first operator to start IMS network deployment and commercial operation. Etihad Atheeb Telecom (Atheeb) is the second largest fixed-line operator in the Kingdom of Saudi Arabia. It acquired its license in 2007, and holds 3.5GHz spectrum across 13 regional divisions within the Kingdom. Atheeb’s license permits fixed and wireless services such as voice telephone communications, data services, internet services, and broadband internet services via WiMAX 802.16e technology. Atheeb works on ZTE’s IMS platform, Motorola’s WiMAX system and WIPRO’s OSS/BSS.

9. Orange Netherlands (Now France Telecom) has an IMS deployment which has matured over the years.

10. Portugal Telecom deployed residential telephone services on IMS.

11. MTN Rwanda has a MMTel solution enhanced with Presence services.

12. Optimus Portugal has their WebPhone service called TAG deployed on IMS

13. In Brazil, Alcatel-Lucent started to deploy an IMS network at Oi (former Brasil Telecom) to replace 385 legacy switches and provide service to 500K subscribers.

14. YTL Communications (Kuala Lumpur, Malaysia) has deployed Samsung IMS on a mobile WiMAX access network.

15. Telecom Italia in Italy provides Contact Center services based on IMS infrastructure

16. Wateen Pakistan has a WiMAX access network providing voice services over an IMS core

17. Softbank Mobile Corporation in Japan has had an IMS network for several years.

18. TeliaSonera has an IMS network from Nokia Siemens Networks in addition to their OneNDS product. They serve customers in six countries – Denmark, Estonia, Finland, Lithuania, Norway and Sweden.

In addition to these, US cable players have been exploring PacketCable based access to the IMS core to provide entertainment services. Updates on them will be posted as more information emerges.

Please feel free to add more names to this list so that it can keep evolving.

Posted in IMS