Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"

In Chapter 9, p.164, the stream is parallelized in a single step.
This inspired me to write some test code that runs a task “in parallel” in four ways:
- Not in parallel, inside a loop
- Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the task list in parallel
- Using the ForkJoinPool common pool (available since Java 8)
- Using a Java 8 parallel stream
I'm not sure whether this is of interest. The most interesting part is the handling of checked and unchecked exceptions.
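As a rough sketch of the checked-exception point: a lambda passed to forEach() or submit() cannot throw a checked exception, so a task that declares one has to be wrapped first. The names below (WrapCheckedSketch, ThrowingRunnable, unchecked, riskyTask) are made up for illustration and are not from the book or from the listing that follows.

```java
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

final class WrapCheckedSketch {

    // Hypothetical functional interface: like Runnable, but allowed to throw.
    @FunctionalInterface
    interface ThrowingRunnable {
        void run() throws Exception;
    }

    // Hypothetical helper: adapts a throwing task to a plain Runnable by
    // rethrowing any checked exception wrapped in a RuntimeException.
    static Runnable unchecked(ThrowingRunnable task) {
        return () -> {
            try {
                task.run();
            } catch (RuntimeException ex) {
                throw ex; // already unchecked, pass through unchanged
            } catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        };
    }

    // Follows the cause chain to its end, since wrappers may be layered.
    static Throwable rootCause(Throwable t) {
        Throwable root = t;
        while (root.getCause() != null) {
            root = root.getCause();
        }
        return root;
    }

    // Stand-in for a task method that declares a checked exception.
    static void riskyTask(int index) throws IOException {
        if (index == 3) {
            throw new IOException("task " + index + " failed");
        }
    }

    public static void main(String[] args) {
        List<Integer> indexes = Arrays.asList(1, 2, 3, 4);
        try {
            // Without unchecked(), this lambda would not compile.
            indexes.parallelStream().forEach(i -> unchecked(() -> riskyTask(i)).run());
        } catch (RuntimeException ex) {
            System.out.println("root cause: " + rootCause(ex));
        }
    }
}
```

Wrapping in a plain RuntimeException is only one option; a dedicated unchecked exception type would make the failure easier to recognize at the call site.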
package chapter9;

import org.junit.jupiter.api.Test;

import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class RunningInParallel {

    // This could also implement Runnable instead.
    public static class DoSomething {

        private final int index;

        public DoSomething(int index) {
            this.index = index;
        }

        // This method should not throw any checked exceptions.
        public void doSomething() {
            try {
                // Math.random() is synchronized, so we can use it from several threads.
                final long sleep_ms = (long) (Math.random() * 1000.0);
                System.out.println("Task " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
                Thread.sleep(sleep_ms);
            } catch (InterruptedException ex) {
                // Someone told us to stop sleeping, so we do!
                // Set the "interrupted" bit again and get out.
                Thread.currentThread().interrupt();
            }
        }
    }

    private List<DoSomething> createElements() {
        return IntStream.rangeClosed(1, 20)
                .mapToObj(DoSomething::new)
                .collect(Collectors.toList());
    }

    private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
        final int sliceSize = sliceEndIncl - sliceStart + 1;
        ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + sliceSize);
        for (int threadIndex = 0; threadIndex < sliceSize; threadIndex++) {
            final int elementIndex = sliceStart + threadIndex;
            // No need to retain a reference to the Thread, we will get it back from the ThreadGroup.
            new Thread(tGroup, new Runnable() {
                @Override
                public void run() {
                    // If doSomething() threw a checked exception, we would have to catch it here.
                    // If doSomething() throws a RuntimeException, it propagates up the stack here,
                    // terminating the worker thread.
                    elements.get(elementIndex).doSomething();
                }
            }).start();
        }
        return tGroup;
    }

    private static void waitForThreadEnd(final ThreadGroup tGroup, final int sliceSize) throws InterruptedException {
        final Thread[] threads = new Thread[sliceSize];
        final int count = tGroup.enumerate(threads);
        assert count <= sliceSize; // some threads may already have finished
        for (int i = 0; i < count; i++) {
            try {
                // Dangerous: join() without a timeout may wait forever. There should be a timeout value!
                System.out.println("Joining thread " + (i + 1) + " of " + count);
                threads[i].join();
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw...
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
    }

    @Test
    void preJava8_singleThread() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we would have to catch it here.
            // If doSomething() throws a RuntimeException, it propagates up the stack here.
            elem.doSomething();
        }
        System.out.println("DONE!");
    }

    @Test
    void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final int threadCount = 7;
        // Iterate over "slices" of "threadCount" threads.
        int sliceIndex = 0;
        while (sliceIndex * threadCount < elements.size()) {
            final int sliceStart = sliceIndex * threadCount;
            final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
            final int sliceSize = sliceEndIncl - sliceStart + 1;
            ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
            waitForThreadEnd(tGroup, sliceSize);
            System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + sliceSize + " threads");
            sliceIndex++;
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        final List<ForkJoinTask<?>> tasks = new LinkedList<>();
        for (DoSomething elem : elements) {
            // If doSomething() threw a checked exception, we COULD NOT use it as argument to submit().
            // We would need to wrap doSomething().
            // Note that we use the "common pool" provided by the runtime environment.
            // We could also create our own pool instead, but why bother?
            // Note that some of the tasks may actually be run by the main thread instead
            // of by a thread from the pool.
            tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
        }
        for (ForkJoinTask<?> task : tasks) {
            try {
                task.get();
            } catch (ExecutionException ex) {
                // If doSomething() throws a RuntimeException, get() rethrows it as an ExecutionException.
                // The thrown RuntimeException will appear as the "cause".
                System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
                        + "' caused by: '" + ex.getCause() + "'");
            } catch (CancellationException ex) {
                System.err.println("Task was cancelled, got CancellationException: " + ex.getMessage());
            } catch (InterruptedException ex) {
                // What should we do here? Just set the interrupt flag and throw.
                // Note that this comes from get(), not from doSomething(), which does not throw it.
                Thread.currentThread().interrupt();
                throw ex;
            }
        }
        System.out.println("DONE!");
    }

    @Test
    void java8_multipleThreadsWithStream() {
        final List<DoSomething> elements = Collections.unmodifiableList(createElements());
        // If doSomething() threw a checked exception (a subclass of Exception that is not a RuntimeException),
        // we COULD NOT use it as argument to forEach().
        // If doSomething() throws a RuntimeException, forEach() rethrows it to the caller and the
        // pipeline terminates. If several threads throw, which exception surfaces is arbitrary.
        elements.stream().parallel().forEach(DoSomething::doSomething);
        System.out.println("DONE!");
    }
}
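The comments above note that an unbounded wait is dangerous and that get() reports a failed task through an ExecutionException. A minimal sketch of both points together, assuming a hypothetical helper named awaitOutcome and an arbitrary five-second limit in main (neither is from the book), could look like this:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BoundedWaitSketch {

    // Hypothetical task that always fails with an unchecked exception.
    static void failingTask() {
        throw new IllegalStateException("boom");
    }

    // Hypothetical helper: runs the task on the common pool and waits at most
    // timeoutMs milliseconds. Returns null on success, the root cause if the
    // task threw, or the TimeoutException if the wait ran out.
    static Throwable awaitOutcome(Runnable task, long timeoutMs) throws InterruptedException {
        ForkJoinTask<?> future = ForkJoinPool.commonPool().submit(task);
        try {
            future.get(timeoutMs, TimeUnit.MILLISECONDS);
            return null;
        } catch (TimeoutException ex) {
            future.cancel(true); // give up on the task instead of waiting forever
            return ex;
        } catch (ExecutionException ex) {
            // Follow the cause chain to its end, since wrappers may be layered.
            Throwable root = ex;
            while (root.getCause() != null) {
                root = root.getCause();
            }
            return root;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("failing task: " + awaitOutcome(BoundedWaitSketch::failingTask, 5000));
        System.out.println("empty task: " + awaitOutcome(() -> { }, 5000));
    }
}
```

For the ThreadGroup variant, Thread.join(long millis) offers a similar bounded wait, although the caller then has to check isAlive() to find out whether the thread actually finished.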