Concurrent Utilities (JSR 166)

Intro

Problem

Concurrency: Theory and Practice

Solution

JSR 166: Concurrency Utilities

java.util.concurrent

Whom to thank?

Doug Lea

Doug Lea

Consist

  • java.util.concurrent.atomic - пакет с набором атомарных классов, позволяющих использовать принцип действия механизма оптимистической блокировки для выполнения атомарных операций.

  • Concurrency collections - набор более эффективно работающих в многопоточной среде коллекций нежели стандартные универсальные коллекции из java.util пакета

  • Queues - объекты создания блокирующих и неблокирующих очередей с поддержкой многопоточности.

Consist

  • java.util.concurrent.locks - пакет з набором механизмов синхронизации потоков, альтернативных базовым synchronized, wait(), notify(), notifyAll()

  • Synchronizers - объекты синхронизации, позволяющие разработчику управлять и/или ограничивать работу нескольких потоков.

  • Executors - механизмы создания пулов потоков и планирования работы асинхронных задач

java.util.concurrent.atomic

java.util.concurrent.atomic

  • включает 9 атомарных классов для выполнения, так называемых, атомарных операций.

  • Операция является атомарной, если её можно безопасно выполнять при параллельных вычислениях в нескольких потоках, не используя при этом ни блокировок, ни синхронизацию synchronized.

Consist

  • Atomic classes for Data Types:

    • AtomicBoolean

    • AtomicInteger

    • AtomicLong

    • AtomicReference

  • Atomic classes for Arrays:

    • AtomicIntegerArray

    • AtomicLongArray

    • AtomicReferenceArray

Consist

  • Atomic classes for to update fields (use Reflection):

    • AtomicIntegerFieldUpdater

    • AtomicLongFieldUpdater

    • AtomicReferenceFieldUpdater

  • Atomic classes for to build algorithms:

    • AtomicStampedReference

    • AtomicMarkableReference

Concurrency collections

Consist

  • CopyOnWriteArrayList - реализует алгоритм CopyOnWrite и является потокобезопасным аналогом ArrayList. Класс CopyOnWriteArrayList содержит изменяемую ссылку на неизменяемый массив, обеспечивая преимущества потокобезопасности без необходимости использования блокировок, т.е. при выполнении модифицирующей операции

  • ConcurrentHashMap<K, V> - реализует интерфейс java.util.concurrent.ConcurrentMap и отличается от HashMap и Hashtable внутренней структурой хранения пар key-value.

Consist

  • CopyOnWriteArraySet - выполнен на основе CopyOnWriteArrayList с реализацией интерфейса Set

  • ConcurrentNavigableMap - расширяет возможности интерфейса NavigableMap для использования в многопоточных приложениях

  • ConcurrentSkipListMap - является аналогом коллекции TreeMap с сортировкой данных по ключу и с поддержкой многопоточности

  • ConcurrentSkipListSet - выполнен на основе ConcurrentSkipListMap с реализацией интерфейса Set

java.concurrent.locks

Interface Lock

Methods

  • lock(): void – захватить блокировку

  • unlock(): void – отпустить блокировку

  • newCondition(): Condition– создать условие

ReentrantLock

Methods

  • lockInterruptibly() throws InterruptedException: void

  • tryLock(): boolean

Example

public class CommonResource {
    public int x = 0;
}

Example

public class CountThread implements Runnable {
    private final CommonResource res;
    private final ReentrantLock lock;

    public CountThread(CommonResource res, ReentrantLock lock) {
        this.res = res;
        this.lock = lock;
    }

    public void run() {
        lock.lock(); // устанавливаем блокировку
        try {
            for (int i = 1; i <= 4; i++) {
                System.out.printf("%s %d \n", Thread.currentThread().getName(), res.x);
                res.x++;
                Thread.sleep(100);
            }
        } catch(InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            lock.unlock(); // снимаем блокировку
        }
    }
}

Example

import java.util.concurrent.locks.ReentrantLock;

public class Program {
    public static void main(String[] args) {
        CommonResource commonResource = new CommonResource();
        ReentrantLock locker = new ReentrantLock(); // создаем заглушку
        for (int i = 1; i <= 5; i++) {
            Thread t = new Thread(new CountThread(commonResource, locker));
            t.setName("Thread " + i);
            t.start();
        }
    }
}

ReadWriteLock

Interface Condition

Methods

  • await(): void

  • await(long, TimeUnit): boolean

  • signal(): void

  • signalAll(): void

Example

public class Store {
    private int product = 0;
    private final ReentrantLock lock;
    private final Condition condition;

    public Store() {
        this.lock = new ReentrantLock(); // создаем блокировку
        this.condition = this.lock.newCondition(); // получаем условие, связанное с блокировкой
    }

    public void get() {
        this.lock.lock();
        try {
            while (this.product < 1) { // пока нет доступных товаров на складе
                this.condition.await(); // ожидаем
            }

            this.product--;
            System.out.println("Покупатель купил 1 товар");
            System.out.println("Товаров на складе: " + this.product);

            this.condition.signalAll(); // сигнализируем
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            this.lock.unlock();
        }
    }

