public class CommonResource {
public int x = 0;
}
JSR 166: Concurrency Utilities
java.util.concurrent
Doug Lea
java.util.concurrent.atomic - пакет с набором атомарных классов, позволяющих использовать принцип действия механизма оптимистической блокировки для выполнения атомарных операций.
Concurrency collections - набор более эффективно работающих в многопоточной среде коллекций нежели стандартные универсальные коллекции из java.util пакета
Queues - объекты создания блокирующих и неблокирующих очередей с поддержкой многопоточности.
java.util.concurrent.locks - пакет з набором механизмов синхронизации потоков, альтернативных базовым synchronized, wait(), notify(), notifyAll()
Synchronizers - объекты синхронизации, позволяющие разработчику управлять и/или ограничивать работу нескольких потоков.
Executors - механизмы создания пулов потоков и планирования работы асинхронных задач
java.util.concurrent.atomicjava.util.concurrent.atomicвключает 9 атомарных классов для выполнения, так называемых, атомарных операций.
Операция является атомарной, если её можно безопасно выполнять при параллельных вычислениях в нескольких потоках, не используя при этом ни блокировок, ни синхронизацию synchronized.
Atomic classes for Data Types:
AtomicBoolean
AtomicInteger
AtomicLong
AtomicReference
Atomic classes for Arrays:
AtomicIntegerArray
AtomicLongArray
AtomicReferenceArray
Atomic classes for to update fields (use Reflection):
AtomicIntegerFieldUpdater
AtomicLongFieldUpdater
AtomicReferenceFieldUpdater
Atomic classes for to build algorithms:
AtomicStampedReference
AtomicMarkableReference
CopyOnWriteArrayList - реализует алгоритм CopyOnWrite и является потокобезопасным аналогом ArrayList. Класс CopyOnWriteArrayList содержит изменяемую ссылку на неизменяемый массив, обеспечивая преимущества потокобезопасности без необходимости использования блокировок, т.е. при выполнении модифицирующей операции
ConcurrentHashMap<K, V> - реализует интерфейс java.util.concurrent.ConcurrentMap и отличается от HashMap и Hashtable внутренней структурой хранения пар key-value.
CopyOnWriteArraySet - выполнен на основе CopyOnWriteArrayList с реализацией интерфейса Set
ConcurrentNavigableMap - расширяет возможности интерфейса NavigableMap для использования в многопоточных приложениях
ConcurrentSkipListMap - является аналогом коллекции TreeMap с сортировкой данных по ключу и с поддержкой многопоточности
ConcurrentSkipListSet - выполнен на основе ConcurrentSkipListMap с реализацией интерфейса Set
java.concurrent.locksLocklock(): void – захватить блокировку
unlock(): void – отпустить блокировку
newCondition(): Condition– создать условие
ReentrantLocklockInterruptibly() throws InterruptedException: void
tryLock(): boolean
public class CommonResource {
public int x = 0;
}public class CountThread implements Runnable {
private final CommonResource res;
private final ReentrantLock lock;
public CountThread(CommonResource res, ReentrantLock lock) {
this.res = res;
this.lock = lock;
}
public void run() {
lock.lock(); // устанавливаем блокировку
try {
for (int i = 1; i <= 4; i++) {
System.out.printf("%s %d \n", Thread.currentThread().getName(), res.x);
res.x++;
Thread.sleep(100);
}
} catch(InterruptedException e) {
System.out.println(e.getMessage());
} finally {
lock.unlock(); // снимаем блокировку
}
}
}import java.util.concurrent.locks.ReentrantLock;
public class Program {
public static void main(String[] args) {
CommonResource commonResource = new CommonResource();
ReentrantLock locker = new ReentrantLock(); // создаем заглушку
for (int i = 1; i <= 5; i++) {
Thread t = new Thread(new CountThread(commonResource, locker));
t.setName("Thread " + i);
t.start();
}
}
}ReadWriteLockConditionawait(): void
await(long, TimeUnit): boolean
signal(): void
signalAll(): void
public class Store {
private int product = 0;
private final ReentrantLock lock;
private final Condition condition;
public Store() {
this.lock = new ReentrantLock(); // создаем блокировку
this.condition = this.lock.newCondition(); // получаем условие, связанное с блокировкой
}
public void get() {
this.lock.lock();
try {
while (this.product < 1) { // пока нет доступных товаров на складе
this.condition.await(); // ожидаем
}
this.product--;
System.out.println("Покупатель купил 1 товар");
System.out.println("Товаров на складе: " + this.product);
this.condition.signalAll(); // сигнализируем
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
this.lock.unlock();
}
}
public void put() {
this.lock.lock();
try {
while (this.product >= 3) { // пока на складе 3 товара
condition.await(); // ждем освобождения места
}
this.product++;
System.out.println("Производитель добавил 1 товар");
System.out.println("Товаров на складе: " + this.product);
this.condition.signalAll(); // сигнализируем
} catch (InterruptedException e) {
System.out.println(e.getMessage());
} finally {
this.lock.unlock();
}
}
}class Producer implements Runnable {
private final Store store;
public Producer(Store store) {
this.store = store;
}
public void run() {
for (int i = 1; i <= 5; i++) {
store.put();
}
}
}class Consumer implements Runnable {
private final Store store;
public Consumer(Store store) {
this.store = store;
}
public void run() {
for (int i = 1; i <= 5; i++) {
store.get();
}
}
}import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class Program {
public static void main(String[] args) {
Store store = new Store();
Producer producer = new Producer(store);
Consumer consumer = new Consumer(store);
new Thread(producer).start();
new Thread(consumer).start();
}
}SemaphoreSemaphoreОграничение на количество одновременно выполняемых сетевых запросов.
Ограничение на количество одновременных соединений к БД.
Ограничение на создание потоков выполнения.
Ограничение задач, высоко нагружающих память или процессор.
Семафор имеет емкость, указываемую при создании
Semaphore(int permits)
Semaphore(int permits, boolean fair)
acquire(n?) – получить разрешение
release(n?) – отдать разрешение
tryAquire(n?, time?) – попробовать получить разрешение
reducePermits(n) – уменьшить количество разрешений
Semaphore
Semaphore
Semaphore
CyclicBarrierCyclicBarrierПотоки блокируются пока все потоки не прибудут к барьеру.
Многоразовый
await() - ожидание у барьера всех участников
reset() - сброс барьера до первоначального состояния
CyclicBarrier
CountDownLatchCountDownLatch
Exchanger<V>Exchanger<V>
Phaserregister(): int — регистрирует нового участника, который выполняет фазы. Возвращает номер текущей фазы;
getPhase(): int — возвращает номер текущей фазы;
arriveAndAwaitAdvance(): int — указывает что поток завершил выполнение фазы. Поток приостанавливается до момента, пока все остальные стороны не закончат выполнять данную фазу. Точный аналог CyclicBarrier.await(). Возвращает номер текущей фазы;
arrive(): int — сообщает, что поток завершил фазу, и возвращает номер фазы. При вызове данного метода поток не приостанавливается, а продолжает выполнятся;
arriveAndDeregister(): int — сообщает о завершении всех фаз потоком и снимает его с регистрации. Возвращает номер текущей фазы;
awaitAdvance(int phase): int — если phase равно номеру текущей фазы, приостанавливает вызвавший его поток до её окончания. В противном случае сразу возвращает аргумент.
Phaser
ExecutorServiceExecutorServiceальтернатива классу Thread
предназначенному для управления потоками
в основе положен интерфейс Executor
работает с интерфейсами Runnable, Callable<V>, Future<V>
Executorpackage java.util.concurrent;
public interface Executor {
void execute(Runnable command);
}Callable<V>package java.util.concurrent;
@FunctionalInterface
public interface Callable<V> {
V call() throws Exception;
}Future<V>package java.util.concurrent;
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}ExecutorServiceshutdown(): void
isShutdown(): boolean
shutdownNow(): List<Runnable>
awaitTermination(long timeout, TimeUnit unit): boolean
isTerminated(): boolean
execute(Runnable): void
ExecutorServiceinvokeAny(Collection<? extends Callable<T>> tasks): T
invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): T
invokeAll (Collection<? extends Callable<T>> tasks): List<Future<T>>
invokeAll (Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): List<Future<T>>
ExecutorServicesubmit(Callable<T> task): Future<T>
submit(Runnable task, T result): Future<T>
submit(Runnable task): Future<?>
ExecutorServiceSingleThreadExecutor
FixedThreadPool
CachedThreadPool
ScheduledThreadPool
RunnableExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();ExecutorService executorService = Executors.newFixedThreadPool(10);
executorService.execute(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
executorService.shutdown();RunnableExecutorService executorService = Executors.newSingleThreadExecutor();
Future future = executorService.submit(new Runnable() {
public void run() {
System.out.println("Asynchronous task");
}
});
future.get(); // returns null if the task has finished correctly.
executorService.shutdown();invokeAny()ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
String result = executorService.invokeAny(callables);
System.out.println("result = " + result);
executorService.shutdown();invokeAll()ExecutorService executorService = Executors.newSingleThreadExecutor();
Set<Callable<String>> callables = new HashSet<>();
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 1";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 2";
}
});
callables.add(new Callable<String>() {
public String call() throws Exception {
return "Task 3";
}
});
List<Future<String>> futures = executorService.invokeAll(callables);
for (Future<String> future : futures) {
System.out.println("future.get = " + future.get());
}
executorService.shutdown();