
Running Parallel Data Operations using Java Streams


Note: This article is an excerpt from Java for Data Science, a book co-authored by Richard M. Reese and Jennifer L. Reese. The book provides an in-depth understanding of important tools and techniques used across data science projects in a Java environment.

This article shows how Java 8 streams and lambda expressions can help you solve complex, math-intensive problems on larger datasets. You will explore short demonstrations of matrix multiplication and map-reduce using Java 8.


The release of Java 8 came with a number of important enhancements to the language. The two enhancements of interest to us are lambda expressions and streams. A lambda expression is essentially an anonymous function that adds a functional programming dimension to Java. The concept of streams, as introduced in Java 8, does not refer to IO streams. Instead, you can think of a stream as a sequence of objects that can be generated and manipulated using a fluent style of programming. This style will be demonstrated shortly.

As with most APIs, programmers must be careful to consider the actual execution performance of their code using realistic test cases and environments. If not used properly, streams may not actually provide performance improvements. In particular, parallel streams, if not crafted carefully, can produce incorrect results.
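To make the warning about incorrect results concrete, here is a minimal sketch of one common pitfall: updating shared mutable state from a parallel stream. The class and method names are hypothetical and chosen only for illustration. The unsafe version may lose updates, so its result can differ from run to run, while the built-in sum reduction is combined correctly across threads.

```java
import java.util.stream.IntStream;

class ParallelPitfall {

    // Unsafe: several threads update the same array slot without
    // synchronization, so increments can be lost.
    static int unsafeSum(int limit) {
        int[] total = {0};
        IntStream.rangeClosed(1, limit)
                .parallel()
                .forEach(i -> total[0] += i);
        return total[0];
    }

    // Safe: sum() is a reduction that the stream library combines
    // correctly across threads.
    static int safeSum(int limit) {
        return IntStream.rangeClosed(1, limit)
                .parallel()
                .sum();
    }

    public static void main(String[] args) {
        int limit = 10_000;
        System.out.println("Expected:   " + (limit * (limit + 1) / 2));
        System.out.println("Safe sum:   " + safeSum(limit));
        System.out.println("Unsafe sum: " + unsafeSum(limit) + " (may vary between runs)");
    }
}
```

The general rule this illustrates is to let the stream perform the reduction rather than writing to a variable shared by all threads.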

We will start with a quick introduction to lambda expressions and streams. If you are familiar with these concepts you may want to skip over the next section.

Understanding Java 8 lambda expressions and streams

A lambda expression can be expressed in several different forms. The following illustrates a simple lambda expression where the symbol, ->, is the lambda operator. This will take some value, e, and return the value multiplied by two. There is nothing special about the name e. Any valid Java variable name can be used:

e -> 2 * e

It can also be expressed in other forms, such as the following:

(int e) -> 2 * e

(double e) -> 2 * e

(int e) -> {return 2 * e;}

The form used depends on the intended type of e. Lambda expressions are frequently used as arguments to a method, as we will see shortly.
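A lambda expression needs a target type before it can be called. As a small sketch, the forms above can be assigned to the standard functional interfaces IntUnaryOperator and DoubleUnaryOperator from java.util.function. The class name and the applyTwice helper are hypothetical and exist only to show a lambda being passed as an argument.

```java
import java.util.function.DoubleUnaryOperator;
import java.util.function.IntUnaryOperator;

class LambdaForms {

    // Hypothetical helper that accepts a lambda as an argument
    // and applies it two times in a row.
    static int applyTwice(IntUnaryOperator op, int value) {
        return op.applyAsInt(op.applyAsInt(value));
    }

    public static void main(String[] args) {
        IntUnaryOperator inferred = e -> 2 * e;                 // type of e is inferred as int
        IntUnaryOperator explicit = (int e) -> 2 * e;           // explicit parameter type
        DoubleUnaryOperator asDouble = (double e) -> 2 * e;     // e is a double here
        IntUnaryOperator block = (int e) -> { return 2 * e; };  // block body with return

        System.out.println(inferred.applyAsInt(5));       // 10
        System.out.println(explicit.applyAsInt(5));       // 10
        System.out.println(asDouble.applyAsDouble(2.5));  // 5.0
        System.out.println(block.applyAsInt(5));          // 10
        System.out.println(applyTwice(inferred, 3));      // 12
    }
}
```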

A stream can be created using a number of techniques. In the following example, a stream is created from an array. The IntStream interface is a type of stream that uses integers. The Arrays class’ stream method converts an array into a stream:

IntStream stream = Arrays.stream(numbers);

We can then apply various stream methods to perform an operation. In the following statement, the forEach method will simply display each integer in the stream:

stream.forEach(e -> out.printf("%d ", e));

There are a variety of stream methods that can be applied to a stream. In the following example, the mapToDouble method will take an integer, multiply it by 2, and then return it as a double. The forEach method will then display these values. Because a stream can be traversed only once, this example assumes a freshly created stream:

stream
    .mapToDouble(e -> 2 * e)
    .forEach(e -> out.printf("%.4f ", e));

The cascading of method invocations is referred to as fluent programming.
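The snippets above can be put together into a small runnable sketch. The class name, the doubled helper, and the contents of the numbers array are assumptions made for illustration. Note that a stream can be consumed only once, so the second pipeline builds a new stream from the array instead of reusing the first one.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class StreamBasics {

    // Doubles each integer and returns the results as a double array.
    static double[] doubled(int[] numbers) {
        return Arrays.stream(numbers)
                .mapToDouble(e -> 2 * e)
                .toArray();
    }

    public static void main(String[] args) {
        int[] numbers = {3, 4, 12, 6, 8};   // sample values, an assumption

        IntStream stream = Arrays.stream(numbers);
        stream.forEach(e -> System.out.printf("%d ", e));
        System.out.println();

        // The first stream is now used up, so create a fresh one
        // for the fluent pipeline.
        Arrays.stream(numbers)
                .mapToDouble(e -> 2 * e)
                .forEach(e -> System.out.printf("%.4f ", e));
        System.out.println();
    }
}
```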

Using Java 8 to perform matrix multiplication

Here, we will illustrate how streams can be used to perform matrix multiplication. The definitions of the A, B, and C matrices are the same as declared in the Implementing basic matrix operations section. They are duplicated here for your convenience. In the declaration of C, n is the number of rows in A (4) and p is the number of columns in B (3):

double A[][] = {
    {0.1950, 0.0311},
    {0.3588, 0.2203},
    {0.1716, 0.5931},
    {0.2105, 0.3242}};
double B[][] = {
    {0.0502, 0.9823, 0.9472},
    {0.5732, 0.2694, 0.916}};
double C[][] = new double[n][p];

The following sequence is a stream implementation of matrix multiplication. A detailed explanation of the code follows:

C = Arrays.stream(A)
    .parallel()
    .map(AMatrixRow -> IntStream.range(0, B[0].length)
        .mapToDouble(i -> IntStream.range(0, B.length)
            .mapToDouble(j -> AMatrixRow[j] * B[j][i])
            .sum()
        ).toArray()).toArray(double[][]::new);

The first map method, shown as follows, processes each of the 4 rows of the A matrix, with AMatrixRow holding one row as an array of doubles. The range method returns an IntStream of integers starting at its first argument and ending just before its second argument.

.map(AMatrixRow -> IntStream.range(0, B[0].length)

The variable i corresponds to the numbers generated by the first range method, which span the columns of the B matrix (3). The variable j corresponds to the numbers generated by the second range method, which span the rows of the B matrix (2), the same as the number of columns of A. At the heart of the statement is the matrix multiplication, where the sum method adds the products that make up one element of C:

.mapToDouble(j -> AMatrixRow[j] * B[j][i])

.sum()

The last part of the expression creates the two-dimensional array for the C matrix. The expression double[][]::new is a constructor reference, a form of method reference, and is a shorter way of invoking the new operator to create the array:

).toArray()).toArray(double[][]::new);
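As a brief sketch of what the array constructor reference does, the following compares double[][]::new with the equivalent lambda. The class name and the collectRows helper are hypothetical. In both cases toArray passes in the number of rows and receives an array with that many row slots, which it then fills with the rows from the stream.

```java
import java.util.function.IntFunction;
import java.util.stream.Stream;

class ArrayConstructorRef {

    // Collects a stream of rows into a two-dimensional array.
    static double[][] collectRows(Stream<double[]> rows) {
        return rows.toArray(double[][]::new);
    }

    public static void main(String[] args) {
        // The two generators are equivalent: each receives the number of
        // rows and returns an array with that many (still null) row slots.
        IntFunction<double[][]> viaReference = double[][]::new;
        IntFunction<double[][]> viaLambda = size -> new double[size][];

        System.out.println(viaReference.apply(4).length);  // 4
        System.out.println(viaLambda.apply(4).length);     // 4

        double[][] matrix = collectRows(
                Stream.of(new double[]{1, 2}, new double[]{3, 4}));
        System.out.println(matrix.length + " x " + matrix[0].length);  // 2 x 2
    }
}
```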

The displayResult method is as follows:

public void displayResult() {
    out.println("Result");
    for (int i = 0; i < n; i++) {
        for (int j = 0; j < p; j++) {
            out.printf("%.4f ", C[i][j]);
        }
        out.println();
    }
}

The output of this sequence follows:

Result
0.0276 0.1999 0.2132
0.1443 0.4118 0.5417
0.3486 0.3283 0.7058
0.1964 0.2941 0.4964
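One way to gain confidence in a parallel pipeline like this is to compare it with a conventional triple loop. The following is a self-contained sketch under that assumption. The class and method names are hypothetical, and the comparison uses a small tolerance because the two versions may sum floating-point values slightly differently.

```java
import java.util.Arrays;
import java.util.stream.IntStream;

class MatrixMultiplyCheck {

    // Stream version, following the pipeline shown in the article.
    static double[][] multiplyWithStreams(double[][] a, double[][] b) {
        return Arrays.stream(a)
                .parallel()
                .map(row -> IntStream.range(0, b[0].length)             // i: column of b
                        .mapToDouble(i -> IntStream.range(0, b.length)  // j: row of b
                                .mapToDouble(j -> row[j] * b[j][i])
                                .sum())
                        .toArray())
                .toArray(double[][]::new);
    }

    // Conventional triple loop, used only as a reference result.
    static double[][] multiplyWithLoops(double[][] a, double[][] b) {
        int n = a.length;      // rows of a
        int m = b.length;      // rows of b (and columns of a)
        int p = b[0].length;   // columns of b
        double[][] c = new double[n][p];
        for (int i = 0; i < n; i++) {
            for (int j = 0; j < p; j++) {
                for (int k = 0; k < m; k++) {
                    c[i][j] += a[i][k] * b[k][j];
                }
            }
        }
        return c;
    }

    // Largest absolute element-wise difference between two matrices
    // of the same shape.
    static double maxAbsDifference(double[][] x, double[][] y) {
        double max = 0.0;
        for (int i = 0; i < x.length; i++) {
            for (int j = 0; j < x[i].length; j++) {
                max = Math.max(max, Math.abs(x[i][j] - y[i][j]));
            }
        }
        return max;
    }

    public static void main(String[] args) {
        double[][] a = {
            {0.1950, 0.0311}, {0.3588, 0.2203},
            {0.1716, 0.5931}, {0.2105, 0.3242}};
        double[][] b = {
            {0.0502, 0.9823, 0.9472},
            {0.5732, 0.2694, 0.916}};

        double[][] viaStreams = multiplyWithStreams(a, b);
        double[][] viaLoops = multiplyWithLoops(a, b);
        for (double[] row : viaStreams) {
            System.out.println(Arrays.toString(row));
        }
        System.out.println("Max difference: " + maxAbsDifference(viaStreams, viaLoops));
    }
}
```

The row order of the result is preserved even though the outer stream is parallel, because toArray respects the encounter order of the source array.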

Using Java 8 to perform map-reduce

In this section, we will use Java 8 streams to perform a map-reduce operation. In this example, we will use a Stream of Book objects. We will then demonstrate how to use the Java 8 reduce and average methods to get our total page count and average page count.

Rather than begin with a text file, as we did in the Hadoop example, we have created a Book class with title, author, and page-count fields. In the main method of the driver class, we have created new instances of Book and added them to an ArrayList called books. We have also created a double value average to hold our average, and initialized our variable totalPg to zero:

ArrayList<Book> books = new ArrayList<>();
double average;
int totalPg = 0;
books.add(new Book("Moby Dick", "Herman Melville", 822));
books.add(new Book("Charlotte's Web", "E.B. White", 189));
books.add(new Book("The Grapes of Wrath", "John Steinbeck", 212));
books.add(new Book("Jane Eyre", "Charlotte Bronte", 299));
books.add(new Book("A Tale of Two Cities", "Charles Dickens", 673));
books.add(new Book("War and Peace", "Leo Tolstoy", 1032));
books.add(new Book("The Great Gatsby", "F. Scott Fitzgerald", 275));

Next, we perform a map and reduce operation to calculate the total number of pages in our set of books. To accomplish this in a parallel manner, we use the stream and parallel methods. We then use the map method with a lambda expression to extract the page count from each Book object. Finally, we use the reduce method to merge our page counts into one final value, which is assigned to totalPg:

totalPg = books
    .stream()
    .parallel()
    .map((b) -> b.pgCnt)
    .reduce(totalPg, (accumulator, _item) -> {
        out.println(accumulator + " " + _item);
        return accumulator + _item;
    });

Notice in the preceding reduce method we have chosen to print out information about the reduction operation's cumulative value and individual items. The accumulator represents the aggregation of our page counts so far. The _item represents the value being merged into it at any given moment. In a parallel stream this can be either a single page count or a partial sum produced by another subtask.

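In the output that follows, we will first see the accumulator value stay at zero as each individual book item is processed, because each parallel subtask starts from the initial value passed to reduce. Gradually, the accumulator value increases as partial sums are combined. The final operation is the reduction of the values 1223 and 2279. The sum of these two numbers is 3502, or the total page count for all of our books. Because the stream is parallel, the order of these lines can differ from run to run: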

0 822
0 189
0 299
0 673
0 212
299 673
0 1032
0 275
1032 275
972 1307
189 212
822 401
1223 2279
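The trace above shows the initial value 0 being used once per subtask, which is why the first argument to reduce needs to be a true identity for the operation. The following self-contained sketch illustrates this. The Book class here is a minimal stand-in, since the article does not show its source, and the class and method names are hypothetical. It also shows mapToInt with sum as an alternative that avoids boxing the page counts.

```java
import java.util.Arrays;
import java.util.List;

class PageCountReduce {

    // Minimal stand-in for the article's Book class (an assumption).
    static class Book {
        final String title;
        final String author;
        final int pgCnt;

        Book(String title, String author, int pgCnt) {
            this.title = title;
            this.author = author;
            this.pgCnt = pgCnt;
        }
    }

    static List<Book> sampleBooks() {
        return Arrays.asList(
                new Book("Moby Dick", "Herman Melville", 822),
                new Book("Charlotte's Web", "E.B. White", 189),
                new Book("The Grapes of Wrath", "John Steinbeck", 212),
                new Book("Jane Eyre", "Charlotte Bronte", 299),
                new Book("A Tale of Two Cities", "Charles Dickens", 673),
                new Book("War and Peace", "Leo Tolstoy", 1032),
                new Book("The Great Gatsby", "F. Scott Fitzgerald", 275));
    }

    // reduce with 0 as the identity: safe in parallel because 0 + x == x.
    static int totalWithReduce(List<Book> books) {
        return books.parallelStream()
                .map(b -> b.pgCnt)
                .reduce(0, Integer::sum);
    }

    // Primitive-stream alternative that avoids boxing each page count.
    static int totalWithSum(List<Book> books) {
        return books.parallelStream()
                .mapToInt(b -> b.pgCnt)
                .sum();
    }

    public static void main(String[] args) {
        List<Book> books = sampleBooks();
        System.out.println("reduce: " + totalWithReduce(books));
        System.out.println("sum:    " + totalWithSum(books));

        // A non-identity starting value behaves differently in parallel,
        // because it may be added once per subtask rather than once overall.
        int sequentialSeeded = books.stream().map(b -> b.pgCnt).reduce(10, Integer::sum);
        int parallelSeeded = books.parallelStream().map(b -> b.pgCnt).reduce(10, Integer::sum);
        System.out.println("Sequential, seed 10: " + sequentialSeeded);
        System.out.println("Parallel, seed 10:   " + parallelSeeded + " (may be larger)");
    }
}
```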

Next, we will add code to calculate the average page count of our set of books. We multiply our totalPg value, determined using map-reduce, by 1.0 to prevent truncation when we divide by the integer returned by the size method. We then print out average.

average = 1.0 * totalPg / books.size();

out.printf("Average Page Count: %.4f\n", average);

Our output is as follows:

Average Page Count: 500.2857

We could have used Java 8 streams to calculate the average directly using the map method. Add the following code to the main method. We use parallelStream with our map method to simultaneously get the page count for each of our books. We then use mapToDouble to ensure our data is of the correct type to calculate our average. Finally, we use the average and getAsDouble methods to calculate our average page count:

average = books
    .parallelStream()
    .map(b -> b.pgCnt)
    .mapToDouble(s -> s)
    .average()
    .getAsDouble();
out.printf("Average Page Count: %.4f\n", average);

Then we print out our average. Our output, identical to our previous example, is as follows:

Average Page Count: 500.2857
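One detail worth knowing is that the average method returns an OptionalDouble, which is empty when the stream has no elements, and getAsDouble throws NoSuchElementException in that case. The following sketch works on a plain list of page counts to stay short. The class name, the averageOf helper, and the choice of 0.0 as a fallback are assumptions for illustration.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.OptionalDouble;

class AveragePages {

    // Returns the average page count, or 0.0 for an empty list
    // (the fallback value is an arbitrary choice for this sketch).
    static double averageOf(List<Integer> pageCounts) {
        OptionalDouble avg = pageCounts.parallelStream()
                .mapToInt(Integer::intValue)
                .average();
        return avg.orElse(0.0);
    }

    public static void main(String[] args) {
        List<Integer> pageCounts = Arrays.asList(822, 189, 212, 299, 673, 1032, 275);

        // Integer division truncates, which is why the article
        // multiplies by 1.0 before dividing.
        System.out.println(3502 / 7);          // 500
        System.out.println(1.0 * 3502 / 7);

        System.out.println(averageOf(pageCounts));
        System.out.println(averageOf(Collections.<Integer>emptyList()));  // 0.0
    }
}
```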

The preceding techniques used Java 8 streams to apply the map-reduce approach to numeric problems. The same approach can also be applied to other types of data, including text-based data. The benefit is greatest when these processes handle extremely large datasets, where parallel execution can significantly reduce processing time.

To learn about other mathematical and parallel techniques in Java for building a complete data analysis application, see the book Java for Data Science.
