dtonhofer

Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"

In Chapter 9, p.164, the stream is parallelized in a single step.

This inspired me to write some test code that runs a task “in parallel” in several ways:

  • Not in parallel, inside a loop
  • Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the task list in parallel
  • Using a Java 8 ForkJoinPool
  • Using a Java 8 parallel stream.

Not sure whether this is of interest. The most interesting part is the handling of checked and unchecked exceptions.
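As a minimal sketch of the checked-exception point (separate from the test class that follows): a method that throws a checked exception cannot be passed directly to forEach(), but a small adapter can wrap it. The names `ThrowingConsumer`, `unchecked` and `riskyTask` are hypothetical, made up for this illustration. Because a parallel stream may re-wrap an exception when it crosses from a worker thread to the caller, the sketch walks the cause chain down to the root instead of assuming a fixed nesting depth.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class CheckedInLambda {

    // Hypothetical functional interface: like Consumer, but allowed to throw checked exceptions.
    @FunctionalInterface
    interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Adapts a ThrowingConsumer to a plain Consumer by wrapping checked exceptions.
    static <T> Consumer<T> unchecked(ThrowingConsumer<T> consumer) {
        return t -> {
            try {
                consumer.accept(t);
            } catch (RuntimeException ex) {
                throw ex; // already unchecked, pass through unchanged
            } catch (Exception ex) {
                throw new RuntimeException(ex); // checked: wrap it, keep the original as cause
            }
        };
    }

    // Hypothetical task that throws a checked exception for exactly one input.
    static void riskyTask(int index) throws IOException {
        if (index == 3) {
            throw new IOException("task " + index + " failed");
        }
    }

    // Runs all tasks in a parallel stream.
    // Returns the root cause of the failure, or null if every task succeeded.
    static Throwable runAll(List<Integer> indexes) {
        try {
            indexes.parallelStream().forEach(unchecked(index -> riskyTask(index)));
            return null;
        } catch (RuntimeException ex) {
            Throwable cause = ex;
            while (cause.getCause() != null) {
                cause = cause.getCause();
            }
            return cause;
        }
    }

    public static void main(String[] args) {
        System.out.println("Root cause: " + runAll(Arrays.asList(1, 2, 3, 4)));
    }
}
```

The same adapter idea would apply to submit() on a ForkJoinPool, which accepts a Runnable or Callable rather than a Consumer.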

package chapter9;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RunningInParallel {

    // This could also implement Runnable instead.

    public static class DoSomething {

        private final int index;

        public DoSomething(int index) {
            this.index = index;
        }

        // This method should not throw any checked exceptions

        public void doSomething() {
            try {
                // Math.random() is thread-safe, so it can be called from several threads here
                final long sleep_ms = (long) (Math.random() * 1000.0);
                System.out.println("Task " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
                Thread.sleep(sleep_ms);
            } catch (InterruptedException ex) {
                // Someone told us to stop sleeping, so we do!
                // Set the "interrupted" bit again and get out.
                Thread.currentThread().interrupt();
            }
        }

    }

    private List<DoSomething> createElements() {
        return IntStream.rangeClosed(1, 20)
                .mapToObj(DoSomething::new)
                .collect(Collectors.toList());
    }

    private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
        final int sliceSize = sliceEndIncl - sliceStart + 1;
        ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + sliceSize);
        for (int threadIndex = 0; threadIndex < sliceSize; threadIndex++) {
            final int elementIndex = sliceStart + threadIndex;
            // no need to retain reference to the Thread, we will get it back from the ThreadGroup
            new Thread(tGroup, new Runnable() {
                @Override
                public void run() {
                    // If doSomething() threw a checked exception, we would have to catch it here.
                    // If doSomething() throws a RuntimeException, the exception propagates up the stack
                    // from here, terminating the worker thread.
                    elements.get(elementIndex).doSomething();
                }
            }).start();
        }
        return tGroup;
    }

    private static void waitForThreadEnd(final ThreadGroup tGroup, final int sliceSize) throws InterruptedException {
        final Thread[] threads = new Thread[sliceSize];
        final int count = tGroup.enumerate(threads);
        assert count <= sliceSize; // some threads may already have finished
        for (int i = 0; i < count; i++) {
            try {
                // Dangerous: join() without a timeout may wait forever. There should be a timeout value!
                System.out.println("Joining thread " + (i + 1) + " of " + count);
                threads[i].join();
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw...
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
    }

    @Test
    void preJava8_singleThread() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we would have to catch it here.
            // If doSomething() throws a RuntimeException, the exception propagates up the stack from here.
            elem.doSomething();
        }
        System.out.println("DONE!");
    }

    @Test
    void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final int threadCount = 7;
        // Iterate over "slices" of "threadCount" threads.
        int sliceIndex = 0;
        while (sliceIndex * threadCount < elements.size()) {
            final int sliceStart = sliceIndex * threadCount;
            final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
            final int sliceSize = sliceEndIncl - sliceStart + 1;
            ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
            waitForThreadEnd(tGroup, sliceSize);
            System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + sliceSize + " threads");
            sliceIndex++;
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final List<ForkJoinTask<?>> tasks = new LinkedList<>();
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we COULD NOT use it as argument to submit()
            // We would need to wrap doSomething().
            // Note that we use the "common pool" provided by the runtime environment.
            // We could also create our own pool instead, but why bother?
            // Note that some of the tasks will actually be run by the main thread instead
            // of by a thread from the pool.
            tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
        }
        for (ForkJoinTask<?> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException ex) {
                // If doSomething() throws a RuntimeException, it is wrapped in an ExecutionException.
                // The thrown RuntimeException appears as the "cause".
                System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
                        + "' caused by: '" + ex.getCause() + "'");
            } catch (CancellationException ex) {
                System.err.println("Task was cancelled, got CancellationException: " + ex.getMessage());
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw.
                // Note that the method "doSomething()" does not actually throw it.
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithStream() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        // If doSomething() threw a checked exception (a subclass of Exception that is
        // not a RuntimeException), we COULD NOT use it as argument to forEach().
        // If doSomething() throws a RuntimeException, the stream pipeline terminates and
        // forEach() rethrows one of the exceptions thrown in the threads; which one is arbitrary.
        elements.stream().parallel().forEach(DoSomething::doSomething);
        System.out.println("DONE!");
    }
}
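The comment in waitForThreadEnd() notes that join() should really have a timeout. A minimal sketch of how that could look, assuming one overall deadline shared by all threads: the class `JoinWithTimeout` and the helpers `joinAll` and `startSleeper` are hypothetical names for illustration, not part of the test class above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class JoinWithTimeout {

    // Waits for all threads, but for no longer than timeoutMs in total.
    // Returns the threads that were still alive when their wait ended.
    static List<Thread> joinAll(List<Thread> threads, long timeoutMs) throws InterruptedException {
        final long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        final List<Thread> stillAlive = new ArrayList<>();
        for (Thread thread : threads) {
            final long remainingMs = (deadline - System.nanoTime()) / 1_000_000L;
            if (remainingMs > 0) {
                // join(0) would wait forever, hence the guard above
                thread.join(remainingMs);
            }
            if (thread.isAlive()) {
                stillAlive.add(thread);
            }
        }
        return stillAlive;
    }

    // Hypothetical stand-in for a worker: a daemon thread that just sleeps.
    static Thread startSleeper(long sleepMs) {
        final Thread thread = new Thread(() -> {
            try {
                Thread.sleep(sleepMs);
            } catch (InterruptedException ex) {
                // Told to stop sleeping: restore the flag and finish.
                Thread.currentThread().interrupt();
            }
        });
        thread.setDaemon(true);
        thread.start();
        return thread;
    }

    public static void main(String[] args) throws InterruptedException {
        final Thread fast = startSleeper(50);
        final Thread slow = startSleeper(10_000);
        final List<Thread> stillAlive = joinAll(Arrays.asList(fast, slow), 500);
        System.out.println("Threads still running after the deadline: " + stillAlive.size());
        // One possible reaction: ask the stragglers to stop.
        for (Thread thread : stillAlive) {
            thread.interrupt();
        }
    }
}
```

What to do with the threads that miss the deadline (interrupt them, log them, fail the test) is a design choice the original code leaves open.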
