7. Threading

Modern Java code should start with virtual threads for high-concurrency tasks that mostly wait on I/O. Platform threads, executors, atomics, and fork/join are still useful, but they are no longer the only practical tools.

7.1. Virtual threads

Virtual threads are lightweight Java threads managed by the JDK. They are most useful when you have many concurrent tasks that block while waiting for files, network calls, databases, or other I/O.

1import java.time.Duration;
2import java.util.concurrent.Executors;
3import java.util.stream.IntStream;
 1public static void main(String[] args) {
 2  try (var executor = Executors.newVirtualThreadPerTaskExecutor()) {
 3    IntStream.range(0, 5).forEach(i ->
 4        executor.submit(() -> {
 5          Thread.sleep(Duration.ofMillis(10));
 6          System.out.println("task " + i + " ran on " + Thread.currentThread());
 7          return i;
 8        }));
 9  }
10}

Do not pool virtual threads to make them cheaper. They are already cheap. Use a separate limit, such as a semaphore, when you need to protect a database, service, or other constrained resource.

7.2. Scoped values

Scoped values share immutable data with methods called inside a bounded dynamic scope. They are easier to reason about than thread-local variables, especially with virtual threads.

 1private static final ScopedValue<String> REQUEST_ID = ScopedValue.newInstance();
 2
 3public static void main(String[] args) {
 4  ScopedValue.where(REQUEST_ID, "request-1001")
 5      .run(ScopedValueDemo::handle);
 6}
 7
 8private static void handle() {
 9  System.out.println("handling " + REQUEST_ID.get());
10}

Use scoped values for contextual data such as request IDs, authenticated users, or trace information when passing an argument through every method would make the call chain noisy.

7.3. Different ways to create threads

The following examples show the older platform-thread APIs. They are still part of Java and are important when reading existing code.

7.3.1. Implementing Runnable

 1public static class TikTok implements Runnable {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  public void run() {
 6    for (int i = 0; i < maxIters; i++) {
 7      System.out.println("hi");
 8
 9      try {
10        Thread.sleep(sleepTime);
11      } catch (InterruptedException e) {
12        // swallow
13      }
14    }
15  }
16}
1var t = new Thread(new TikTok());
2t.start();

7.3.2. Extending Thread

 1public static class TikTok extends Thread {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  @Override
 6  public void run() {
 7    for (int i = 0; i < maxIters; i++) {
 8      System.out.println("hi");
 9
10      try {
11        Thread.sleep(sleepTime);
12      } catch (InterruptedException e) {
13        // swallow
14      }
15    }
16  }
17}
1var t = new Thread(new TikTok());
2t.start();

7.3.3. Anonymous subclass

 1var t = new Thread("test") {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  @Override
 6  public void run() {
 7    for (int i = 0; i < maxIters; i++) {
 8      System.out.println("hi");
 9
10      try {
11        Thread.sleep(sleepTime);
12      } catch (InterruptedException e) {
13        // swallow
14      }
15    }
16  }
17};
18
19t.start();

7.3.4. Lambda runnable

 1Thread t = new Thread(() -> {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  for (int i = 0; i < maxIters; i++) {
 6    System.out.println("hi");
 7
 8    try {
 9      Thread.sleep(sleepTime);
10    } catch (InterruptedException e) {
11      // swallow
12    }
13  }
14});
15
16t.start();

7.4. Multiple threads

 1Thread[] threads = new Thread[5];
 2
 3for (int i = 0; i < threads.length; i++) {
 4  final int id = i;
 5  threads[i] = new Thread(() -> {
 6    for (int j = 0; j < 5; j++) {
 7      System.out.println("hi " + id + ": " + j);
 8
 9      try {
10        Thread.sleep(1000L);
11      } catch (InterruptedException e) {
12        e.printStackTrace();
13      }
14    }
15  });
16}
17
18for (Thread t : threads) {
19  t.start();
20}
21
22for (Thread t : threads) {
23  t.join();
24}

7.5. Timer and TimerTask