    public void put() {
        this.lock.lock();
        try {
            while (this.product >= 3) { // пока на складе 3 товара
                condition.await(); // ждем освобождения места
            }

            this.product++;
            System.out.println("Производитель добавил 1 товар");
            System.out.println("Товаров на складе: " + this.product);

            this.condition.signalAll(); // сигнализируем
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
        } finally {
            this.lock.unlock();
        }
    }
}

Example

class Producer implements Runnable {
    private final Store store;

    public Producer(Store store) {
       this.store = store;
    }

    public void run() {
        for (int i = 1; i <= 5; i++) {
            store.put();
        }
    }
}

Example

class Consumer implements Runnable {
    private final Store store;

    public Consumer(Store store) {
       this.store = store;
    }

    public void run() {
        for (int i = 1; i <= 5; i++) {
            store.get();
        }
    }
}

Example

import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class Program {
    public static void main(String[] args) {
        Store store = new Store();
        Producer producer = new Producer(store);
        Consumer consumer = new Consumer(store);
        new Thread(producer).start();
        new Thread(consumer).start();
    }
}

Synchronizers

Semaphore

Semaphore

  • Ограничение на количество одновременно выполняемых сетевых запросов.

  • Ограничение на количество одновременных соединений к БД.

  • Ограничение на создание потоков выполнения.

  • Ограничение задач, высоко нагружающих память или процессор.

  • Семафор имеет емкость, указываемую при создании

Constructors

  • Semaphore(int permits)

  • Semaphore(int permits, boolean fair)

Methods

  • acquire(n?) – получить разрешение

  • release(n?) – отдать разрешение

  • tryAquire(n?, time?) – попробовать получить разрешение

  • reducePermits(n) – уменьшить количество разрешений

Semaphore

Semaphore

Semaphore

Semaphore

Semaphore

Java monitor

CyclicBarrier

CyclicBarrier

  • Потоки блокируются пока все потоки не прибудут к барьеру.

  • Многоразовый

Methods

  • await() - ожидание у барьера всех участников

  • reset() - сброс барьера до первоначального состояния

CyclicBarrier

CyclicBarrier

CountDownLatch

CountDownLatch

CountDownLatch

Exchanger<V>

Exchanger<V>

Exchanger<V>

Phaser

Methods

  • register(): int — регистрирует нового участника, который выполняет фазы. Возвращает номер текущей фазы;

  • getPhase(): int — возвращает номер текущей фазы;

  • arriveAndAwaitAdvance(): int — указывает что поток завершил выполнение фазы. Поток приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог CyclicBarrier.await(). Возвращает номер текущей фазы;

Methods

  • arrive(): int — сообщает, что поток завершил фазу, и возвращает номер фазы. При вызове данного метода поток не приостанавливается, а продолжает выполнятся;

  • arriveAndDeregister(): int — сообщает о завершении всех фаз потоком и снимает его с регистрации. Возвращает номер текущей фазы;

  • awaitAdvance(int phase): int — если phase равно номеру текущей фазы, приостанавливает вызвавший его поток до её окончания. В противном случае сразу возвращает аргумент.

Phaser

Phaser

Executors

ExecutorService

ExecutorService

  • альтернатива классу Thread

  • предназначенному для управления потоками

  • в основе положен интерфейс Executor

  • работает с интерфейсами Runnable, Callable<V>, Future<V>

Interface Executor

package java.util.concurrent;

public interface Executor {
    void execute(Runnable command);
}

Interface Callable<V>

package java.util.concurrent;

@FunctionalInterface
public interface Callable<V> {
    V call() throws Exception;
}

Interface Future<V>

package java.util.concurrent;

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);

    boolean isCancelled();

    boolean isDone();

    V get() throws InterruptedException, ExecutionException;

    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

Interface ExecutorService

  • shutdown(): void

  • isShutdown(): boolean

  • shutdownNow(): List<Runnable>

  • awaitTermination(long timeout, TimeUnit unit): boolean

  • isTerminated(): boolean

  • execute(Runnable): void

Interface ExecutorService

  • invokeAny(Collection<? extends Callable<T>> tasks): T

  • invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): T

  • invokeAll (Collection<? extends Callable<T>> tasks): List<Future<T>>

  • invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): List<Future<T>>

Interface ExecutorService

  • submit(Callable<T> task): Future<T>

  • submit(Runnable task, T result): Future<T>

  • submit(Runnable task): Future<?>

Implementation for ExecutorService

  • SingleThreadExecutor

  • FixedThreadPool

  • CachedThreadPool

  • ScheduledThreadPool

Execute Runnable

ExecutorService executorService = Executors.newSingleThreadExecutor();

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

Example

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

Submit Runnable

ExecutorService executorService = Executors.newSingleThreadExecutor();

Future future = executorService.submit(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

future.get(); // returns null if the task has finished correctly.

executorService.shutdown();

invokeAny()

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

String result = executorService.invokeAny(callables);

System.out.println("result = " + result);

executorService.shutdown();

invokeAll()

ExecutorService executorService = Executors.newSingleThreadExecutor();

Set<Callable<String>> callables = new HashSet<>();

callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 1";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 2";
    }
});
callables.add(new Callable<String>() {
    public String call() throws Exception {
        return "Task 3";
    }
});

List<Future<String>> futures = executorService.invokeAll(callables);

for (Future<String> future : futures) {
    System.out.println("future.get = " + future.get());
}

executorService.shutdown();