Tuesday, April 14, 2015

CompletionService in Java Concurrency

Java provides a rich set of APIs for solving multi-threading problems. One of the most commonly discussed is the producer-consumer problem. There are several solutions to it, and the right choice depends mostly on the application's requirements. CompletionService is one of the solutions the JDK provides for the producer-consumer problem.

A CompletionService decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. Producers submit tasks for execution. Consumers take completed tasks and process their results in the order they complete. Producers and consumers don't need to know about each other, and you don't have to write any complex code to coordinate them.
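To make "in the order they complete" concrete, here is a minimal sketch. The class name, the task labels and the sleep durations are all made up for illustration; the only point is that the slowest task is submitted first, yet the results are handed back fastest first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class CompletionOrderDemo {

    // Submits three tasks with different (made-up) delays and returns their
    // results in the order the CompletionService hands them back.
    static List<String> collectInCompletionOrder()
            throws InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(3);
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);

            // Slowest task first, so submission order differs from completion order.
            cs.submit(() -> sleepThenReturn(600, "slow"));
            cs.submit(() -> sleepThenReturn(300, "medium"));
            cs.submit(() -> sleepThenReturn(50, "fast"));

            List<String> results = new ArrayList<>();
            for (int i = 0; i < 3; i++) {
                // take() blocks until the next task has finished, whichever it is.
                results.add(cs.take().get());
            }
            return results;
        } finally {
            pool.shutdown();
        }
    }

    // Hypothetical "work": sleep for a while, then return a label.
    private static String sleepThenReturn(long millis, String label)
            throws InterruptedException {
        Thread.sleep(millis);
        return label;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(collectInCompletionOrder());
    }
}
```

With delays this far apart, the list should come back as fast, medium, slow, even though the tasks were submitted in the opposite order. With a plain list of Futures read in submission order, the consumer would instead sit waiting on the slow task first.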

In our application, we connect to a third-party system to retrieve the data requested by a user. For a single user request, we need to send multiple requests to the third-party system and then send the data back to the user asynchronously. So one request results in multiple responses; in effect, the data is sent to the user in chunks. We implemented this with CompletionService, which was easy and required very little code. We submit the requests to the CompletionService and then simply wait on it for responses. As soon as a response arrives, we send it to the user.
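The flow described above could look roughly like the sketch below. This is not our production code: fetchChunk, handleUserRequest and the sendToUser callback are hypothetical names, the third-party call is faked with a string, and the pool size and the way a failed sub-request is reported are assumptions made only to keep the example self-contained.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Consumer;

class ChunkedResponseSketch {

    // Hypothetical stand-in for one call to the third-party system.
    static String fetchChunk(String query) {
        return "data for " + query;
    }

    // Fans one user request out into several sub-requests and forwards each
    // response to the user-facing callback as soon as it is available.
    static void handleUserRequest(List<String> queries, Consumer<String> sendToUser)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(4); // assumed pool size
        try {
            CompletionService<String> cs = new ExecutorCompletionService<>(pool);
            for (String query : queries) {
                cs.submit(() -> fetchChunk(query));
            }
            // One take() per submitted task; each returns the next finished one.
            for (int i = 0; i < queries.size(); i++) {
                try {
                    sendToUser.accept(cs.take().get());
                } catch (ExecutionException e) {
                    // Assumption: a failed sub-request is reported and the rest continue.
                    sendToUser.accept("error: " + e.getCause().getMessage());
                }
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        handleUserRequest(Arrays.asList("orders", "invoices", "payments"),
                chunk -> System.out.println("Sent to user: " + chunk));
    }
}
```

Counting the submitted tasks and calling take() exactly that many times is what tells the consumer loop when to stop, since CompletionService itself has no "all done" signal.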

See the example below:

import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorCompletionServiceExample {
    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        // ExecutorService (rather than Executor) so the pool can be shut down.
        ExecutorService ex = Executors.newCachedThreadPool();
        try {
            CompletionService<String> cs = new ExecutorCompletionService<String>(ex);

            // Producer side: submit the tasks.
            cs.submit(new WorkerThread("Thread - 1"));
            cs.submit(new WorkerThread("Thread - 2"));
            cs.submit(new WorkerThread("Thread - 3"));

            // Consumer side: take() blocks until the next task completes.
            for (int i = 0; i < 3; i++) {
                String result = cs.take().get();
                System.out.println("Result of the task: " + result);
            }
        } finally {
            // Without this, idle pool threads keep the JVM alive for a while.
            ex.shutdown();
        }
    }
}

// Despite the name, this is a task (Callable), not a Thread.
class WorkerThread implements Callable<String> {
    private final String threadId;

    public WorkerThread(String threadId) {
        this.threadId = threadId;
    }

    @Override
    public String call() throws Exception {
        return threadId + ":- " + System.currentTimeMillis();
    }
}