Timer and TimerTask are older scheduling APIs. Prefer modern executor services for new code unless you are maintaining an existing program that already uses Timer.

 1Timer timer = new Timer();
 2
 3timer.schedule(new TimerTask() {
 4    @Override
 5    public void run() {
 6      System.out.println("hi");
 7    }
 8  }, 1000, 500);
 9
10timer.schedule(new TimerTask() {
11  @Override
12  public void run() {
13    System.out.println("bye");
14  }
15}, 1000, 200);
16
17try {
18  Thread.sleep(3000L);
19} catch (InterruptedException e) {
20  // swallow
21}
22
23timer.cancel();

7.6. Executor

1import java.util.concurrent.CountDownLatch;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
 1public static class Worker extends Thread {
 2  private final int delay;
 3  private final CountDownLatch latch;
 4
 5  public Worker(String name, int delay, CountDownLatch latch) {
 6    super(name);
 7    this.delay = delay;
 8    this.latch = latch;
 9  }
10
11  @Override
12  public void run() {
13    while (latch.getCount() > 0) {
14      System.out.println(getName() + ": " + latch.getCount());
15
16      latch.countDown();
17
18      try {
19        Thread.sleep(delay);
20      } catch (InterruptedException e) {
21        // swallow
22      }
23    }
24  }
25}
 1String[] names = { "a", "b", "c" };
 2int[] delays = { 500, 1000, 2000 };
 3CountDownLatch[] latches = {
 4    new CountDownLatch(5),
 5    new CountDownLatch(10),
 6    new CountDownLatch(3)
 7};
 8
 9Thread[] threads = new Thread[3];
10for (int i = 0; i < threads.length; i++) {
11  threads[i] = new Worker(names[i], delays[i], latches[i]);
12}
13
14ExecutorService service = Executors.newFixedThreadPool(5);
15for (Thread thread : threads) {
16  service.execute(thread);
17}
18
19for (CountDownLatch latch : latches) {
20  try {
21    latch.await();
22  } catch (InterruptedException e) {
23    // swallow
24  }
25}
26
27service.shutdown();
28
29System.out.println("done!");

7.7. Callable

1import java.util.ArrayList;
2import java.util.List;
3import java.util.Random;
4import java.util.concurrent.Callable;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
 1public static class Tuple {
 2  public final String name;
 3  public final Integer factorial;
 4
 5  public Tuple(String name, Integer factorial) {
 6    this.name = name;
 7    this.factorial = factorial;
 8  }
 9
10  @Override
11  public String toString() {
12    return name + " : " + factorial;
13  }
14}
15
16public static class FactorialWorker implements Callable<Tuple> {
17  private final String name;
18  private final Integer num;
19
20  public FactorialWorker(String name, Integer num) {
21    this.name = name;
22    this.num = num;
23  }
24
25  @Override
26  public Tuple call() throws Exception {
27    int value = 1;
28    for (int i = 2; i < num; i++) {
29      value *= i;
30    }
31    return new Tuple(name, value);
32  }
33}
 1final int numWorkers = 100;
 2final Random random = new Random(37L);
 3
 4List<Callable<Tuple>> callables = new ArrayList<>();
 5for (int i = 0; i < numWorkers; i++) {
 6  Integer num = random.nextInt(15);
 7  Callable<Tuple> callable = new FactorialWorker(String.valueOf(i), num);
 8  callables.add(callable);
 9}
10
11ExecutorService service = Executors.newFixedThreadPool(5);
12List<Future<Tuple>> futures = new ArrayList<>();
13for (var callable : callables) {
14  futures.add(service.submit(callable));
15}
16
17for (var future : futures) {
18  System.out.println(future.get());
19}
20
21service.shutdown();
22
23System.out.println("done!");

7.8. Sharing a variable across threads

