
dtonhofer
Functional Programming in Java, Second Edition: Chapter 9, p.164 addendum to "parallel stream"
In Chapter 9, p.164 the stream is parallelized in 1 step.
This inspired my to write some test some code to call a task “in parallel”:
- Not in parallel, inside a loop
- Using ThreadGroups (essentially ‘temporary worker pools’) to run “slices” of the list the tasks in parallel
- Using a Java 8
ForkJoinPool
- Using a Java 8 parallel stream.
Not sure whether this is of interest, the most interesting part is the handling of checked and unchecked exceptions.
package chapter9;
import org.junit.jupiter.api.Test;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class RunningInParallel {
// This could also implement Runnable instead.
public static class DoSomething {
private final int index;
public DoSomething(int index) {
this.index = index;
}
// This method should not throw any checked exceptions
public void doSomething() {
try {
// Math.random is synchronized, so we can use it here
final long sleep_ms = (long) (Math.random() * 1000.0);
System.out.println("Thread " + index + " starts on thread '" + Thread.currentThread().getName() + "', sleeping for " + sleep_ms + " ms");
Thread.sleep(sleep_ms);
} catch (InterruptedException ex) {
// Someone told us to stop sleeping, so we do!
// Set the "interrupted" bit again and get out.
Thread.currentThread().interrupt();
}
}
}
private List<DoSomething> createElements() {
return IntStream.rangeClosed(1, 20)
.mapToObj(DoSomething::new)
.collect(Collectors.toList());
}
private static ThreadGroup startThreads(final int sliceStart, final int sliceEndIncl, final List<DoSomething> elements) {
final int slizeSize = sliceEndIncl - sliceStart + 1;
ThreadGroup tGroup = new ThreadGroup("slice [" + sliceStart + "," + sliceEndIncl + "] of size " + slizeSize);
for (int threadIndex = 0; threadIndex < slizeSize; threadIndex++) {
final int elementIndex = sliceStart + threadIndex;
// no need to retain reference to the Thread, we will get it back from the ThreadGroup
new Thread(tGroup, new Runnable() {
@Override
public void run() {
// If doSomething() threw a checked exception, we would have to catch it here
// If doSomething() throws a RuntimeException, the Exception is left up the stack here,
// terminating the worker thread.
elements.get(elementIndex).doSomething();
}
}).start();
}
return tGroup;
}
private static void waitForThreadEnd(final ThreadGroup tGroup, final int slizeSize) throws InterruptedException {
final Thread[] threads = new Thread[slizeSize];
final int count = tGroup.enumerate(threads);
assert count <= slizeSize; // some threads may already have finished
for (int i = 0; i < count; i++) {
try {
// Dangerous, as infinite waiting may follow, there should be a timeout value!!
System.out.println("Joining thread " + (i + 1) + " of " + count);
threads[i].join();
} catch (InterruptedException ex) {
// What should we do here? Just set the interrupt flag and throw...
Thread.currentThread().interrupt();
throw ex;
}
}
}
@Test
void preJava8_singleThread() {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
for (DoSomething elem : elements) {
// If doSomething() threw a checked exception, we would have to catch it here.
// If doSomething() throws a RuntimeException, the Exception is left up the stack here.
elem.doSomething();
}
System.out.println("DONE!");
}
@Test
void preJava8_rollYourOwnMultipleThreads() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
final int threadCount = 7;
// Iterate over "slices" of "threadCount" threads.
int sliceIndex = 0;
while (sliceIndex * threadCount < elements.size()) {
final int sliceStart = sliceIndex * threadCount;
final int sliceEndIncl = Math.min(sliceStart + threadCount - 1, elements.size() - 1);
final int slizeSize = sliceEndIncl - sliceStart + 1;
ThreadGroup tGroup = startThreads(sliceStart, sliceEndIncl, elements);
waitForThreadEnd(tGroup, slizeSize);
System.out.println("Done with ThreadGroup '" + tGroup.getName() + "' running " + slizeSize + " threads");
sliceIndex++;
}
System.out.println("DONE!");
}
@Test
void java8_multipleThreadsWithForkJoinPool() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
final List<ForkJoinTask<?>> tasks = new LinkedList<>();
for (DoSomething elem : elements) {
// If doSomething() threw a checked exception, we COULD NOT use it as argument to submit()
// We would need to wrap doSomething().
// Note that we use the "common pool" provided by the runtime environment.
// We could also create our own pool instead, but why bother?
// Note that some of the tasks will actaully be run by the main thread instead
// of by a thread from the pool.
tasks.add(ForkJoinPool.commonPool().submit(elem::doSomething));
}
for (ForkJoinTask<?> task : tasks) {
try {
task.get();
} catch (ExecutionException ex) {
// If doSomething() throws a RuntimeException it will be rethrown as an ExecutionException.
// The thrown RuntimeException will appear as the "cause".
System.err.println("Task failed to finish properly, got ExecutionException: '" + ex.getMessage()
+ "' caused by: '" + ex.getCause() + "'");
} catch (CancellationException ex) {
System.err.println("Task was cancelled, hot CancellationException: " + ex.getMessage());
} catch (InterruptedException ex) {
// What should we do here? Just set the interrupt flag and throw.
// Note the ,ethod "doSomething()" does not actually throw it.
Thread.currentThread().interrupt();
throw ex;
}
}
System.out.println("DONE!");
}
@Test
void java8_multipleThreadsWithStream() throws InterruptedException {
final List<DoSomething> elements = Collections.unmodifiableList(createElements());
// If doSomething() threw a checked exception (a subclass of Exception),
// we COULD NOT use it as argument to forEach().
// If doSomething() throws a RuntimeException, the stream pipeline would terminate
// arbitrarily with any of the exceptions thrown in any of the threads.
elements.stream().parallel().forEach(DoSomething::doSomething);
System.out.println("DONE!");
}
}
Popular Pragmatic Bookshelf topics

