13. Threading
13.1. Different ways to create threads
13.1.1. Implementing Runnable
1public static class TikTok implements Runnable {
2 final int maxIters = 5;
3 final int sleepTime = 500;
4
5 public void run() {
6 for (int i = 0; i < maxIters; i++) {
7 System.out.println("hi");
8
9 try {
10 Thread.sleep(sleepTime);
11 } catch (InterruptedException e) {
12 // swallow
13 }
14 }
15 }
16}
1var t = new Thread(new TikTok());
2t.start();
13.1.2. Extending Thread
1public static class TikTok extends Thread {
2 final int maxIters = 5;
3 final int sleepTime = 500;
4
5 @Override
6 public void run() {
7 for (int i = 0; i < maxIters; i++) {
8 System.out.println("hi");
9
10 try {
11 Thread.sleep(sleepTime);
12 } catch (InterruptedException e) {
13 // swallow
14 }
15 }
16 }
17}
1var t = new Thread(new TikTok());
2t.start();
13.1.3. Anonymous subclass
1var t = new Thread("test") {
2 final int maxIters = 5;
3 final int sleepTime = 500;
4
5 @Override
6 public void run() {
7 for (int i = 0; i < maxIters; i++) {
8 System.out.println("hi");
9
10 try {
11 Thread.sleep(sleepTime);
12 } catch (InterruptedException e) {
13 // swallow
14 }
15 }
16 }
17};
18
19t.start();
13.1.4. Lambda runnable
1Thread t = new Thread(() -> {
2 final int maxIters = 5;
3 final int sleepTime = 500;
4
5 for (int i = 0; i < maxIters; i++) {
6 System.out.println("hi");
7
8 try {
9 Thread.sleep(sleepTime);
10 } catch (InterruptedException e) {
11 // swallow
12 }
13 }
14});
15
16t.start();
13.2. Multiple threads
1Thread[] threads = new Thread[5];
2
3for (int i = 0; i < threads.length; i++) {
4 final int id = i;
5 threads[i] = new Thread(() -> {
6 for (int j = 0; j < 5; j++) {
7 System.out.println("hi " + id + ": " + j);
8
9 try {
10 Thread.sleep(1000L);
11 } catch (InterruptedException e) {
12 e.printStackTrace();
13 }
14 }
15 });
16}
17
18for (Thread t : threads) {
19 t.start();
20}
21
22for (Thread t : threads) {
23 t.join();
24}
13.3. Timer and TimerTask
1Timer timer = new Timer();
2
3timer.schedule(new TimerTask() {
4 @Override
5 public void run() {
6 System.out.println("hi");
7 }
8 }, 1000, 500);
9
10timer.schedule(new TimerTask() {
11 @Override
12 public void run() {
13 System.out.println("bye");
14 }
15}, 1000, 200);
16
17try {
18 Thread.sleep(3000L);
19} catch (InterruptedException e) {
20 // swallow
21}
22
23timer.cancel();
13.4. Executor
1import java.util.concurrent.CountDownLatch;
2import java.util.concurrent.ExecutorService;
3import java.util.concurrent.Executors;
1public static class Worker extends Thread {
2 private final int delay;
3 private final CountDownLatch latch;
4
5 public Worker(String name, int delay, CountDownLatch latch) {
6 super(name);
7 this.delay = delay;
8 this.latch = latch;
9 }
10
11 @Override
12 public void run() {
13 while (latch.getCount() > 0) {
14 System.out.println(getName() + ": " + latch.getCount());
15
16 latch.countDown();
17
18 try {
19 Thread.sleep(delay);
20 } catch (InterruptedException e) {
21 // swallow
22 }
23 }
24 }
25}
1String[] names = { "a", "b", "c" };
2int[] delays = { 500, 1000, 2000 };
3CountDownLatch[] latches = {
4 new CountDownLatch(5),
5 new CountDownLatch(10),
6 new CountDownLatch(3)
7};
8
9Thread[] threads = new Thread[3];
10for (int i = 0; i < threads.length; i++) {
11 threads[i] = new Worker(names[i], delays[i], latches[i]);
12}
13
14ExecutorService service = Executors.newFixedThreadPool(5);
15for (Thread thread : threads) {
16 service.execute(thread);
17}
18
19for (CountDownLatch latch : latches) {
20 try {
21 latch.await();
22 } catch (InterruptedException e) {
23 // swallow
24 }
25}
26
27service.shutdown();
28
29System.out.println("done!");
13.5. Callable
1import java.util.ArrayList;
2import java.util.List;
3import java.util.Random;
4import java.util.concurrent.Callable;
5import java.util.concurrent.ExecutorService;
6import java.util.concurrent.Executors;
7import java.util.concurrent.Future;
1public static class Tuple {
2 public final String name;
3 public final Integer factorial;
4
5 public Tuple(String name, Integer factorial) {
6 this.name = name;
7 this.factorial = factorial;
8 }
9
10 @Override
11 public String toString() {
12 return name + " : " + factorial;
13 }
14}
15
16public static class FactorialWorker implements Callable<Tuple> {
17 private final String name;
18 private final Integer num;
19
20 public FactorialWorker(String name, Integer num) {
21 this.name = name;
22 this.num = num;
23 }
24
25 @Override
26 public Tuple call() throws Exception {
27 int value = 1;
28 for (int i = 2; i < num; i++) {
29 value *= i;
30 }
31 return new Tuple(name, value);
32 }
33}
1final int numWorkers = 100;
2final Random random = new Random(37L);
3
4List<Callable<Tuple>> callables = new ArrayList<>();
5for (int i = 0; i < numWorkers; i++) {
6 Integer num = random.nextInt(15);
7 Callable<Tuple> callable = new FactorialWorker(String.valueOf(i), num);
8 callables.add(callable);
9}
10
11ExecutorService service = Executors.newFixedThreadPool(5);
12List<Future<Tuple>> futures = new ArrayList<>();
13for (var callable : callables) {
14 futures.add(service.submit(callable));
15}
16
17for (var future : futures) {
18 System.out.println(future.get());
19}
20
21service.shutdown();
22
23System.out.println("done!");
13.7. Fork/Join and Recursive Task
1import java.io.IOException;
2import java.nio.file.Files;
3import java.nio.file.Paths;
4import java.util.ArrayList;
5import java.util.Arrays;
6import java.util.HashMap;
7import java.util.List;
8import java.util.Map;
9import java.util.concurrent.ForkJoinPool;
10import java.util.concurrent.RecursiveTask;
1public static class TokenLengthCounter extends RecursiveTask<Map<Integer, Integer>> {
2 private final long workLoad = 100;
3 private final String[] tokens;
4
5 public TokenLengthCounter(String[] tokens) {
6 this.tokens = tokens;
7 }
8
9 @Override
10 protected Map<Integer, Integer> compute() {
11 if (tokens.length > workLoad) {
12 List<TokenLengthCounter> counters = createSubtasks();
13 for (var counter : counters) {
14 counter.fork();
15 }
16
17 Map<Integer, Integer> counts = new HashMap<>();
18
19 for (var counter : counters) {
20 Map<Integer, Integer> m = counter.join();
21
22 for (var entry : m.entrySet()) {
23 Integer key = entry.getKey();
24 Integer val = entry.getValue();
25
26 if (!counts.containsKey(key)) {
27 counts.put(key, 0);
28 }
29
30 Integer total = counts.get(key) + val;
31 counts.put(key, total);
32 }
33 }
34
35 return counts;
36 } else {
37 return doCount();
38 }
39 }
40
41 private List<TokenLengthCounter> createSubtasks() {
42 final int n = tokens.length;
43 final String[] lhs = Arrays.copyOfRange(tokens, 0, (n + 1) / 2);
44 final String[] rhs = Arrays.copyOfRange(tokens, (n + 1) / 2, n);
45
46 List<TokenLengthCounter> counters = new ArrayList<>() {{
47 add(new TokenLengthCounter(lhs));
48 add(new TokenLengthCounter(rhs));
49 }};
50
51 return counters;
52 }
53
54 private Map<Integer, Integer> doCount() {
55 Map<Integer, Integer> counts = new HashMap<>();
56 for (var token : tokens) {
57 Integer length = token.length();
58
59 if (!counts.containsKey(length)) {
60 counts.put(length, 0);
61 }
62
63 Integer total = counts.get(length) + 1;
64 counts.put(length, total);
65 }
66 return counts;
67 }
68}
1private static String[] getTokens() throws IOException {
2 String text = new String(
3 Files.readAllBytes(
4 Paths.get("5827-8.txt")));
5 return text.split("\\s+");
6}
1String[] tokens = getTokens();
2TokenLengthCounter counter = new TokenLengthCounter(getTokens());
3
4ForkJoinPool pool = ForkJoinPool.commonPool();
5Map<Integer, Integer> counts = pool.invoke(counter);
6
7for (var entry : counts.entrySet()) {
8 System.out.println(entry.getKey() + " : " + entry.getValue());
9}