1import java.util.ArrayList;
2import java.util.List;
3import java.util.Random;
4import java.util.concurrent.Callable;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
8import java.util.concurrent.atomic.AtomicInteger;
 1private static final Random _random = new Random(37L);
 2
 3public static class Worker implements Callable<Integer> {
 4  private final AtomicInteger counter;
 5  private final int delay;
 6
 7  public Worker(AtomicInteger counter) {
 8    this.counter = counter;
 9    this.delay = _random.nextInt(2000);
10  }
11
12  @Override
13  public Integer call() throws Exception {
14    try {
15      Thread.sleep(delay);
16    } catch (InterruptedException e) {
17      // swallow
18    } finally {
19      counter.addAndGet(1);
20    }
21
22    return delay;
23  }
24}
 1final AtomicInteger counter = new AtomicInteger(0);
 2
 3final int numWorkers = 100;
 4List<Callable<Integer>> callables = new ArrayList<>();
 5for (int i = 0; i < numWorkers; i++) {
 6  callables.add(new Worker(counter));
 7}
 8
 9ExecutorService service = Executors.newFixedThreadPool(5);
10List<Future<Integer>> futures = new ArrayList<>();
11for (var callable : callables) {
12  futures.add(service.submit(callable));
13}
14
15for (var future : futures) {
16  System.out.println(future.get());
17}
18
19service.shutdown();
20
21System.out.println("done!");
22System.out.println(counter.get());

7.9. Fork/Join and Recursive Task

 1import java.io.IOException;
 2import java.nio.file.Files;
 3import java.nio.file.Paths;
 4import java.util.ArrayList;
 5import java.util.Arrays;
 6import java.util.HashMap;
 7import java.util.List;
 8import java.util.Map;
 9import java.util.concurrent.ForkJoinPool;
10import java.util.concurrent.RecursiveTask;
 1public static class TokenLengthCounter extends RecursiveTask<Map<Integer, Integer>> {
 2  private final long workLoad = 100;
 3  private final String[] tokens;
 4
 5  public TokenLengthCounter(String[] tokens) {
 6    this.tokens = tokens;
 7  }
 8
 9  @Override
10  protected Map<Integer, Integer> compute() {
11    if (tokens.length > workLoad) {
12      List<TokenLengthCounter> counters = createSubtasks();
13      for (var counter : counters) {
14        counter.fork();
15      }
16
17      Map<Integer, Integer> counts = new HashMap<>();
18
19      for (var counter : counters) {
20        Map<Integer, Integer> m = counter.join();
21
22        for (var entry : m.entrySet()) {
23          Integer key = entry.getKey();
24          Integer val = entry.getValue();
25
26          if (!counts.containsKey(key)) {
27            counts.put(key, 0);
28          }
29
30          Integer total = counts.get(key) + val;
31          counts.put(key, total);
32        }
33      }
34
35      return counts;
36    } else {
37      return doCount();
38    }
39  }
40
41  private List<TokenLengthCounter> createSubtasks() {
42    final int n = tokens.length;
43    final String[] lhs = Arrays.copyOfRange(tokens, 0, (n + 1) / 2);
44    final String[] rhs = Arrays.copyOfRange(tokens, (n + 1) / 2, n);
45
46    List<TokenLengthCounter> counters = new ArrayList<>() {{
47      add(new TokenLengthCounter(lhs));
48      add(new TokenLengthCounter(rhs));
49    }};
50
51    return counters;
52  }
53
54  private Map<Integer, Integer> doCount() {
55    Map<Integer, Integer> counts = new HashMap<>();
56    for (var token : tokens) {
57      Integer length = token.length();
58
59      if (!counts.containsKey(length)) {
60        counts.put(length, 0);
61      }
62
63      Integer total = counts.get(length) + 1;
64      counts.put(length, total);
65    }
66    return counts;
67  }
68}
1private static String[] getTokens() throws IOException {
2  String text = new String(
3      Files.readAllBytes(
4          Paths.get("5827-8.txt")));
5  return text.split("\\s+");
6}
1String[] tokens = getTokens();
2TokenLengthCounter counter = new TokenLengthCounter(getTokens());
3
4ForkJoinPool pool = ForkJoinPool.commonPool();
5Map<Integer, Integer> counts = pool.invoke(counter);
6
7for (var entry : counts.entrySet()) {
8  System.out.println(entry.getKey() + " : " + entry.getValue());
9}