public class CommonResource {
public int x = 0;
}
JSR 166: Concurrency Utilities
java.util.concurrent
Doug Lea
java.util.concurrent.atomic
- пакет с набором атомарных классов, позволяющих использовать принцип действия механизма оптимистической блокировки для выполнения атомарных операций.
Concurrency collections - набор более эффективно работающих в многопоточной среде коллекций нежели стандартные универсальные коллекции из java.util
пакета
Queues - объекты создания блокирующих и неблокирующих очередей с поддержкой многопоточности.
java.util.concurrent.locks
- пакет з набором механизмов синхронизации потоков, альтернативных базовым synchronized
, wait()
, notify()
, notifyAll()
Synchronizers - объекты синхронизации, позволяющие разработчику управлять и/или ограничивать работу нескольких потоков.
Executors - механизмы создания пулов потоков и планирования работы асинхронных задач
java.util.concurrent.atomic
java.util.concurrent.atomic
включает 9 атомарных классов для выполнения, так называемых, атомарных операций.
Операция является атомарной, если её можно безопасно выполнять при параллельных вычислениях в нескольких потоках, не используя при этом ни блокировок, ни синхронизацию synchronized
.
Atomic classes for Data Types:
AtomicBoolean
AtomicInteger
AtomicLong
AtomicReference
Atomic classes for Arrays:
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
Atomic classes for to update fields (use Reflection):
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
Atomic classes for to build algorithms:
AtomicStampedReference
AtomicMarkableReference
CopyOnWriteArrayList
- реализует алгоритм CopyOnWrite и является потокобезопасным аналогом ArrayList
. Класс CopyOnWriteArrayList
содержит изменяемую ссылку на неизменяемый массив, обеспечивая преимущества потокобезопасности без необходимости использования блокировок, т.е. при выполнении модифицирующей операции
ConcurrentHashMap<K, V>
- реализует интерфейс java.util.concurrent.ConcurrentMap
и отличается от HashMap
и Hashtable
внутренней структурой хранения пар key-value.
CopyOnWriteArraySet
- выполнен на основе CopyOnWriteArrayList
с реализацией интерфейса Set
ConcurrentNavigableMap
- расширяет возможности интерфейса NavigableMap
для использования в многопоточных приложениях
ConcurrentSkipListMap
- является аналогом коллекции TreeMap
с сортировкой данных по ключу и с поддержкой многопоточности
ConcurrentSkipListSet
- выполнен на основе ConcurrentSkipListMap
с реализацией интерфейса Set
java.concurrent.locks
Lock
lock(): void
– захватить блокировку
unlock(): void
– отпустить блокировку
newCondition(): Condition
– создать условие
ReentrantLock
lockInterruptibly() throws InterruptedException: void
tryLock(): boolean
public class CommonResource {
public int x = 0;
}
public class CountThread implements Runnable {
private final CommonResource res;
private final ReentrantLock lock;
public CountThread(CommonResource res, ReentrantLock lock) {
this.res = res;
this.lock = lock;
}
public void run() {
lock.lock(); // устанавливаем блокировку
try {
for (int i = 1; i <= 4; i++) {
System.out.printf("%s %d \n", Thread.currentThread().getName(), res.x);
res.x++;
Thread.sleep(100);
}
} catch(InterruptedException e) {
System.out.println(e.getMessage());
} finally {
lock.unlock(); // снимаем блокировку
}
}
}
import java.util.concurrent.locks.ReentrantLock;
public class Program {
public static void main(String[] args) {
CommonResource commonResource = new CommonResource();
ReentrantLock locker = new ReentrantLock(); // создаем заглушку
for (int i = 1; i <= 5; i++) {
Thread t = new Thread(new CountThread(commonResource, locker));
t.setName("Thread " + i);
t.start();
}
}
}
ReadWriteLock
Condition
await(): void
await(long, TimeUnit): boolean
signal(): void
signalAll(): void
public class Store {
private int product = 0;
private final ReentrantLock lock;
private final Condition condition;
public Store() {
this.lock = new ReentrantLock(); // создаем блокировку
this.condition = this.lock.newCondition(); // получаем условие, связанное с блокировкой
}
public void get() {
this.lock.lock();
try {
while (this.product < 1) { // пока нет доступных товаров на складе
this.condition.await(); // ожидаем
}
this.product--;
System.out.println("Покупатель купил 1 товар");
System.out.println("Товаров на складе: " + this.product);
this.condition.signalAll(); // сигнализируем
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
this.lock.unlock();
}
}
public void put() {
this.lock.lock();
try {
while (this.product >= 3) { // пока на складе 3 товара
condition.await(); // ждем освобождения места
}
this.product++;
System.out.println("Производитель добавил 1 товар");
System.out.println("Товаров на складе: " + this.product);
this.condition.signalAll(); // сигнализируем
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
this.lock.unlock();
}
}
}
class Producer implements Runnable {
private final Store store;
public Producer(Store store) {
this.store = store;
}
public void run() {
for (int i = 1; i <= 5; i++) {
store.put();
}
}
}
class Consumer implements Runnable {
private final Store store;
public Consumer(Store store) {
this.store = store;
}
public void run() {
for (int i = 1; i <= 5; i++) {
store.get();
}
}
}
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class Program {
public static void main(String[] args) {
Store store = new Store();
Producer producer = new Producer(store);
Consumer consumer = new Consumer(store);
new Thread(producer).start();
new Thread(consumer).start();
}
}
Semaphore
Semaphore
Ограничение на количество одновременно выполняемых сетевых запросов.
Ограничение на количество одновременных соединений к БД.
Ограничение на создание потоков выполнения.
Ограничение задач, высоко нагружающих память или процессор.
Семафор имеет емкость, указываемую при создании
Semaphore(int permits)
Semaphore(int permits, boolean fair)
acquire(n?)
– получить разрешение
release(n?)
– отдать разрешение
tryAquire(n?, time?)
– попробовать получить разрешение
reducePermits(n)
– уменьшить количество разрешений
Semaphore
Semaphore
Semaphore
CyclicBarrier
CyclicBarrier
Потоки блокируются пока все потоки не прибудут к барьеру.
Многоразовый
await()
- ожидание у барьера всех участников
reset()
- сброс барьера до первоначального состояния
CyclicBarrier
CountDownLatch
CountDownLatch
Exchanger<V>
Exchanger<V>
Phaser
register(): int
— регистрирует нового участника, который выполняет фазы. Возвращает номер текущей фазы;
getPhase(): int
— возвращает номер текущей фазы;
arriveAndAwaitAdvance(): int
— указывает что поток завершил выполнение фазы. Поток приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог CyclicBarrier.await()
. Возвращает номер текущей фазы;
arrive(): int
— сообщает, что поток завершил фазу, и возвращает номер фазы. При вызове данного метода поток не приостанавливается, а продолжает выполнятся;
arriveAndDeregister(): int
— сообщает о завершении всех фаз потоком и снимает его с регистрации. Возвращает номер текущей фазы;
awaitAdvance(int phase): int
— если phase равно номеру текущей фазы, приостанавливает вызвавший его поток до её окончания. В противном случае сразу возвращает аргумент.
Phaser
ExecutorService
ExecutorService
альтернатива классу Thread
предназначенному для управления потоками
в основе положен интерфейс Executor
работает с интерфейсами Runnable
, Callable<V>
, Future<V>
Executor
package java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}
Callable<V>
package java.util.concurrent;
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}
Future<V>
package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
ExecutorService
shutdown(): void
isShutdown(): boolean
shutdownNow(): List<Runnable>
awaitTermination(long timeout, TimeUnit unit): boolean
isTerminated(): boolean
execute(Runnable): void
ExecutorService
invokeAny(Collection<? extends Callable<T>> tasks): T
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): T
invokeAll (Collection<? extends Callable<T>> tasks): List<Future<T>>
invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): List<Future<T>>
ExecutorService
submit(Callable<T> task): Future<T>
submit(Runnable task, T result): Future<T>
submit(Runnable task): Future<?>
ExecutorService
SingleThreadExecutor
FixedThreadPool
CachedThreadPool
ScheduledThreadPool
Runnable
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();
Runnable
ExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); // returns null if the task has finished correctly.
executorService.shutdown();
invokeAny()
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();
invokeAll()
ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
List<Future<String>> futures = executorService.invokeAll(callables);
for (Future<String> future : futures) {
System.out.println("future.get = " + future.get());
}
executorService.shutdown();