
In this article by Javier Fernández González, author of the book Java 9 Concurrency Cookbook – Second Edition, we will cover how to run tasks asynchronously.

When you execute a ForkJoinTask in a ForkJoinPool, you can do it in a synchronous or an asynchronous way. When you do it in a synchronous way, the method that sends the task to the pool doesn’t return until the task has finished its execution. When you do it in an asynchronous way, the method that sends the task to the pool returns immediately, so the caller can continue with its own execution.


You should be aware of a big difference between the two approaches. When you use the synchronous methods (for example, the invokeAll() method), the task that calls one of them is suspended until the tasks it sent to the pool finish their execution. This allows the ForkJoinPool class to use the work-stealing algorithm to assign a new task to the worker thread that was executing the waiting task. In contrast, when you use the asynchronous methods (for example, the fork() method), the task continues with its execution, so the ForkJoinPool class can’t use the work-stealing algorithm to increase the performance of the application. In this case, the ForkJoinPool class can use that algorithm only when you call the join() or get() methods to wait for the finalization of a task.
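
As a minimal sketch of the two styles, the following example adds up a range of integers with a RecursiveTask. The RangeSum class, its fields, and the threshold of 1,000 elements are assumptions made up for illustration; only invokeAll(), fork(), and join() come from the Fork/Join API.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

// Hypothetical task: adds up the integers in the range [from, to).
class RangeSum extends RecursiveTask<Long> {
    private static final long serialVersionUID = 1L;
    private final int from;
    private final int to;
    private final boolean async;

    RangeSum(int from, int to, boolean async) {
        this.from = from;
        this.to = to;
        this.async = async;
    }

    @Override
    protected Long compute() {
        if (to - from <= 1_000) {            // small enough: add directly
            long sum = 0;
            for (int i = from; i < to; i++) {
                sum += i;
            }
            return sum;
        }
        int mid = (from + to) >>> 1;
        RangeSum left = new RangeSum(from, mid, async);
        RangeSum right = new RangeSum(mid, to, async);
        if (async) {
            left.fork();                     // returns immediately
            long rightSum = right.compute(); // this task keeps working meanwhile
            return left.join() + rightSum;   // waits only when the value is needed
        }
        invokeAll(left, right);              // returns after both subtasks finish
        return left.join() + right.join();   // both results are already available
    }

    static long sum(int n, boolean async) {
        return ForkJoinPool.commonPool().invoke(new RangeSum(0, n, async));
    }

    public static void main(String[] args) {
        System.out.println(sum(100_000, false));
        System.out.println(sum(100_000, true));
    }
}
```

Both variants compute the same value; the difference is only in when the calling task waits for its subtasks.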

In addition to the RecursiveAction and RecursiveTask classes, Java 8 introduced a new kind of ForkJoinTask, the CountedCompleter class. With this kind of task, you can include a completion action that will be executed when the task has been launched and has no pending child tasks. This mechanism is based on a method included in the class (the onCompletion() method) and a counter of pending tasks.

This counter is initialized to zero by default, and you can increment it atomically whenever you need to. Normally, you will increment this counter by one each time you launch a child task. Finally, when a task has finished its execution, you call the tryComplete() method to try to complete it and, consequently, execute the onCompletion() method. If the pending count is greater than zero, it is decremented by one. If it’s zero, the onCompletion() method is executed and then the same attempt is made to complete the parent task.
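
The counter mechanism can be sketched in isolation as follows. The PendingCountDemo class and its shared AtomicInteger are hypothetical; the sketch assumes a root task that forks a few leaf tasks and only wants to know how many tasks had run by the time its onCompletion() method was called.

```java
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo task: the root forks some leaves and every task bumps a shared counter.
class PendingCountDemo extends CountedCompleter<Integer> {
    private static final long serialVersionUID = 1L;
    private final int children;
    private final AtomicInteger ran;
    private int result;

    PendingCountDemo(CountedCompleter<?> parent, int children, AtomicInteger ran) {
        super(parent);
        this.children = children;
        this.ran = ran;
    }

    @Override
    public void compute() {
        for (int i = 0; i < children; i++) {
            addToPendingCount(1);                       // one more child to wait for
            new PendingCountDemo(this, 0, ran).fork();  // leaf task, no children
        }
        ran.incrementAndGet();                          // this task's own work
        tryComplete();  // pending > 0: decrement; pending == 0: onCompletion()
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        // Called for a task once it and all of its children have tried to complete
        result = ran.get();
    }

    @Override
    public Integer getRawResult() {
        return result;
    }

    static int run(int children) {
        return ForkJoinPool.commonPool()
                .invoke(new PendingCountDemo(null, children, new AtomicInteger()));
    }

    public static void main(String[] args) {
        System.out.println(run(4) + " tasks ran before the root completed");
    }
}
```

With four children, the root's completion action sees a count of five: the four leaves plus the root itself.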

In this article, you will learn how to use the asynchronous methods provided by the ForkJoinPool and CountedCompleter classes for the management of tasks. You are going to implement a program that will search for files with a given extension inside a folder and its subfolders. The CountedCompleter class you’re going to implement will process the content of a folder. For each subfolder inside that folder, it will send a new task to the ForkJoinPool class in an asynchronous way. For each file inside that folder, the task will check the extension of the file and add it to the result list if it matches. When a task is completed, it will insert the result lists of all its child tasks into its own result list.

How to do it…

Follow these steps to implement the example:

  1. Create a class named FolderProcessor and specify that it extends the CountedCompleter class parameterized with the List<String> type.
    public class FolderProcessor extends CountedCompleter<List<String>> {
  2. Declare the serial version UID of the class. This element is necessary because the parent class of the CountedCompleter class, the ForkJoinTask class, implements the Serializable interface.
    private static final long serialVersionUID = -1826436670135695513L;
  3. Declare a private String attribute named path. This attribute will store the full path of the folder this task is going to process.
    private String path;
  4. Declare a private String attribute named extension. This attribute will store the name of the extension of the files this task is going to look for.
    private String extension;
  5. Declare two List private attributes named tasks and resultList. We will use the first one to store all the child tasks launched from this task and the other one to store the list of results of this task.
    private List<FolderProcessor> tasks;
    private List<String> resultList;
  6. Implement one constructor for the class to initialize its attributes and its parent class. We declare this constructor as protected, as it will only be used internally.
    protected FolderProcessor (CountedCompleter<?> completer, String path, String extension) {
      super(completer);
      this.path=path;
      this.extension=extension;
    }
    
    • We implement another public constructor to be used externally. As the task created by this constructor won’t have a parent task, we don’t include a completer object as a parameter.
    public FolderProcessor (String path, String extension) {
      this.path=path;
      this.extension=extension;
    }
    
  7. Implement the compute() method. As the base class of our task is the CountedCompleter class, the return type of this method is void.
    @Override
    public void compute() {
  8. First, initialize the two list attributes.
    resultList=new ArrayList<>();
    tasks=new ArrayList<>();
    
  9. Get the content of the folder.
    File file=new File(path);
    File content[] = file.listFiles();
    
  10. For each element in the folder, if there is a subfolder, create a new FolderProcessor object and execute it asynchronously using the fork() method. We use the first constructor of the class and pass the current task as the completer task of the new one. We also increment the counter of pending tasks using the addToPendingCount() method.
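    if (content != null) {
      for (int i = 0; i < content.length; i++) {
        if (content[i].isDirectory()) {
          FolderProcessor task=new FolderProcessor(this, content[i].getAbsolutePath(), extension);
          // Increment the pending count before forking, so a child that
          // finishes quickly cannot complete this task too early
          addToPendingCount(1);
          task.fork();
          tasks.add(task);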
    
  11. Otherwise, compare the extension of the file with the extension you are looking for using the checkFile() method and, if they are equal, store the full path of the file in the list of strings declared earlier.
        } else {
          if (checkFile(content[i].getName())){
            resultList.add(content[i].getAbsolutePath());
          }
        }
      }
    
  12. If the list of the FolderProcessor subtasks has more than 50 elements, write a message to the console to indicate this circumstance.
      if (tasks.size()>50) {
        System.out.printf("%s: %d tasks ran.\n",file.getAbsolutePath(),tasks.size());
      }
    }
    
    • Finally, try to complete the current task using the tryComplete() method:
      tryComplete();
      }
      
    • Implement the onCompletion() method. This method will be executed when all the child tasks (all the tasks that have been forked from the current task) have finished their execution. We add the result list of all the child tasks to the result list of the current task.
      @Override
      public void onCompletion(CountedCompleter<?> completer) {
        for (FolderProcessor childTask : tasks) {
          resultList.addAll(childTask.getResultList());
        }
      }
      
  13. Implement the checkFile() method. This method checks whether the name of a file passed as a parameter ends with the extension you are looking for. If so, the method returns true; otherwise, it returns false.
    private boolean checkFile(String name) {
      return name.endsWith(extension);
    }
    
  14. Finally, implement the getResultList() method to return the result list of a task. The code of this method is very simple so it won’t be included.
  15. Implement the main class of the example by creating a class named Main with a main() method.
    public class Main {
      public static void main(String[] args) {
    
  16. Create a ForkJoinPool object using the default constructor.
    ForkJoinPool pool=new ForkJoinPool();
  17. Create three FolderProcessor tasks. Initialize each one with a different folder path.
    FolderProcessor system=new FolderProcessor("C:\\Windows", "log");
    FolderProcessor apps=new FolderProcessor("C:\\Program Files","log");
    FolderProcessor documents=new FolderProcessor("C:\\Documents And Settings","log");
    
  18. Execute the three tasks in the pool using the execute() method.
    pool.execute(system);
    pool.execute(apps);
    pool.execute(documents);
    
  19. Write to the console information about the status of the pool every second until the three tasks have finished their execution.
    do {
      System.out.printf("******************************************\n");
      System.out.printf("Main: Parallelism: %d\n",pool.getParallelism());
      System.out.printf("Main: Active Threads: %d\n",pool.getActiveThreadCount());
      System.out.printf("Main: Task Count: %d\n",pool.getQueuedTaskCount());
      System.out.printf("Main: Steal Count: %d\n",pool.getStealCount());
      System.out.printf("******************************************\n");
      try {
        TimeUnit.SECONDS.sleep(1);
      } catch (InterruptedException e) {
        e.printStackTrace();
      }
    } while ((!system.isDone())||(!apps.isDone())||(!documents.isDone()));
    
  20. Shut down the ForkJoinPool object using the shutdown() method.
    pool.shutdown();
  21. Write the number of results generated by each task to the console.
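    List<String> results;

    // join() only waits here: CountedCompleter.getRawResult() returns null
    // unless it is overridden, so the list is read with getResultList()
    system.join();
    results=system.getResultList();
    System.out.printf("System: %d files found.\n",results.size());

    apps.join();
    results=apps.getResultList();
    System.out.printf("Apps: %d files found.\n",results.size());

    documents.join();
    results=documents.getResultList();
    System.out.printf("Documents: %d files found.\n",results.size());
      }
    }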
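
The fragments above can be assembled into a single file along the following lines. This is a condensed sketch rather than the book's listing: the FolderProcessorSketch name is hypothetical, the getResultList() body is the obvious accessor that step 14 leaves out, and the getRawResult() override is an added assumption that lets join(), get(), and invoke() hand back the list directly. The demo() method searches a small throwaway directory tree so that the sketch runs on any operating system.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Condensed sketch of the recipe's task; the class name is hypothetical.
class FolderProcessorSketch extends CountedCompleter<List<String>> {
    private static final long serialVersionUID = 1L;
    private final String path;
    private final String extension;
    private final List<FolderProcessorSketch> tasks = new ArrayList<>();
    private final List<String> resultList = new ArrayList<>();

    FolderProcessorSketch(CountedCompleter<?> completer, String path, String extension) {
        super(completer);
        this.path = path;
        this.extension = extension;
    }

    @Override
    public void compute() {
        File[] content = new File(path).listFiles();
        if (content != null) {
            for (File entry : content) {
                if (entry.isDirectory()) {
                    FolderProcessorSketch task =
                        new FolderProcessorSketch(this, entry.getAbsolutePath(), extension);
                    addToPendingCount(1); // register the child before it can finish
                    tasks.add(task);
                    task.fork();
                } else if (entry.getName().endsWith(extension)) {
                    resultList.add(entry.getAbsolutePath());
                }
            }
        }
        tryComplete();
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        for (FolderProcessorSketch child : tasks) {
            resultList.addAll(child.getResultList());
        }
    }

    // The accessor that the recipe leaves out
    public List<String> getResultList() {
        return resultList;
    }

    // Assumption: overridden so that join(), get() and invoke() return the list
    @Override
    public List<String> getRawResult() {
        return resultList;
    }

    // Builds a small temporary tree (two .log files, one .txt file) and searches it
    static int demo() {
        try {
            Path root = Files.createTempDirectory("folder-processor");
            Path sub = Files.createDirectory(root.resolve("sub"));
            Files.createFile(root.resolve("a.log"));
            Files.createFile(sub.resolve("b.log"));
            Files.createFile(sub.resolve("c.txt"));
            return ForkJoinPool.commonPool()
                .invoke(new FolderProcessorSketch(null, root.toString(), "log")).size();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo() + " files found");
    }
}
```

Without the getRawResult() override, a CountedCompleter returns null from join(), so the caller would have to read the list through getResultList() instead.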
    

How it works…

The following screenshot shows part of an execution of this example:

[Screenshot from Java 9 Concurrency Cookbook - Second Edition]

The key of this example is in the FolderProcessor class. Each task processes the content of a folder. As you know, this content has the following two kinds of elements:

  • Files
  • Other folders

If the task finds a folder, it creates another FolderProcessor object to process that folder and sends it to the pool using the fork() method. This method sends the task to the pool, which will execute it if it has a free worker thread or can create a new one. The method returns immediately, so the task can continue processing the content of the folder. For every file, a task compares its extension with the one it’s looking for and, if they are equal, adds the full path of the file to the list of results.

Once the task has processed all the content of the assigned folder, we try to complete the current task. As we explained in the introduction of this article, when we try to complete a task, the code of the CountedCompleter class checks the value of the pending task counter. If this value is greater than 0, it decrements the counter. If the value is 0, the task executes the onCompletion() method and then tries to complete its parent task.

In our case, when a task is processing a folder and finds a subfolder, it creates a new child task, launches that task using the fork() method, and increments the counter of pending tasks. So, when a task has processed all its content, its counter of pending tasks will be equal to the number of child tasks it has launched that are still running. When we call the tryComplete() method, if some of those child tasks are still pending, the call only decrements the counter. The onCompletion() method is executed only when all the child tasks have been completed. If the folder of the current task has no subfolders, the counter of pending tasks will be zero, so the onCompletion() method will be called immediately and the task will then try to complete its parent task. In this way, we create a tree of tasks from top to bottom that are completed from bottom to top. In the onCompletion() method, we process all the result lists of the child tasks and add their elements to the result list of the current task.
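
The top-down creation and bottom-up completion can be observed with a small sketch. The CompletionOrderDemo class below is hypothetical: it assumes a chain of three tasks (a root, a child, and a grandchild) in which each task records its name when its onCompletion() method runs.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountedCompleter;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo task: builds a chain of tasks and logs the order of completion.
class CompletionOrderDemo extends CountedCompleter<Void> {
    private static final long serialVersionUID = 1L;
    private final String name;
    private final int levelsBelow;   // how many more levels to create under this task
    private final List<String> log;  // shared, thread-safe log of completed tasks

    CompletionOrderDemo(CountedCompleter<?> parent, String name,
                        int levelsBelow, List<String> log) {
        super(parent);
        this.name = name;
        this.levelsBelow = levelsBelow;
        this.log = log;
    }

    @Override
    public void compute() {
        if (levelsBelow > 0) {
            addToPendingCount(1);    // register the child before forking it
            new CompletionOrderDemo(this, name + "/sub", levelsBelow - 1, log).fork();
        }
        tryComplete();
    }

    @Override
    public void onCompletion(CountedCompleter<?> caller) {
        log.add(name);               // runs only after every descendant has completed
    }

    static List<String> run() {
        List<String> log = new CopyOnWriteArrayList<>();
        ForkJoinPool.commonPool().invoke(new CompletionOrderDemo(null, "root", 2, log));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The deepest task is always logged first and the root last, whichever worker threads happen to execute them.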

The ForkJoinPool class also allows the execution of tasks in an asynchronous way. You have used the execute() method to send the three initial tasks to the pool. In the Main class, you also shut down the pool using the shutdown() method and wrote information about the status and the evolution of the tasks that are running in it. The ForkJoinPool class includes more methods that can be useful for this purpose.
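
As a rough illustration of some of those other methods, the following sketch sends four trivial tasks with execute(), shuts the pool down, and waits for it with awaitTermination() instead of polling. The PoolStatusDemo class, the parallelism of 2, and the 10-second limit are arbitrary choices made for this example.

```java
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the recipe.
class PoolStatusDemo {

    // Sends four small tasks with execute() and returns how many ran, or -1 on failure.
    static int run() {
        ForkJoinPool pool = new ForkJoinPool(2);
        AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < 4; i++) {
            pool.execute(new RecursiveAction() {
                @Override
                protected void compute() {
                    done.incrementAndGet();
                }
            });                       // execute() returns without waiting
        }
        pool.shutdown();              // tasks that were already submitted still run
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                return -1;            // the pool did not terminate in time
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }
        System.out.printf("Pool size: %d%n", pool.getPoolSize());
        System.out.printf("Quiescent: %b%n", pool.isQuiescent());
        System.out.printf("Terminated: %b%n", pool.isTerminated());
        System.out.printf("Steal count: %d%n", pool.getStealCount());
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("Tasks executed: " + run());
    }
}
```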

There’s more…

In this example we have used the addToPendingCount() method to increment the counter of pending tasks, but we have other methods we can use to change the value of this counter.

  • setPendingCount(): This method sets the value of the counter of pending tasks.
  • compareAndSetPendingCount(): This method receives two parameters. The first one is the expected value and the second one is the new value. If the value of the counter of pending tasks is equal to the expected value, the method sets it to the new one.
  • decrementPendingCountUnlessZero(): This method decrements the value of the counter of pending tasks unless it’s equal to zero.
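
These three methods can be tried out without running any task at all. In the following sketch, the PendingCountMethodsDemo class is hypothetical and its compute() method is deliberately empty; the code only manipulates the counter and records what each call returns.

```java
import java.util.concurrent.CountedCompleter;

// Hypothetical demo class: the task is never run, only its counter is manipulated.
class PendingCountMethodsDemo extends CountedCompleter<Void> {
    private static final long serialVersionUID = 1L;

    @Override
    public void compute() {
        // Intentionally empty: only the pending count is of interest here
    }

    static String run() {
        PendingCountMethodsDemo task = new PendingCountMethodsDemo();
        StringBuilder out = new StringBuilder();

        task.setPendingCount(3);
        out.append(task.getPendingCount()).append(' ');            // counter is 3

        // Succeeds because the counter currently holds the expected value 3
        out.append(task.compareAndSetPendingCount(3, 1)).append(' ');
        // Fails because the counter is now 1, not 3
        out.append(task.compareAndSetPendingCount(3, 5)).append(' ');

        // Returns the value found on entry: 1 the first time, 0 the second
        out.append(task.decrementPendingCountUnlessZero()).append(' ');
        out.append(task.decrementPendingCountUnlessZero()).append(' ');

        out.append(task.getPendingCount());                         // still 0
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());   // prints "3 true false 1 0 0"
    }
}
```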

The CountedCompleter class also includes other methods to manage the completion of the tasks. These are the most significant ones:

  • complete(): This method executes the onCompletion() method regardless of the value of the counter of pending tasks, marks the task as complete, and then tries to complete its completer (parent) task.
  • onExceptionalCompletion(): This method is executed when the completeExceptionally() method has been called or the compute() method has thrown an Exception. Override this method to include your code to process those exceptions.
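
A minimal sketch of both methods follows. The CompletionControlDemo class and its fail flag are hypothetical, and the task is started with the invoke() method of the task itself so that everything runs in the calling thread and the example stays deterministic.

```java
import java.util.concurrent.CountedCompleter;

// Hypothetical demo task: either forces its own completion or fails in compute().
class CompletionControlDemo extends CountedCompleter<String> {
    private static final long serialVersionUID = 1L;
    private final boolean fail;
    private volatile String result;
    private volatile boolean exceptionSeen;

    CompletionControlDemo(boolean fail) {
        this.fail = fail;
    }

    @Override
    public void compute() {
        if (fail) {
            throw new IllegalStateException("simulated failure");
        }
        setPendingCount(3);   // pretend three children are still outstanding
        complete("forced");   // completes anyway, ignoring the pending count
    }

    @Override
    protected void setRawResult(String value) {
        result = value;       // complete(value) stores its argument through this hook
    }

    @Override
    public String getRawResult() {
        return result;
    }

    @Override
    public boolean onExceptionalCompletion(Throwable ex, CountedCompleter<?> caller) {
        exceptionSeen = true; // a place for logging or cleanup
        return true;          // true: also propagate the exception to the completer
    }

    static String forced() {
        return new CompletionControlDemo(false).invoke();
    }

    static boolean failed() {
        CompletionControlDemo task = new CompletionControlDemo(true);
        try {
            task.invoke();
        } catch (RuntimeException expected) {
            // invoke() rethrows the failure raised inside compute()
        }
        return task.exceptionSeen;
    }

    public static void main(String[] args) {
        System.out.println(forced());
        System.out.println(failed());
    }
}
```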

In this example, you have used the join() method to wait for the finalization of tasks. You can also use one of the two versions of the get() method for this purpose:

  • get(): This version of the get() method waits, if necessary, until the task has finished its execution and then returns its result.
  • get(long timeout, TimeUnit unit): This version of the get() method, if the result of the task isn’t available, waits the specified time for it. If the specified period of time passes and the result isn’t yet available, the method throws a TimeoutException exception. The TimeUnit class is an enumeration with the following constants: DAYS, HOURS, MICROSECONDS, MILLISECONDS, MINUTES, NANOSECONDS, and SECONDS.

Unlike the get() methods, the join() method doesn’t respond to interruption. If you interrupt the thread that called the get() method, the method throws an InterruptedException exception, whereas the join() method keeps waiting.
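
The timed version of get() can be sketched as follows. The TimedGetDemo class is hypothetical; it assumes a task that is held back by a CountDownLatch so that the first call is certain to run out of time, and it uses a dedicated pool so that the calling thread never executes the blocked task itself.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demo class; not part of the recipe.
class TimedGetDemo {

    static String run() {
        ForkJoinPool pool = new ForkJoinPool(1);
        CountDownLatch release = new CountDownLatch(1);
        // The task cannot finish until the latch is released
        ForkJoinTask<Integer> task = pool.submit(new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return 42;
            }
        });
        StringBuilder out = new StringBuilder();
        try {
            task.get(100, TimeUnit.MILLISECONDS);
            out.append("finished early");
        } catch (TimeoutException e) {
            out.append("timed out");          // the result was not ready in time
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            release.countDown();              // let the task finish
        }
        out.append(", then ").append(task.join());
        pool.shutdown();
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

A timeout doesn’t cancel the task, which is why the later call to join() still obtains its result.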

Summary

In this article we learned how to use the asynchronous methods provided by the ForkJoinPool and CountedCompleter classes for the management of tasks.
