dtonhofer
Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"
In Chapter 9, p.164 the stream is parallelized in 1 step.
This inspired me to write some test code that runs a task “in parallel” in several ways:
- Not in parallel, inside a loop
- Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the list of tasks in parallel
- Using a Java 8 ForkJoinPool
- Using a Java 8 parallel stream
I am not sure whether this is of interest; the most interesting part is the handling of checked and unchecked exceptions.
package chapter9;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
/**
 * Runs the same batch of {@link DoSomething} tasks in four different ways:
 * sequentially in a loop, on hand-started threads collected in a {@link ThreadGroup},
 * through the common {@link ForkJoinPool}, and through a parallel stream.
 *
 * <p>The comments in each test describe what happens to checked and unchecked
 * exceptions in that particular setup.
 */
public class RunningInParallel {

    /** Upper bound, in milliseconds, on how long a single worker thread is waited for. */
    private static final long JOIN_TIMEOUT_MS = 10_000L;

    /**
     * A task that prints a message and then sleeps for a random time below one second.
     * This could also implement {@link Runnable} instead.
     */
    public static class DoSomething {

        private final int index;

        /**
         * @param index number used to identify this task in the printed message
         */
        public DoSomething(int index) {
            this.index = index;
        }

        /**
         * Prints a start message and sleeps for 0..999 ms.
         *
         * <p>This method does not throw any checked exception: if the sleep is interrupted,
         * the interrupt flag of the current thread is set again and the method returns.
         */
        public void doSomething() {
            try {
                // ThreadLocalRandom gives every thread its own generator, so concurrent
                // callers do not contend on a shared one.
                final long sleepMs = ThreadLocalRandom.current().nextLong(1000L);
                System.out.println("Task " + index + " starts on thread '" + Thread.currentThread().getName()
                        + "', sleeping for " + sleepMs + " ms");
                Thread.sleep(sleepMs);
            } catch (InterruptedException ex) {
                // Someone told us to stop sleeping, so we do!
                // Set the "interrupted" bit again and get out.
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Builds the 20 tasks (indexes 1 to 20) used by every test.
     *
     * @return a new list of tasks
     */
    private List<DoSomething> createElements() {
        return IntStream.rangeClosed(1, 20)
                .mapToObj(DoSomething::new)
                .collect(Collectors.toList());
    }

    /**
     * Starts one thread per element of the slice {@code [sliceStart, sliceEndIncl]}.
     *
     * @param sliceStart   index of the first element of the slice
     * @param sliceEndIncl index of the last element of the slice (inclusive)
     * @param elements     the full task list; only the slice is accessed
     * @return the thread group that all started threads belong to
     */
    private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
        final int sliceSize = sliceEndIncl - sliceStart + 1;
        final ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + sliceSize);
        for (int threadIndex = 0; threadIndex < sliceSize; threadIndex++) {
            final int elementIndex = sliceStart + threadIndex;
            // No need to retain a reference to the Thread, we will get it back from the ThreadGroup.
            new Thread(tGroup, new Runnable() {
                @Override
                public void run() {
                    // If doSomething() threw a checked exception, we would have to catch it here.
                    // If doSomething() throws a RuntimeException, it travels up the stack from here,
                    // terminating the worker thread.
                    elements.get(elementIndex).doSomething();
                }
            }).start();
        }
        return tGroup;
    }

    /**
     * Waits for the threads of {@code tGroup} that are still listed by the group to finish.
     *
     * @param tGroup    the group whose threads are joined
     * @param sliceSize the number of threads that were started in the group
     * @throws InterruptedException  if the calling thread is interrupted while waiting;
     *                               the interrupt flag is set again before rethrowing
     * @throws IllegalStateException if a thread is still alive after {@link #JOIN_TIMEOUT_MS}
     */
    private static void waitForThreadEnd(final ThreadGroup tGroup, final int sliceSize) throws InterruptedException {
        final Thread[] threads = new Thread[sliceSize];
        final int count = tGroup.enumerate(threads);
        assert count <= sliceSize; // some threads may already have finished
        for (int i = 0; i < count; i++) {
            final Thread worker = threads[i];
            System.out.println("Joining thread " + (i + 1) + " of " + count);
            try {
                // Bounded wait: an unbounded join() would hang forever on a stuck worker.
                worker.join(JOIN_TIMEOUT_MS);
            } catch (InterruptedException ex) {
                // Set the interrupt flag again and hand the exception to the caller.
                Thread.currentThread().interrupt();
                throw ex;
            }
            if (worker.isAlive()) {
                throw new IllegalStateException("Thread '" + worker.getName() + "' did not finish within "
                        + JOIN_TIMEOUT_MS + " ms");
            }
        }
    }

