dtonhofer
Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"
In Chapter 9, p.164 the stream is parallelized in 1 step.
This inspired me to write some test code that calls a task “in parallel”:
- Not in parallel, inside a loop
- Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the list of tasks in parallel
- Using a Java 8 ForkJoinPool
- Using a Java 8 parallel stream
I am not sure whether this is of interest; the most interesting part is the handling of checked and unchecked exceptions.
package chapter9;
import org.junit.jupiter.api.Test;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RunningInParallel {
/**
 * A simulated task: prints a start message and then sleeps for a random time.
 *
 * <p>This could also implement {@link Runnable} instead.
 */
public static class DoSomething {

    /** Exclusive upper bound of the simulated work time, in milliseconds. */
    private static final long MAX_SLEEP_MS = 1000L;

    /** Number identifying this task in the printed start message. */
    private final int index;

    /**
     * @param index number identifying this task in the printed start message
     */
    public DoSomething(int index) {
        this.index = index;
    }

    /**
     * Prints a start message and sleeps for a random time in the range [0, 1000) ms.
     *
     * <p>This method should not throw any checked exceptions. If the thread is
     * interrupted while sleeping, the interrupt flag is set again and the method
     * returns early.
     */
    public void doSomething() {
        try {
            // Each thread draws from its own generator; Math.random() would make all
            // worker threads contend on a single shared generator.
            final long sleepMs = ThreadLocalRandom.current().nextLong(MAX_SLEEP_MS);
            System.out.println("Thread " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleepMs + " ms");
            Thread.sleep(sleepMs);
        } catch (InterruptedException ex) {
            // Someone told us to stop sleeping, so we do!
            // Set the "interrupted" bit again and get out.
            Thread.currentThread().interrupt();
        }
    }
}
/**
 * Builds the list of tasks used by the tests: one DoSomething per index from 1 to 20 inclusive.
 */
private List<DoSomething> createElements() {
    final IntStream indexes = IntStream.range(1, 21);
    return indexes
            .mapToObj(index -> new DoSomething(index))
            .collect(Collectors.toList());
}
/**
 * Starts one thread for each element in the slice [sliceStart, sliceEndIncl] of the list.
 * All threads are created in a fresh ThreadGroup named after the slice.
 *
 * @param sliceStart   index of the first element of the slice
 * @param sliceEndIncl index of the last element of the slice (inclusive)
 * @param elements     the tasks; each started thread calls doSomething() on one of them
 * @return the ThreadGroup holding the started threads
 */
private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
    final int sliceLength = sliceEndIncl - sliceStart + 1;
    final String groupName = "slice [" + sliceStart + "," + sliceEndIncl + "] of size " + sliceLength;
    final ThreadGroup group = new ThreadGroup(groupName);
    for (int offset = 0; offset < sliceLength; offset++) {
        final int position = sliceStart + offset;
        // A checked exception from doSomething() would have to be caught inside this lambda.
        // A RuntimeException thrown by doSomething() travels up the worker's stack
        // and terminates that worker thread.
        final Runnable job = () -> elements.get(position).doSomething();
        // The Thread reference is not kept; it can be retrieved from the ThreadGroup later.
        new Thread(group, job).start();
    }
    return group;
}
/**
 * Joins, one after the other, every thread that the given group reports when enumerated.
 *
 * @param tGroup    the group whose threads are awaited
 * @param slizeSize capacity of the array used to enumerate the group's threads
 * @throws InterruptedException if the calling thread is interrupted while joining;
 *                              the interrupt flag is set again before rethrowing
 */
private static void waitForThreadEnd(final ThreadGroup tGroup, final int slizeSize) throws InterruptedException {
    final Thread[] active = new Thread[slizeSize];
    final int activeCount = tGroup.enumerate(active);
    assert activeCount <= slizeSize; // some threads may already have finished
    try {
        int position = 0;
        while (position < activeCount) {
            // Dangerous: join() has no timeout here, so this may wait forever!
            System.out.println("Joining thread " + (position + 1) + " of " + activeCount);
            active[position].join();
            position++;
        }
    } catch (InterruptedException interruption) {
        // Set the interrupt flag again and hand the exception to the caller.
        Thread.currentThread().interrupt();
        throw interruption;
    }
}
/**
 * Baseline: runs every task one after the other on the calling thread.
 */
@Test
void preJava8_singleThread() {
    final List<DoSomething> elements = Collections.unmodifiableList(createElements());
    // A checked exception from doSomething() would have to be caught around this call.
    // A RuntimeException thrown by doSomething() simply travels up the stack from here.
    elements.forEach(DoSomething::doSomething);
    System.out.println("DONE!");
}
/**
 * Runs the tasks in consecutive slices of at most 7 elements: one thread is started
 * per element of a slice, and the next slice starts only after the previous one has been joined.
 */
@Test
void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
    final List<DoSomething> elements = Collections.unmodifiableList(createElements());
    final int threadCount = 7;
    final int total = elements.size();
    // Walk over the list in steps of "threadCount"; the last slice may be shorter.
    for (int first = 0; first < total; first += threadCount) {
        final int lastIncl = Math.min(first + threadCount - 1, total - 1);
        final int running = lastIncl - first + 1;
        final ThreadGroup group = startThreads(first, lastIncl, elements);
        waitForThreadEnd(group, running);
        System.out.println("Done with ThreadGroup '" + group.getName() + "' running " + running + " threads");
    }
    System.out.println("DONE!");
}
/**
 * Submits every task to the common ForkJoinPool, then waits for each task in
 * submission order. Failed or cancelled tasks are reported on stderr.
 *
 * @throws InterruptedException if the calling thread is interrupted while waiting
 *                              for a task; the interrupt flag is set again before rethrowing
 */
