
dtonhofer
Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"
In Chapter 9, p.164 the stream is parallelized in 1 step.
This inspired my to write some test some code to call a task “in parallel”:
- Not in parallel, inside a loop
- Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the list the tasks in parallel
- Using a Java 8
ForkJoinPool
- Using a Java 8 parallel stream.
Not sure whether this is of interest, the most interesting part is the handling of checked and unchecked exceptions.
package chapter9;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RunningInParallel {
// This could also implement Runnable instead.
public static class DoSomething {
private final int index;
public DoSomething(int index) {
this.index = index;
}
// This method should not throw any checked exceptions
public void doSomething() {
try {
// Math.random is synchronized, so we can use it here
final long sleep_ms = (long) (Math.random() * 1000.0);
System.out.println("Thread " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
Thread.sleep(sleep_ms);
} catch (InterruptedException ex) {
// Someone told us to stop sleeping, so we do!
// Set the "interrupted" bit again and get out.
Thread.currentThread().interrupt();
}
}
}
private List<DoSomething> createElements() {
return IntStream.rangeClosed(1, 20)
.mapToObj(DoSomething::new)
.collect(Collectors.toList());
}
private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
final int slizeSize = sliceEndIncl - sliceStart + 1;
ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + slizeSize);
for (int threadIndex = 0; threadIndex < slizeSize; threadIndex++) {
final int elementIndex = sliceStart + threadIndex;
// no need to retain reference to the Thread, we will get it back from the ThreadGroup
new Thread(tGroup, new Runnable() {
@Override
public void run() {
// If doSomething() threw a checked exception, we would have to catch it here
// If doSomething() throws a RuntimeException, the Exception is left up the stack here,
// terminating the worker thread.
elements.get(elementIndex).doSomething();
}
}).start();
}
return tGroup;
}
private static void waitForThreadEnd(final ThreadGroup tGroup, final int slizeSize) throws InterruptedException {
final Thread[] threads = new Thread[slizeSize];
final int count = tGroup.enumerate(threads);
assert count <= slizeSize; // some threads may already have finished
for (int i = 0; i < count; i++) {
try {
// Dangerous, as infinite waiting may follow, there should be a timeout value!!
System.out.println("Joining thread " + (i + 1) + " of " + count);
threads[i].join();
} catch (InterruptedException ex) {
// What should we do here? Just set the interrupt flag and throw...
Thread.currentThread().interrupt();
throw ex;
}
}
}
@Test
void preJava8_singleThread() {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
for (DoSomething elem : elements) {
// If doSomething() threw a checked exception, we would have to catch it here.
// If doSomething() throws a RuntimeException, the Exception is left up the stack here.
elem.doSomething();
}
System.out.println("DONE!");
}
@Test
void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
final int threadCount = 7;
// Iterate over "slices" of "threadCount" threads.
int sliceIndex = 0;
while (sliceIndex * threadCount < elements.size()) {
final int sliceStart = sliceIndex * threadCount;
final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
final int slizeSize = sliceEndIncl - sliceStart + 1;
ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
waitForThreadEnd(tGroup, slizeSize);
System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + slizeSize + " threads");
sliceIndex++;
}
System.out.println("DONE!");
}
@Test
void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
final List<ForkJoinTask<?>> tasks = new LinkedList<>();
for (DoSomething elem : elements) {
// If doSomething() threw a checked exception, we COULD NOT use it as argument to submit()
// We would need to wrap doSomething().
// Note that we use the "common pool" provided by the runtime environment.
// We could also create our own pool instead, but why bother?
// Note that some of the tasks will actaully be run by the main thread instead
// of by a thread from the pool.
tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
}
for (ForkJoinTask<?> task : tasks) {
try {
task.get();
} catch (ExecutionException ex) {
// If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
// The thrown RuntimeException will appear as the "cause".
System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
+ "' caused by: '" + ex.getCause() + "'");
} catch (CancellationException ex) {
System.err.println("Task was cancelled, hot CancellationException: " + ex.getMessage());
} catch (InterruptedException ex) {
// What should we do here? Just set the interrupt flag and throw.
// Note the ,ethod "doSomething()" does not actually throw it.
Thread.currentThread().interrupt();
throw ex;
}
}
System.out.println("DONE!");
}
@Test
void java8_multipleThreadsWithStream() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
// If doSomething() threw a checked exception (a subclass of Exception),
// we COULD NOT use it as argument to forEach().
// If doSomething() throws a RuntimeException, the stream pipeline would terminate
// arbitrarily with any of the exceptions thrown in any of the threads.
elements.stream().parallel().forEach(DoSomething::doSomething);
System.out.println("DONE!");
}
}
Popular Pragmatic Bookshelf topics

