Перейти к основному содержимому

Семафоры в Java

· 4 мин. чтения

1. Обзор

В этом кратком руководстве мы рассмотрим основы семафоров и мьютексов в Java.

2. Семафор

Начнем с java.util.concurrent.Semaphore. Мы можем использовать семафоры, чтобы ограничить количество одновременных потоков, обращающихся к определенному ресурсу.

В следующем примере мы реализуем простую очередь входа, чтобы ограничить количество пользователей в системе:

class LoginQueueUsingSemaphore {

private Semaphore semaphore;

public LoginQueueUsingSemaphore(int slotLimit) {
semaphore = new Semaphore(slotLimit);
}

boolean tryLogin() {
return semaphore.tryAcquire();
}

void logout() {
semaphore.release();
}

int availableSlots() {
return semaphore.availablePermits();
}

}

Обратите внимание, как мы использовали следующие методы:

  • tryAcquire() — возвращает true, если разрешение доступно немедленно, и получает его, в противном случае возвращает false, ноacquire() получает разрешение и блокирует его до тех пор, пока оно не будет доступно.
  • release() — освободить разрешение
  • availablePermits() — возвращает количество доступных текущих разрешений

Чтобы проверить нашу очередь входа в систему, мы сначала попытаемся достичь предела и проверим, будет ли заблокирована следующая попытка входа:

@Test
public void givenLoginQueue_whenReachLimit_thenBlocked() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();

assertEquals(0, loginQueue.availableSlots());
assertFalse(loginQueue.tryLogin());
}

Далее мы увидим, доступны ли какие-либо слоты после выхода из системы:

@Test
public void givenLoginQueue_whenLogout_thenSlotsAvailable() {
int slots = 10;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
LoginQueueUsingSemaphore loginQueue = new LoginQueueUsingSemaphore(slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(loginQueue::tryLogin));
executorService.shutdown();
assertEquals(0, loginQueue.availableSlots());
loginQueue.logout();

assertTrue(loginQueue.availableSlots() > 0);
assertTrue(loginQueue.tryLogin());
}

3. Временный семафор

Далее мы обсудим Apache Commons TimedSemaphore. TimedSemaphore допускает ряд разрешений как простой семафор, но в течение заданного периода времени, после этого периода время сбрасывается и все разрешения освобождаются.

Мы можем использовать TimedSemaphore для создания простой очереди задержки следующим образом:

class DelayQueueUsingTimedSemaphore {

private TimedSemaphore semaphore;

DelayQueueUsingTimedSemaphore(long period, int slotLimit) {
semaphore = new TimedSemaphore(period, TimeUnit.SECONDS, slotLimit);
}

boolean tryAdd() {
return semaphore.tryAcquire();
}

int availableSlots() {
return semaphore.getAvailablePermits();
}

}

Когда мы используем очередь задержки с одной секундой в качестве периода времени и после использования всех слотов в течение одной секунды, ни один из них не должен быть доступен:

public void givenDelayQueue_whenReachLimit_thenBlocked() {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue
= new DelayQueueUsingTimedSemaphore(1, slots);

IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();

assertEquals(0, delayQueue.availableSlots());
assertFalse(delayQueue.tryAdd());
}

Но после некоторого сна семафор должен сбросить и освободить разрешения :

@Test
public void givenDelayQueue_whenTimePass_thenSlotsAvailable() throws InterruptedException {
int slots = 50;
ExecutorService executorService = Executors.newFixedThreadPool(slots);
DelayQueueUsingTimedSemaphore delayQueue = new DelayQueueUsingTimedSemaphore(1, slots);
IntStream.range(0, slots)
.forEach(user -> executorService.execute(delayQueue::tryAdd));
executorService.shutdown();

assertEquals(0, delayQueue.availableSlots());
Thread.sleep(1000);
assertTrue(delayQueue.availableSlots() > 0);
assertTrue(delayQueue.tryAdd());
}

4. Семафор против мьютекса

Мьютекс действует аналогично бинарному семафору, мы можем использовать его для реализации взаимного исключения.

В следующем примере мы будем использовать простой двоичный семафор для создания счетчика:

class CounterUsingMutex {

private Semaphore mutex;
private int count;

CounterUsingMutex() {
mutex = new Semaphore(1);
count = 0;
}

void increase() throws InterruptedException {
mutex.acquire();
this.count = this.count + 1;
Thread.sleep(1000);
mutex.release();

}

int getCount() {
return this.count;
}

boolean hasQueuedThreads() {
return mutex.hasQueuedThreads();
}
}

Когда много потоков одновременно пытаются получить доступ к счетчику, они просто блокируются в очереди :

@Test
public void whenMutexAndMultipleThreads_thenBlocked()
throws InterruptedException {
int count = 5;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();

assertTrue(counter.hasQueuedThreads());
}

Когда мы ждем, все потоки будут обращаться к счетчику, и в очереди не останется ни одного потока:

@Test
public void givenMutexAndMultipleThreads_ThenDelay_thenCorrectCount()
throws InterruptedException {
int count = 5;
ExecutorService executorService
= Executors.newFixedThreadPool(count);
CounterUsingMutex counter = new CounterUsingMutex();
IntStream.range(0, count)
.forEach(user -> executorService.execute(() -> {
try {
counter.increase();
} catch (InterruptedException e) {
e.printStackTrace();
}
}));
executorService.shutdown();

assertTrue(counter.hasQueuedThreads());
Thread.sleep(5000);
assertFalse(counter.hasQueuedThreads());
assertEquals(count, counter.getCount());
}

5. Вывод

В этой статье мы изучили основы семафоров в Java.

Как всегда, полный исходный код доступен на GitHub .