@Test
void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
    final List<DoSomething> elements = Collections.unmodifiableList(createElements());
    // Note that we use the "common pool" provided by the runtime environment.
    // We could also create our own pool instead, but why bother?
    final ForkJoinPool pool = ForkJoinPool.commonPool();
    // The number of tasks is known up front, so the list is sized once.
    final List<ForkJoinTask<?>> tasks = new ArrayList<>(elements.size());
    for (DoSomething elem : elements) {
        // If doSomething() threw a checked exception, we COULD NOT use it as argument to submit();
        // we would need to wrap doSomething().
        // Note (original author's observation): some of the tasks may actually be run
        // by the main thread instead of by a thread from the pool.
        tasks.add(pool.submit(elem::doSomething));
    }
    for (ForkJoinTask<?> task : tasks) {
        try {
            task.get();
        } catch (ExecutionException ex) {
            // If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
            // The thrown RuntimeException will appear as the "cause".
            System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
                    + "' caused by: '" + ex.getCause() + "'");
        } catch (CancellationException ex) {
            System.err.println("Task was cancelled, got CancellationException: " + ex.getMessage());
        } catch (InterruptedException ex) {
            // The wait itself was interrupted: set the interrupt flag again and throw.
            // Note that the method "doSomething()" does not actually throw this exception.
            Thread.currentThread().interrupt();
            throw ex;
        }
    }
    System.out.println("DONE!");
}
/**
 * Runs every task through a parallel stream over the list.
 */
@Test
void java8_multipleThreadsWithStream() throws InterruptedException {
    final List<DoSomething> elements = Collections.unmodifiableList(createElements());
    // A doSomething() declaring a checked exception (a subclass of Exception)
    // COULD NOT be passed to forEach().
    // If doSomething() throws a RuntimeException, the stream pipeline would terminate
    // arbitrarily with any of the exceptions thrown in any of the threads.
    elements.parallelStream().forEach(task -> task.doSomething());
    System.out.println("DONE!");
}
}
Popular Pragmatic Bookshelf topics
Hi! I know not the intentions behind this narrative when called, on page XI:
mount() |> handle_event() |> render()
but the correc...
New
Hello! Thanks for the great book.
I was attempting the Trie (chap 17) exercises and for number 4 the solution provided for the autocorre...
New
I ran this command after installing the sample application:
$ cards add do something --owner Brian
And got a file not found error:
Fil...
New
I’m new to Rust and am using this book to learn more as well as to feed my interest in game dev. I’ve just finished the flappy dragon exa...
New
This is as much a suggestion as a question, as a note for others.
Locally the SGP30 wasn’t available, so I ordered a SGP40. On page 53, ...
New
Hi, I have just acquired Michael Fazio’s “Kotlin and Android Development” to learn about game programming for Android. I have a game in p...
New
Hi all,
currently I wonder how the Tailwind colours work (or don’t work).
For example, in app/views/layouts/application.html.erb I have...
New
@parrt
In the context of Chapter 4.3, the grammar Java.g4, meant to parse Java 6 compilation units, no longer passes ANTLR (currently 4....
New
Docker-Machine became part of the Docker Toolbox, which was deprecated in 2020, long after Docker Desktop supported Docker Engine nativel...
New
From page 13:
On Python 3.7, you can install the libraries with pip by running these commands inside a Python venv using Visual Studio ...
New
Other popular topics
If it’s a mechanical keyboard, which switches do you have?
Would you recommend it? Why?
What will your next keyboard be?
Pics always w...
New
Which, if any, games do you play? On what platform?
I just bought (and completed) Minecraft Dungeons for my Nintendo Switch. Other than ...
New
There’s a whole world of custom keycaps out there that I didn’t know existed!
Check out all of our Keycaps threads here:
https://forum....
New
I ended up cancelling my Moonlander order as I think it’s just going to be a bit too bulky for me.
I think the Planck and the Preonic (o...
New
Do the test and post your score :nerd_face:
:keyboard:
If possible, please add info such as the keyboard you’re using, the layout (Qw...
New
The V Programming Language
Simple language for building maintainable programs
V is already mentioned couple of times in the forum, but I...
New
I am trying to crate a game for the Nintendo switch, I wanted to use Java as I am comfortable with that programming language. Can you use...
New
New
I’m able to do the “artistic” part of game-development; character designing/modeling, music, environment modeling, etc.
However, I don’t...
New
If you’re getting errors like this:
psql: error: connection to server on socket “/tmp/.s.PGSQL.5432” failed: No such file or directory ...
New
Categories:
Sub Categories:
Popular Portals
- /elixir
- /rust
- /wasm
- /ruby
- /erlang
- /phoenix
- /keyboards
- /python
- /js
- /rails
- /security
- /go
- /swift
- /vim
- /clojure
- /java
- /emacs
- /haskell
- /typescript
- /svelte
- /onivim
- /kotlin
- /c-plus-plus
- /crystal
- /tailwind
- /react
- /gleam
- /ocaml
- /vscode
- /elm
- /flutter
- /ash
- /html
- /deepseek
- /opensuse
- /zig
- /centos
- /php
- /scala
- /react-native
- /lisp
- /textmate
- /sublime-text
- /nixos
- /debian
- /agda
- /deno
- /django
- /kubuntu
- /arch-linux
- /nodejs
- /ubuntu
- /spring
- /revery
- /manjaro
- /lua
- /julia
- /diversity
- /markdown
- /quarkus









