dtonhofer

Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"

In Chapter 9, p.164 the stream is parallelized in 1 step.
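For readers without the book at hand, here is a minimal sketch of what "one step" can look like. It is not the book's example: the class `OneStepParallel` and the method `sumOfSquares` are made up for illustration. The only difference between the two variants is whether `stream()` or `parallelStream()` is called on the list.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class OneStepParallel {

    // Sums the squares of the given numbers, optionally on a parallel stream.
    static long sumOfSquares(List<Integer> numbers, boolean parallel) {
        // Switching to parallel execution is a single call on the stream source.
        return (parallel ? numbers.parallelStream() : numbers.stream())
                .mapToLong(n -> n.longValue() * n.longValue())
                .sum();
    }

    public static void main(String[] args) {
        List<Integer> numbers = IntStream.rangeClosed(1, 100).boxed().collect(Collectors.toList());
        System.out.println("sequential: " + sumOfSquares(numbers, false));
        System.out.println("parallel:   " + sumOfSquares(numbers, true));
    }
}
```

Both calls return the same sum, because the reduction does not depend on the order in which elements are processed.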

This inspired me to write some test code that calls a task “in parallel” in four ways:

  • Not in parallel, inside a loop
  • Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the task list in parallel
  • Using a Java 8 ForkJoinPool
  • Using a Java 8 parallel stream.

I am not sure whether this is of interest. The most interesting part is the handling of checked and unchecked exceptions.
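As a compact illustration of the unchecked-exception side, the sketch below shows how the same RuntimeException reaches the caller on two of the paths. The class `UncheckedExceptionPaths` and its methods are hypothetical names used only for this example. It relies on the documented behavior that `ForkJoinTask.get()` reports a failed task as an `ExecutionException`, while a parallel `forEach()` rethrows an unchecked exception directly.

```java
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

final class UncheckedExceptionPaths {

    // Hypothetical task that always fails with an unchecked exception.
    static void failingTask() {
        throw new IllegalStateException("boom");
    }

    // Submits the task to the common pool and returns whatever get() throws.
    static Throwable viaForkJoinPool() {
        ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(UncheckedExceptionPaths::failingTask);
        try {
            task.get();
            return null; // not reached: the task always fails
        } catch (ExecutionException ex) {
            // The IllegalStateException is available as ex.getCause().
            return ex;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return ex;
        }
    }

    // Runs the task from a parallel stream and returns whatever forEach() throws.
    static Throwable viaParallelStream() {
        try {
            Arrays.asList(1, 2, 3, 4).parallelStream().forEach(i -> failingTask());
            return null; // not reached: every element fails
        } catch (RuntimeException ex) {
            // No ExecutionException wrapper here: the unchecked exception is rethrown as is.
            return ex;
        }
    }

    public static void main(String[] args) {
        System.out.println("ForkJoinPool:    " + viaForkJoinPool());
        System.out.println("Parallel stream: " + viaParallelStream());
    }
}
```

With the pool, the failure has to be unpacked from the `ExecutionException`; with the stream, a plain `catch (RuntimeException ex)` around the terminal operation is enough.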

package chapter9;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RunningInParallel {

    // This could also implement Runnable instead.

    public static class DoSomething {

        private final int index;

        public DoSomething(int index) {
            this.index = index;
        }

        // This method should not throw any checked exceptions

        public void doSomething() {
            try {
                // Math.random() is thread-safe (properly synchronized), so it can be called from several threads
                final long sleep_ms = (long) (Math.random() * 1000.0);
                System.out.println("Task " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
                Thread.sleep(sleep_ms);
            } catch (InterruptedException ex) {
                // Someone told us to stop sleeping, so we do!
                // Set the "interrupted" bit again and get out.
                Thread.currentThread().interrupt();
            }
        }

    }

    private List<DoSomething> createElements() {
        return IntStream.rangeClosed(1, 20)
                .mapToObj(DoSomething::new)
                .collect(Collectors.toList());
    }

    private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
        final int sliceSize = sliceEndIncl - sliceStart + 1;
        ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + sliceSize);
        for (int threadIndex = 0; threadIndex < sliceSize; threadIndex++) {
            final int elementIndex = sliceStart + threadIndex;
            // no need to retain reference to the Thread, we will get it back from the ThreadGroup
            new Thread(tGroup, new Runnable() {
                @Override
                public void run() {
                    // If doSomething() threw a checked exception, we would have to catch it here.
                    // If doSomething() throws a RuntimeException, the exception propagates up the stack from here,
                    // terminating the worker thread.
                    elements.get(elementIndex).doSomething();
                }
            }).start();
        }
        return tGroup;
    }

    private static void waitForThreadEnd(final ThreadGroup tGroup, final int sliceSize) throws InterruptedException {
        final Thread[] threads = new Thread[sliceSize];
        final int count = tGroup.enumerate(threads);
        assert count <= sliceSize; // some threads may already have finished
        for (int i = 0; i < count; i++) {
            try {
                // Dangerous: join() without a timeout may block forever; a timeout value should be passed here.
                System.out.println("Joining thread " + (i + 1) + " of " + count);
                threads[i].join();
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw...
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
    }

    @Test
    void preJava8_singleThread() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we would have to catch it here.
            // If doSomething() throws a RuntimeException, the exception propagates up the stack from here.
            elem.doSomething();
        }
        System.out.println("DONE!");
    }

    @Test
    void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final int threadCount = 7;
        // Iterate over "slices" of "threadCount" threads.
        int sliceIndex = 0;
        while (sliceIndex * threadCount < elements.size()) {
            final int sliceStart = sliceIndex * threadCount;
            final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
            final int sliceSize = sliceEndIncl - sliceStart + 1;
            ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
            waitForThreadEnd(tGroup, sliceSize);
            System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + sliceSize + " threads");
            sliceIndex++;
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final List<ForkJoinTask<?>> tasks = new LinkedList<>();
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we COULD NOT use it as an argument to submit();
            // we would need to wrap doSomething().
            // Note that we use the "common pool" provided by the runtime environment.
            // We could also create our own pool instead, but why bother?
            // Note that some of the tasks will actually be run by the main thread instead
            // of by a thread from the pool.
            tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
        }
        for (ForkJoinTask<?> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException ex) {
                // If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
                // The thrown RuntimeException will appear as the "cause".
                System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
                        + "' caused by: '" + ex.getCause() + "'");
            } catch (CancellationException ex) {
                System.err.println("Task was cancelled, got CancellationException: " + ex.getMessage());
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw.
                // Note that the method "doSomething()" does not actually throw it.
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithStream() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        // If doSomething() threw a checked exception (a subclass of Exception but not of
        // RuntimeException), we COULD NOT use it as an argument to forEach().
        // If doSomething() throws a RuntimeException, forEach() terminates by rethrowing
        // one of the exceptions thrown in the worker threads; which one is arbitrary.
        elements.stream().parallel().forEach(DoSomething::doSomething);
        System.out.println("DONE!");
    }
}
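The comments above note that a method throwing a checked exception cannot be passed to forEach() or submit() directly and would need to be wrapped. One possible way to do that wrapping is sketched below, under the assumption that the checked exception is an IOException. The interface `IOConsumer`, the helper `unchecked()` and the task `doSomethingRisky()` are hypothetical names invented for this example; only `UncheckedIOException` comes from the JDK.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

final class CheckedExceptionWrapping {

    // Hypothetical functional interface: like Consumer, but allowed to throw IOException.
    @FunctionalInterface
    interface IOConsumer<T> {
        void accept(T value) throws IOException;
    }

    // Adapts an IOConsumer to a plain Consumer by wrapping IOException in UncheckedIOException.
    static <T> Consumer<T> unchecked(IOConsumer<T> consumer) {
        return value -> {
            try {
                consumer.accept(value);
            } catch (IOException ex) {
                throw new UncheckedIOException(ex);
            }
        };
    }

    // Hypothetical task that declares a checked exception and fails for element 3.
    static void doSomethingRisky(int index) throws IOException {
        if (index == 3) {
            throw new IOException("element " + index + " failed");
        }
    }

    public static void main(String[] args) {
        List<Integer> elements = Arrays.asList(1, 2, 3, 4, 5);
        try {
            // forEach(CheckedExceptionWrapping::doSomethingRisky) would not compile here.
            elements.parallelStream().forEach(unchecked(CheckedExceptionWrapping::doSomethingRisky));
        } catch (UncheckedIOException ex) {
            // The original checked exception is available as the cause.
            System.err.println("Pipeline failed: " + ex.getCause());
        }
    }
}
```

The same adapter idea would apply to submit(): wrap the call in a Runnable that catches the checked exception and rethrows it unchecked, then unpack it from the ExecutionException on the calling side.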
