13. Threading

13.1. Different ways to create threads

13.1.1. Implementing Runnable

 1public static class TikTok implements Runnable {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  public void run() {
 6    for (int i = 0; i < maxIters; i++) {
 7      System.out.println("hi");
 8
 9      try {
10        Thread.sleep(sleepTime);
11      } catch (InterruptedException e) {
12        // swallow
13      }
14    }
15  }
16}
1var t = new Thread(new TikTok());
2t.start();

13.1.2. Extending Thread

 1public static class TikTok extends Thread {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  @Override
 6  public void run() {
 7    for (int i = 0; i < maxIters; i++) {
 8      System.out.println("hi");
 9
10      try {
11        Thread.sleep(sleepTime);
12      } catch (InterruptedException e) {
13        // swallow
14      }
15    }
16  }
17}
1var t = new Thread(new TikTok());
2t.start();

13.1.3. Anonymous subclass

 1var t = new Thread("test") {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  @Override
 6  public void run() {
 7    for (int i = 0; i < maxIters; i++) {
 8      System.out.println("hi");
 9
10      try {
11        Thread.sleep(sleepTime);
12      } catch (InterruptedException e) {
13        // swallow
14      }
15    }
16  }
17};
18
19t.start();

13.1.4. Lambda runnable

 1Thread t = new Thread(() -> {
 2  final int maxIters = 5;
 3  final int sleepTime = 500;
 4
 5  for (int i = 0; i < maxIters; i++) {
 6    System.out.println("hi");
 7
 8    try {
 9      Thread.sleep(sleepTime);
10    } catch (InterruptedException e) {
11      // swallow
12    }
13  }
14});
15
16t.start();

13.2. Multiple threads

 1Thread[] threads = new Thread[5];
 2
 3for (int i = 0; i < threads.length; i++) {
 4  final int id = i;
 5  threads[i] = new Thread(() -> {
 6    for (int j = 0; j < 5; j++) {
 7      System.out.println("hi " + id + ": " + j);
 8
 9      try {
10        Thread.sleep(1000L);
11      } catch (InterruptedException e) {
12        e.printStackTrace();
13      }
14    }
15  });
16}
17
18for (Thread t : threads) {
19  t.start();
20}
21
22for (Thread t : threads) {
23  t.join();
24}

13.3. Timer and TimerTask

 1Timer timer = new Timer();
 2
 3timer.schedule(new TimerTask() {
 4    @Override
 5    public void run() {
 6      System.out.println("hi");
 7    }
 8  }, 1000, 500);
 9
10timer.schedule(new TimerTask() {
11  @Override
12  public void run() {
13    System.out.println("bye");
14  }
15}, 1000, 200);
16
17try {
18  Thread.sleep(3000L);
19} catch (InterruptedException e) {
20  // swallow
21}
22
23timer.cancel();

13.4. Executor

1import java.util.concurrent.CountDownLatch;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
 1public static class Worker extends Thread {
 2  private final int delay;
 3  private final CountDownLatch latch;
 4
 5  public Worker(String name, int delay, CountDownLatch latch) {
 6    super(name);
 7    this.delay = delay;
 8    this.latch = latch;
 9  }
10
11  @Override
12  public void run() {
13    while (latch.getCount() > 0) {
14      System.out.println(getName() + ": " + latch.getCount());
15
16      latch.countDown();
17
18      try {
19        Thread.sleep(delay);
20      } catch (InterruptedException e) {
21        // swallow
22      }
23    }
24  }
25}
 1String[] names = { "a", "b", "c" };
 2int[] delays = { 500, 1000, 2000 };
 3CountDownLatch[] latches = {
 4    new CountDownLatch(5),
 5    new CountDownLatch(10),
 6    new CountDownLatch(3)
 7};
 8
 9Thread[] threads = new Thread[3];
10for (int i = 0; i < threads.length; i++) {
11  threads[i] = new Worker(names[i], delays[i], latches[i]);
12}
13
14ExecutorService service = Executors.newFixedThreadPool(5);
15for (Thread thread : threads) {
16  service.execute(thread);
17}
18
19for (CountDownLatch latch : latches) {
20  try {
21    latch.await();
22  } catch (InterruptedException e) {
23    // swallow
24  }
25}
26
27service.shutdown();
28
29System.out.println("done!");

13.5. Callable

1import java.util.ArrayList;
2import java.util.List;
3import java.util.Random;
4import java.util.concurrent.Callable;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
 1public static class Tuple {
 2  public final String name;
 3  public final Integer factorial;
 4
 5  public Tuple(String name, Integer factorial) {
 6    this.name = name;
 7    this.factorial = factorial;
 8  }
 9
10  @Override
11  public String toString() {
12    return name + " : " + factorial;
13  }
14}
15
16public static class FactorialWorker implements Callable<Tuple> {
17  private final String name;
18  private final Integer num;
19
20  public FactorialWorker(String name, Integer num) {
21    this.name = name;
22    this.num = num;
23  }
24
25  @Override
26  public Tuple call() throws Exception {
27    int value = 1;
28    for (int i = 2; i < num; i++) {
29      value *= i;
30    }
31    return new Tuple(name, value);
32  }
33}
 1final int numWorkers = 100;
 2final Random random = new Random(37L);
 3
 4List<Callable<Tuple>> callables = new ArrayList<>();
 5for (int i = 0; i < numWorkers; i++) {
 6  Integer num = random.nextInt(15);
 7  Callable<Tuple> callable = new FactorialWorker(String.valueOf(i), num);
 8  callables.add(callable);
 9}