page 20: … protoc command…
I had to additionally run the following go get commands in order to be able to compile protobuf code using go...
New

page 37
ANTLRInputStream input = new ANTLRInputStream(is);
as of ANTLR 4 .8 should be:
CharStream stream = CharStreams.fromStream(i...
New

Working through the steps (checking that the Info,plist matches exactly), run the demo game and what appears is grey but does not fill th...
New

Hi! I know not the intentions behind this narrative when called, on page XI:
mount() |> handle_event() |> render()
but the correc...
New

First, the code resources:
Page 237: rumbl_umbrella/apps/rumbl/mix.exs
Note: That this file is missing.
Page 238: rumbl_umbrella/app...
New

The book has the same “Problem space/Solution space” diagram on page 18 as is on page 17. The correct Problem/Solution space diagrams ar...
New

I’m running Android Studio “Arctic Fox” 2020.3.1 Patch 2, and I’m embarrassed to admit that I only made it to page 8 before running into ...
New

I found an issue in Chapter 7 regarding android:backgroundTint vs app:backgroundTint.
How to replicate:
load chapter-7 from zipfile i...
New

Hey there,
I’m enjoying this book and have learned a few things alredayd. However, in Chapter 4 I believe we are meant to see the “>...
New

The markup used to display the uploaded image results in a Phoenix.LiveView.HTMLTokenizer.ParseError error.
lib/pento_web/live/product_l...
New
Other popular topics

Reading something? Working on something? Planning something? Changing jobs even!?
If you’re up for sharing, please let us know what you’...
New

No chair. I have a standing desk.
This post was split into a dedicated thread from our thread about chairs :slight_smile:
New

Curious to know which languages and frameworks you’re all thinking about learning next :upside_down_face:
Perhaps if there’s enough peop...
New

My first contact with Erlang was about 2 years ago when I used RabbitMQ, which is written in Erlang, for my job. This made me curious and...
New

This looks like a stunning keycap set :orange_heart:
A LEGENDARY KEYBOARD LIVES ON
When you bought an Apple Macintosh computer in the e...
New

Oh just spent so much time on this to discover now that RancherOS is in end of life but Rancher is refusing to mark the Github repo as su...
New

Use WebRTC to build web applications that stream media and data in real time directly from one user to another, all in the browser.
...
New

Saw this on TikTok of all places! :lol:
Anyone heard of them before?
Lite:
New

Author Spotlight
Jamis Buck
@jamis
This month, we have the pleasure of spotlighting author Jamis Buck, who has written Mazes for Prog...
New

Author Spotlight
Erin Dees
@undees
Welcome to our new author spotlight! We had the pleasure of chatting with Erin Dees, co-author of ...
New
Categories:
Sub Categories:
Popular Portals
- /elixir
- /rust
- /ruby
- /wasm
- /erlang
- /phoenix
- /keyboards
- /rails
- /python
- /js
- /security
- /go
- /swift
- /vim
- /clojure
- /emacs
- /haskell
- /java
- /svelte
- /onivim
- /typescript
- /kotlin
- /c-plus-plus
- /crystal
- /tailwind
- /react
- /gleam
- /ocaml
- /elm
- /flutter
- /vscode
- /ash
- /opensuse
- /html
- /centos
- /php
- /deepseek
- /zig
- /scala
- /textmate
- /sublime-text
- /lisp
- /react-native
- /nixos
- /debian
- /agda
- /kubuntu
- /arch-linux
- /django
- /ubuntu
- /revery
- /manjaro
- /spring
- /nodejs
- /diversity
- /lua
- /deno
- /julia
- /slackware
- /c