dtonhofer
Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"
In Chapter 9, p.164 the stream is parallelized in 1 step.
This inspired my to write some test some code to call a task “in parallel”:
- Not in parallel, inside a loop
- Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the list the tasks in parallel
- Using a Java 8
ForkJoinPool - Using a Java 8 parallel stream.
Not sure whether this is of interest, the most interesting part is the handling of checked and unchecked exceptions.
package chapter9;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RunningInParallel {
// This could also implement Runnable instead.
public static class DoSomething {
private final int index;
public DoSomething(int index) {
this.index = index;
}
// This method should not throw any checked exceptions
public void doSomething() {
try {
// Math.random is synchronized, so we can use it here
final long sleep_ms = (long) (Math.random() * 1000.0);
System.out.println("Thread " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
Thread.sleep(sleep_ms);
} catch (InterruptedException ex) {
// Someone told us to stop sleeping, so we do!
// Set the "interrupted" bit again and get out.
Thread.currentThread().interrupt();
}
}
}
private List<DoSomething> createElements() {
return IntStream.rangeClosed(1, 20)
.mapToObj(DoSomething::new)
.collect(Collectors.toList());
}
private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
final int slizeSize = sliceEndIncl - sliceStart + 1;
ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + slizeSize);
for (int threadIndex = 0; threadIndex < slizeSize; threadIndex++) {
final int elementIndex = sliceStart + threadIndex;
// no need to retain reference to the Thread, we will get it back from the ThreadGroup
new Thread(tGroup, new Runnable() {
@Override
public void run() {
// If doSomething() threw a checked exception, we would have to catch it here
// If doSomething() throws a RuntimeException, the Exception is left up the stack here,
// terminating the worker thread.
elements.get(elementIndex).doSomething();
}
}).start();
}
return tGroup;
}
private static void waitForThreadEnd(final ThreadGroup tGroup, final int slizeSize) throws InterruptedException {
final Thread[] threads = new Thread[slizeSize];
final int count = tGroup.enumerate(threads);
assert count <= slizeSize; // some threads may already have finished
for (int i = 0; i < count; i++) {
try {
// Dangerous, as infinite waiting may follow, there should be a timeout value!!
System.out.println("Joining thread " + (i + 1) + " of " + count);
threads[i].join();
} catch (InterruptedException ex) {
// What should we do here? Just set the interrupt flag and throw...
Thread.currentThread().interrupt();
throw ex;
}
}
}
@Test
void preJava8_singleThread() {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
for (DoSomething elem : elements) {
// If doSomething() threw a checked exception, we would have to catch it here.
// If doSomething() throws a RuntimeException, the Exception is left up the stack here.
elem.doSomething();
}
System.out.println("DONE!");
}
@Test
void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
final int threadCount = 7;
// Iterate over "slices" of "threadCount" threads.
int sliceIndex = 0;
while (sliceIndex * threadCount < elements.size()) {
final int sliceStart = sliceIndex * threadCount;
final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
final int slizeSize = sliceEndIncl - sliceStart + 1;
ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
waitForThreadEnd(tGroup, slizeSize);
System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + slizeSize + " threads");
sliceIndex++;
}
System.out.println("DONE!");
}
@Test
void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
final List<ForkJoinTask<?>> tasks = new LinkedList<>();
for (DoSomething elem : elements) {
// If doSomething() threw a checked exception, we COULD NOT use it as argument to submit()
// We would need to wrap doSomething().
// Note that we use the "common pool" provided by the runtime environment.
// We could also create our own pool instead, but why bother?
// Note that some of the tasks will actaully be run by the main thread instead
// of by a thread from the pool.
tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
}
for (ForkJoinTask<?> task : tasks) {
try {
task.get();
} catch (ExecutionException ex) {
// If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
// The thrown RuntimeException will appear as the "cause".
System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
+ "' caused by: '" + ex.getCause() + "'");
} catch (CancellationException ex) {
System.err.println("Task was cancelled, hot CancellationException: " + ex.getMessage());
} catch (InterruptedException ex) {
// What should we do here? Just set the interrupt flag and throw.
// Note the ,ethod "doSomething()" does not actually throw it.
Thread.currentThread().interrupt();
throw ex;
}
}
System.out.println("DONE!");
}
@Test
void java8_multipleThreadsWithStream() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
// If doSomething() threw a checked exception (a subclass of Exception),
// we COULD NOT use it as argument to forEach().
// If doSomething() throws a RuntimeException, the stream pipeline would terminate
// arbitrarily with any of the exceptions thrown in any of the threads.
elements.stream().parallel().forEach(DoSomething::doSomething);
System.out.println("DONE!");
}
}
Popular Pragmatic Bookshelf topics
In Chapter 3, the source for index introduces Config on page 31, followed by more code including tests; Config isn’t introduced until pag...
New
I thought that there might be interest in using the book with Rails 6.1 and Ruby 2.7.2. I’ll note what I needed to do differently here.
...
New
Hi Travis! Thank you for the cool book! :slight_smile:
I made a list of issues and thought I could post them chapter by chapter. I’m rev...
New
Page 28: It implements io.ReaderAt on the store type.
Sorry if it’s a dumb question but was the io.ReaderAt supposed to be io.ReadAt?
...
New
Title: Intuitive Python: docker run… denied error (page 2)
Attempted to run the docker command in both CLI and Powershell
PS C:\Users\r...
New
The book has the same “Problem space/Solution space” diagram on page 18 as is on page 17. The correct Problem/Solution space diagrams ar...
New
Running mix deps.get in the sensor_hub directory fails with the following error:
** (Mix) No SSH public keys found in ~/.ssh. An ssh aut...
New
Hey there,
I’m enjoying this book and have learned a few things alredayd. However, in Chapter 4 I believe we are meant to see the “>...
New
On page 78 the following code appears:
<%= link_to ‘Destroy’, product,
class: ‘hover:underline’,
method: :delete,
data: { confirm...
New
root_layout: {PentoWeb.LayoutView, :root},
This results in the following following error:
no “root” html template defined for PentoWeb...
New
Other popular topics
Hello Devtalk World!
Please let us know a little about who you are and where you’re from :nerd_face:
New
Take your Go skills to the next level by learning how to design, develop, and deploy a distributed service. Start from the bare essential...
New
I am thinking in building or buy a desktop computer for programing, both professionally and on my free time, and my choice of OS is Linux...
New
Please tell us what is your preferred monitor setup for programming(not gaming) and why you have chosen it.
Does your monitor have eye p...
New
Was just curious to see if any were around, found this one:
I got 51/100:
Not sure if it was meant to buy I am sure at times the b...
New
I am trying to crate a game for the Nintendo switch, I wanted to use Java as I am comfortable with that programming language. Can you use...
New
There appears to have been an update that has changed the terminology for what has previously been known as the Taskbar Overflow - this h...
New
I’m able to do the “artistic” part of game-development; character designing/modeling, music, environment modeling, etc.
However, I don’t...
New
Background
Lately I am in a quest to find a good quality TTS ai generation tool to run locally in order to create audio for some videos I...
New
Ok, well here are some thoughts and opinions on some of the ergonomic keyboards I have, I guess like mini review of each that I use enoug...
New
Categories:
Sub Categories:
Popular Portals
- /elixir
- /rust
- /wasm
- /ruby
- /erlang
- /phoenix
- /keyboards
- /python
- /js
- /rails
- /security
- /go
- /swift
- /vim
- /clojure
- /java
- /emacs
- /haskell
- /typescript
- /svelte
- /onivim
- /kotlin
- /c-plus-plus
- /crystal
- /tailwind
- /react
- /gleam
- /ocaml
- /elm
- /vscode
- /flutter
- /ash
- /html
- /deepseek
- /opensuse
- /zig
- /centos
- /php
- /scala
- /react-native
- /lisp
- /sublime-text
- /textmate
- /nixos
- /debian
- /agda
- /deno
- /django
- /kubuntu
- /arch-linux
- /nodejs
- /ubuntu
- /spring
- /revery
- /manjaro
- /diversity
- /julia
- /lua
- /markdown
- /laravel