10
11ExecutorService service = Executors.newFixedThreadPool(5);
12List<Future<Tuple>> futures = new ArrayList<>();
13for (var callable : callables) {
14  futures.add(service.submit(callable));
15}
16
17for (var future : futures) {
18  System.out.println(future.get());
19}
20
21service.shutdown();
22
23System.out.println("done!");

13.6. Sharing a variable across threads

1import java.util.ArrayList;
2import java.util.List;
3import java.util.Random;
4import java.util.concurrent.Callable;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
8import java.util.concurrent.atomic.AtomicInteger;
 1private static final Random _random = new Random(37L);
 2
 3public static class Worker implements Callable<Integer> {
 4  private final AtomicInteger counter;
 5  private final int delay;
 6
 7  public Worker(AtomicInteger counter) {
 8    this.counter = counter;
 9    this.delay = _random.nextInt(2000);
10  }
11
12  @Override
13  public Integer call() throws Exception {
14    try {
15      Thread.sleep(delay);
16    } catch (InterruptedException e) {
17      // swallow
18    } finally {
19      counter.addAndGet(1);
20    }
21
22    return delay;
23  }
24}
 1final AtomicInteger counter = new AtomicInteger(0);
 2
 3final int numWorkers = 100;
 4List<Callable<Integer>> callables = new ArrayList<>();
 5for (int i = 0; i < numWorkers; i++) {
 6  callables.add(new Worker(counter));
 7}
 8
 9ExecutorService service = Executors.newFixedThreadPool(5);
10List<Future<Integer>> futures = new ArrayList<>();
11for (var callable : callables) {
12  futures.add(service.submit(callable));
13}
14
15for (var future : futures) {
16  System.out.println(future.get());
17}
18
19service.shutdown();
20
21System.out.println("done!");
22System.out.println(counter.get());

13.7. Fork/Join and Recursive Task

 1import java.io.IOException;
 2import java.nio.file.Files;
 3import java.nio.file.Paths;
 4import java.util.ArrayList;
 5import java.util.Arrays;
 6import java.util.HashMap;
 7import java.util.List;
 8import java.util.Map;
 9import java.util.concurrent.ForkJoinPool;
10import java.util.concurrent.RecursiveTask;
 1public static class TokenLengthCounter extends RecursiveTask<Map<Integer, Integer>> {
 2  private final long workLoad = 100;
 3  private final String[] tokens;
 4
 5  public TokenLengthCounter(String[] tokens) {
 6    this.tokens = tokens;
 7  }
 8
 9  @Override
10  protected Map<Integer, Integer> compute() {
11    if (tokens.length > workLoad) {
12      List<TokenLengthCounter> counters = createSubtasks();
13      for (var counter : counters) {
14        counter.fork();
15      }
16
17      Map<Integer, Integer> counts = new HashMap<>();
18
19      for (var counter : counters) {
20        Map<Integer, Integer> m = counter.join();
21
22        for (var entry : m.entrySet()) {
23          Integer key = entry.getKey();
24          Integer val = entry.getValue();
25
26          if (!counts.containsKey(key)) {
27            counts.put(key, 0);
28          }
29
30          Integer total = counts.get(key) + val;
31          counts.put(key, total);
32        }
33      }
34
35      return counts;
36    } else {
37      return doCount();
38    }
39  }
40
41  private List<TokenLengthCounter> createSubtasks() {
42    final int n = tokens.length;
43    final String[] lhs = Arrays.copyOfRange(tokens, 0, (n + 1) / 2);
44    final String[] rhs = Arrays.copyOfRange(tokens, (n + 1) / 2, n);
45
46    List<TokenLengthCounter> counters = new ArrayList<>() {{
47      add(new TokenLengthCounter(lhs));
48      add(new TokenLengthCounter(rhs));
49    }};
50
51    return counters;
52  }
53
54  private Map<Integer, Integer> doCount() {
55    Map<Integer, Integer> counts = new HashMap<>();
56    for (var token : tokens) {
57      Integer length = token.length();
58
59      if (!counts.containsKey(length)) {
60        counts.put(length, 0);
61      }
62
63      Integer total = counts.get(length) + 1;
64      counts.put(length, total);
65    }
66    return counts;
67  }
68}
1private static String[] getTokens() throws IOException {
2  String text = new String(
3      Files.readAllBytes(
4          Paths.get("5827-8.txt")));
5  return text.split("\\s+");
6}
1String[] tokens = getTokens();
2TokenLengthCounter counter = new TokenLengthCounter(getTokens());
3
4ForkJoinPool pool = ForkJoinPool.commonPool();
5Map<Integer, Integer> counts = pool.invoke(counter);
6
7for (var entry : counts.entrySet()) {
8  System.out.println(entry.getKey() + " : " + entry.getValue());
9}