dtonhofer

Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"

In Chapter 9, p.164, the stream is parallelized in a single step.

This inspired me to write some test code that runs a task “in parallel” in several ways:

  • Not in parallel, inside a loop
  • Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the list of tasks in parallel
  • Using the ForkJoinPool common pool (available since Java 8)
  • Using a Java 8 parallel stream

Not sure whether this is of interest. The most interesting part is the handling of checked and unchecked exceptions.

package chapter9;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RunningInParallel {

    // This could also implement Runnable instead.

    public static class DoSomething {

        private final int index;

        public DoSomething(int index) {
            this.index = index;
        }

        // This method should not throw any checked exceptions

        public void doSomething() {
            try {
                // Math.random() is documented as safe for use by multiple threads, so we can call it here
                final long sleep_ms = (long) (Math.random() * 1000.0);
                System.out.println("Task " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
                Thread.sleep(sleep_ms);
            } catch (InterruptedException ex) {
                // Someone told us to stop sleeping, so we do!
                // Set the "interrupted" bit again and get out.
                Thread.currentThread().interrupt();
            }
        }

    }

    private List<DoSomething> createElements() {
        return IntStream.rangeClosed(1, 20)
                .mapToObj(DoSomething::new)
                .collect(Collectors.toList());
    }

    private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
        final int sliceSize = sliceEndIncl - sliceStart + 1;
        ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + sliceSize);
        for (int threadIndex = 0; threadIndex < sliceSize; threadIndex++) {
            final int elementIndex = sliceStart + threadIndex;
            // no need to retain reference to the Thread, we will get it back from the ThreadGroup
            new Thread(tGroup, new Runnable() {
                @Override
                public void run() {
                    // If doSomething() threw a checked exception, we would have to catch it here.
                    // If doSomething() throws a RuntimeException, the exception propagates up the stack from here,
                    // terminating the worker thread.
                    elements.get(elementIndex).doSomething();
                }
            }).start();
        }
        return tGroup;
    }

    private static void waitForThreadEnd(final ThreadGroup tGroup, final int sliceSize) throws InterruptedException {
        final Thread[] threads = new Thread[sliceSize];
        final int count = tGroup.enumerate(threads);
        assert count <= sliceSize; // some threads may already have finished
        for (int i = 0; i < count; i++) {
            try {
                // Dangerous: join() without a timeout may wait forever; a timeout value should be passed here!
                System.out.println("Joining thread " + (i + 1) + " of " + count);
                threads[i].join();
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw...
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
    }

    @Test
    void preJava8_singleThread() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we would have to catch it here.
            // If doSomething() throws a RuntimeException, the exception propagates up the stack from here.
            elem.doSomething();
        }
        System.out.println("DONE!");
    }

    @Test
    void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final int threadCount = 7;
        // Iterate over "slices" of "threadCount" threads.
        int sliceIndex = 0;
        while (sliceIndex * threadCount < elements.size()) {
            final int sliceStart = sliceIndex * threadCount;
            final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
            final int sliceSize = sliceEndIncl - sliceStart + 1;
            ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
            waitForThreadEnd(tGroup, sliceSize);
            System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + sliceSize + " threads");
            sliceIndex++;
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final List<ForkJoinTask<?>> tasks = new LinkedList<>();
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we COULD NOT use it as argument to submit()
            // We would need to wrap doSomething().
            // Note that we use the "common pool" provided by the runtime environment.
            // We could also create our own pool instead, but why bother?
            // Note that some of the tasks will actually be run by the main thread instead
            // of by a thread from the pool.
            tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
        }
        for (ForkJoinTask<?> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException ex) {
                // If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
                // The thrown RuntimeException will appear as the "cause".
                System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
                        + "' caused by: '" + ex.getCause() + "'");
            } catch (CancellationException ex) {
                System.err.println("Task was cancelled, got CancellationException: " + ex.getMessage());
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw.
                // Note that the method "doSomething()" does not actually throw it.
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithStream() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        // If doSomething() threw a checked exception (a subclass of Exception that is not a RuntimeException),
        // we COULD NOT use it as argument to forEach().
        // If doSomething() throws a RuntimeException, the stream pipeline terminates with
        // one of the exceptions thrown in any of the threads; which one is arbitrary.
        elements.stream().parallel().forEach(DoSomething::doSomething);
        System.out.println("DONE!");
    }
}
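
The comments in the listing say that a method throwing a checked exception cannot be passed to forEach() or submit() directly and would need to be wrapped. Here is a minimal sketch of one way such a wrapper might look. The names ThrowingConsumer, unchecked, doSomethingChecked and findFailure are made up for this illustration; they are not from the book or the JDK. The sketch assumes that rethrowing the checked exception as the cause of a RuntimeException is acceptable for the caller.

```java
import java.io.IOException;
import java.util.function.Consumer;
import java.util.stream.IntStream;

public class CheckedExceptionSketch {

    // Hypothetical interface: like Consumer<T>, but accept() may throw a checked exception.
    @FunctionalInterface
    public interface ThrowingConsumer<T> {
        void accept(T t) throws Exception;
    }

    // Hypothetical adapter: turns a ThrowingConsumer into a plain Consumer.
    // A checked exception is rethrown as the cause of a RuntimeException.
    public static <T> Consumer<T> unchecked(ThrowingConsumer<T> consumer) {
        return t -> {
            try {
                consumer.accept(t);
            } catch (RuntimeException ex) {
                throw ex; // already unchecked, pass it through unchanged
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    // Hypothetical task that declares a checked exception and fails for exactly one index.
    public static void doSomethingChecked(Integer index) throws IOException {
        if (index == 13) {
            throw new IOException("task " + index + " failed");
        }
    }

    // Runs the tasks on a parallel stream.
    // Returns the IOException that stopped the pipeline, or null if all tasks succeeded.
    public static IOException findFailure(int from, int toInclusive) {
        try {
            IntStream.rangeClosed(from, toInclusive).boxed().parallel()
                    .forEach(unchecked(CheckedExceptionSketch::doSomethingChecked));
            return null;
        } catch (RuntimeException ex) {
            // The exception may have been wrapped again when it was rethrown
            // on the calling thread, so walk the whole cause chain.
            for (Throwable t = ex; t != null; t = t.getCause()) {
                if (t instanceof IOException) {
                    return (IOException) t;
                }
            }
            throw ex; // not caused by our task, so let it propagate
        }
    }

    public static void main(String[] args) {
        System.out.println("Indexes 1..10: " + findFailure(1, 10));
        System.out.println("Indexes 1..20: " + findFailure(1, 20));
    }
}
```

The same adapter idea should carry over to ForkJoinPool.submit() by wrapping a Runnable instead of a Consumer. In that case the RuntimeException would show up as the cause of the ExecutionException thrown by get(), with the original checked exception one level further down.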