Working through the steps (checking that the Info,plist matches exactly), run the demo game and what appears is grey but does not fill th...
New

Title: Web Development with Clojure, Third Edition, pg 116
Hi - I just started chapter 5 and I am stuck on page 116 while trying to star...
New

Hello! On page xix of the preface, it says there is a community forum "… for help if your’re stuck on one of the exercises in this book… ...
New

Hi, I have just acquired Michael Fazio’s “Kotlin and Android Development” to learn about game programming for Android. I have a game in p...
New

On page 78 the following code appears:
<%= link_to ‘Destroy’, product,
class: ‘hover:underline’,
method: :delete,
data: { confirm...
New

Skimming ahead, much of the following is explained in Chapter 3, but new readers (like me!) will hit a roadblock in Chapter 2 with their ...
New

Hi all,
currently I wonder how the Tailwind colours work (or don’t work).
For example, in app/views/layouts/application.html.erb I have...
New

Hi, I’ve got a question about the implementation of PubSub when using a Phoenix.Socket.Transport behaviour rather than channels.
Before ...
New

@mfazio23
I’m following the indications of the book and arriver ad chapter 10, but the app cannot be compiled due to an error in the Bas...
New

I just bought this book to learn about Android development, and I’m already running into a major issue in Ch. 1, p. 20: “Update activity...
New
Other popular topics

Which, if any, games do you play? On what platform?
I just bought (and completed) Minecraft Dungeons for my Nintendo Switch. Other than ...
New

SpaceVim seems to be gaining in features and popularity and I just wondered how it compares with SpaceMacs in 2020 - anyone have any thou...
New
New

Rust is an exciting new programming language combining the power of C with memory safety, fearless concurrency, and productivity boosters...
New

I ended up cancelling my Moonlander order as I think it’s just going to be a bit too bulky for me.
I think the Planck and the Preonic (o...
New

Do the test and post your score :nerd_face:
:keyboard:
If possible, please add info such as the keyboard you’re using, the layout (Qw...
New

Hi folks,
I don’t know if I saw this here but, here’s a new programming language, called Roc
Reminds me a bit of Elm and thus Haskell. ...
New

We’ve talked about his book briefly here but it is quickly becoming obsolete - so he’s decided to create a series of 7 podcasts, the firs...
New

Was just curious to see if any were around, found this one:
I got 51/100:
Not sure if it was meant to buy I am sure at times the b...
New

Ok, well here are some thoughts and opinions on some of the ergonomic keyboards I have, I guess like mini review of each that I use enoug...
New
Categories:
Sub Categories:
Popular Portals
- /elixir
- /rust
- /ruby
- /wasm
- /erlang
- /phoenix
- /keyboards
- /rails
- /python
- /js
- /security
- /go
- /swift
- /vim
- /clojure
- /emacs
- /haskell
- /java
- /onivim
- /svelte
- /typescript
- /c-plus-plus
- /crystal
- /kotlin
- /tailwind
- /react
- /gleam
- /ocaml
- /elm
- /flutter
- /vscode
- /ash
- /html
- /opensuse
- /centos
- /php
- /deepseek
- /zig
- /scala
- /textmate
- /lisp
- /sublime-text
- /nixos
- /debian
- /react-native
- /agda
- /kubuntu
- /arch-linux
- /django
- /revery
- /ubuntu
- /manjaro
- /spring
- /nodejs
- /diversity
- /deno
- /lua
- /julia
- /slackware
- /c