    /** Runs all tasks one after the other on the calling thread. */
    @Test
    void preJava8_singleThread() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we would have to catch it here.
            // If doSomething() throws a RuntimeException, it travels up the stack from here.
            elem.doSomething();
        }
        System.out.println("DONE!");
    }

    /**
     * Runs the tasks in consecutive slices of at most 7 threads each, waiting for
     * a slice to finish before starting the next one.
     */
    @Test
    void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final int threadCount = 7;
        // Iterate over "slices" of "threadCount" threads.
        int sliceIndex = 0;
        while (sliceIndex * threadCount < elements.size()) {
            final int sliceStart = sliceIndex * threadCount;
            final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
            final int sliceSize = sliceEndIncl - sliceStart + 1;
            final ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
            waitForThreadEnd(tGroup, sliceSize);
            System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + sliceSize + " threads");
            sliceIndex++;
        }
        System.out.println("DONE!");
    }

    /** Submits every task to the common {@link ForkJoinPool} and then waits for each of them. */
    @Test
    void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final List<ForkJoinTask<?>> tasks = new LinkedList<>();
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we COULD NOT use it as argument to submit().
            // We would need to wrap doSomething().
            // Note that we use the "common pool" provided by the runtime environment.
            // We could also create our own pool instead, but why bother?
            // Note that some of the tasks may actually be run by the main thread instead
            // of by a thread from the pool.
            tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
        }
        for (ForkJoinTask<?> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException ex) {
                // If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
                // The thrown RuntimeException will appear as the "cause".
                System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
                        + "' caused by: '" + ex.getCause() + "'");
            } catch (CancellationException ex) {
                System.err.println("Task was cancelled, got CancellationException: " + ex.getMessage());
            } catch (InterruptedException ex) {
                // Set the interrupt flag again and hand the exception to the caller.
                // Note that it is get() that may throw this; doSomething() itself never does.
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
        System.out.println("DONE!");
    }

    /** Runs all tasks through a parallel stream. */
    @Test
    void java8_multipleThreadsWithStream() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        // If doSomething() threw a checked exception (a subclass of Exception),
        // we COULD NOT use it as argument to forEach().
        // If doSomething() throws a RuntimeException, the stream pipeline would terminate
        // arbitrarily with any of the exceptions thrown in any of the threads.
        elements.parallelStream().forEach(DoSomething::doSomething);
        System.out.println("DONE!");
    }
}
Popular Pragmatic Bookshelf topics
page 20: … protoc command…
I had to additionally run the following go get commands in order to be able to compile protobuf code using go...
New
Python Testing With Pytest - Chapter 2, warnings for “unregistered custom marks”
While running the smoke tests in Chapter 2, I get these...
New
your book suggests to use Image.toByteData() to convert image to bytes, however I get the following error: "the getter ‘toByteData’ isn’t...
New
Hi Jamis,
I think there’s an issue with a test on chapter 6. I own the ebook, version P1.0 Feb. 2019.
This test doesn’t pass for me:
...
New
I’m running Android Studio “Arctic Fox” 2020.3.1 Patch 2, and I’m embarrassed to admit that I only made it to page 8 before running into ...
New
The test is as follows:
Scenario: Intersecting a scaled sphere with a ray
Given r ← ray(point(0, 0, -5), vector(0, 0, 1))
And s ← sphere...
New
Hi,
I completed chapter 6 but am getting the following error when running:
thread 'main' panicked at 'Failed to load texture: IoError(O...
New
Modern front-end development for Rails, second edition - Struggling to get the first chapter to work
After running /bin/setup, the first error was: The foreman' command exists in these Ruby versions: That was easy to fix: gem install fore...
New
I am using Android Studio Chipmunk | 2021.2.1 Patch 2
Build #AI-212.5712.43.2112.8815526, built on July 10, 2022
Runtime version: 11.0....
New
Getting an error when installing the dependencies at the start of this chapter:
could not compile dependency :exla, "mix compile" failed...
New
Other popular topics
Hello Devtalk World!
Please let us know a little about who you are and where you’re from :nerd_face:
New
Andy and Dave wrote this influential, classic book to help their clients create better software and rediscover the joy of coding. Almost ...
New
Learn from the award-winning programming series that inspired the Elixir language, and go on a step-by-step journey through the most impo...
New
Rust is an exciting new programming language combining the power of C with memory safety, fearless concurrency, and productivity boosters...
New
Do the test and post your score :nerd_face:
:keyboard:
If possible, please add info such as the keyboard you’re using, the layout (Qw...
New
I am asking for any distro that only has the bare-bones to be able to get a shell in the server and then just install the packages as we ...
New
Build efficient applications that exploit the unique benefits of a pure functional language, learning from an engineer who uses Haskell t...
New
Author Spotlight:
Peter Ullrich
@PJUllrich
Data is at the core of every business, but it is useless if nobody can access and analyze ...
New
Will Swifties’ war on AI fakes spark a deepfake porn reckoning?
New
As digital systems increasingly run the world, mastery of the recurring patterns of software development risk is the key to fast and effe...
New
Categories:
Sub Categories:
Popular Portals
- /elixir
- /rust
- /wasm
- /ruby
- /erlang
- /phoenix
- /keyboards
- /python
- /js
- /rails
- /security
- /go
- /swift
- /vim
- /clojure
- /java
- /emacs
- /haskell
- /typescript
- /svelte
- /onivim
- /kotlin
- /c-plus-plus
- /crystal
- /tailwind
- /react
- /gleam
- /ocaml
- /vscode
- /flutter
- /elm
- /ash
- /html
- /deepseek
- /opensuse
- /zig
- /centos
- /php
- /scala
- /react-native
- /lisp
- /sublime-text
- /textmate
- /nixos
- /debian
- /agda
- /deno
- /django
- /kubuntu
- /arch-linux
- /nodejs
- /ubuntu
- /spring
- /revery
- /manjaro
- /julia
- /lua
- /diversity
- /quarkus
- /markdown









