1. Обзор
В этой статье мы увидим, как мы можем заблокировать определенный ключ, чтобы предотвратить одновременные действия с этим ключом, не препятствуя действиям с другими ключами.
В общем, мы хотим реализовать два метода и понять, как ими манипулировать:
недействительный замок (строковый ключ)
недействительная разблокировка (строковый ключ)
Для простоты руководства мы всегда будем предполагать, что наши ключи являются строками
. Вы можете заменить их типом объектов, которые вам нужны, при единственном условии, что методы equals
и hashCode
определены правильно, потому что мы будем использовать их в качестве ключей HashMap .
2. Простой взаимоисключающий замок
Во-первых, предположим, что мы хотим заблокировать любое запрошенное действие, если соответствующий ключ уже используется. Здесь мы скорее определим логический метод tryLock(String key)
вместо метода блокировки
, который мы себе представляли.
Конкретно, мы стремимся поддерживать набор
ключей, который мы будем заполнять ключами, используемыми в любой момент. Таким образом, когда запрашивается новое действие над ключом, нам просто придется отказаться от него, если мы обнаружим, что ключ уже используется другим потоком.
Проблема, с которой мы здесь сталкиваемся, заключается в том, что не существует потокобезопасной реализации Set
. Следовательно, мы будем использовать Set
, поддерживаемый ConcurrentHashMap
. Использование ConcurrentHashMap
гарантирует нам согласованность данных в многопоточной среде.
Давайте посмотрим на это в действии:
public class SimpleExclusiveLockByKey {
private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
public boolean tryLock(String key) {
return usedKeys.add(key);
}
public void unlock(String key) {
usedKeys.remove(key);
}
}
Вот как мы будем использовать этот класс:
String key = "key";
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
try {
lockByKey.tryLock(key);
// insert the code that needs to be executed only if the key lock is available
} finally { // CRUCIAL
lockByKey.unlock(key);
}
Настоим на наличии блока finally
: внутри него крайне важно вызвать метод unlock .
Таким образом, даже если наш код выдаст исключение
в скобках try
, мы разблокируем ключ.
3. Приобретайте и открывайте замки с помощью ключей
Теперь давайте углубимся в проблему и скажем, что мы не хотим просто отказываться от одновременных действий на одних и тех же клавишах, а предпочитаем, чтобы новые входящие действия ждали завершения текущего действия на клавише.
Поток заявки будет:
- первый поток запрашивает блокировку ключа: он получает блокировку ключа
- второй поток запрашивает блокировку того же ключа: потоку 2 предлагается подождать
- первый поток освобождает блокировку ключа
- второй поток получает блокировку ключа и может выполнить свое действие
3.1. Определите блокировку с помощью счетчика потоков
В этом случае кажется естественным использовать Lock
. Вкратце, Lock
— это объект, используемый для синхронизации потоков, который позволяет блокировать потоки до тех пор, пока он не будет получен. Lock
— это интерфейс — мы будем использовать ReentrantLock
, базовую реализацию для него.
Давайте начнем с того, что обернем наш Lock
во внутренний класс. Этот класс сможет отслеживать количество потоков, ожидающих блокировки ключа. Он предоставит два метода: один для увеличения счетчика потока, а другой для его уменьшения:
private static class LockWrapper {
private final Lock lock = new ReentrantLock();
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
private LockWrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}
private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}
3.2. Позвольте замку обрабатывать потоки в очереди
Кроме того, мы продолжим использовать ConcurrentHashMap
. Но вместо того, чтобы просто извлекать ключи карты
, как мы делали раньше, мы будем использовать объекты LockWrapper
в качестве значений:
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
Когда поток хочет получить блокировку ключа, нам нужно посмотреть, присутствует ли уже LockWrapper
для этого ключа:
- если нет, мы
создадим новый экземпляр LockWrapper
для данного ключа со счетчиком, установленным на 1 - если это так, мы вернем существующий
LockWrapper
и увеличим связанный с ним счетчик .
Давайте посмотрим, как это делается:
public void lock(String key) {
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
lockWrapper.lock.lock();
}
Код очень лаконичен из-за использования метода вычисления
HashMap
. Приведем некоторые подробности о функционировании этого метода: ``
- метод
вычисления
применяется кблокировкам
объекта сключом
в качестве первого аргумента: извлекается начальное значение, соответствующееключу
вблокировках
BiFunction
, указанная в качестве второго аргументавычисления
, применяется кключу
и начальному значению: результат дает новое значение- новое значение заменяет начальное значение для ключевого
ключа
взамках
3.3. Разблокировать и при желании удалить запись карты
Кроме того, когда поток освобождает блокировку, мы уменьшаем количество потоков, связанных с LockWrapper
. Если количество равно нулю, мы удалим ключ из ConcurrentHashMap
:
public void unlock(String key) {
LockWrapper lockWrapper = locks.get(key);
lockWrapper.lock.unlock();
if (lockWrapper.removeThreadFromQueue() == 0) {
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
locks.remove(key, lockWrapper);
}
}
3.4. Резюме
Вкратце, давайте посмотрим, как в итоге выглядит весь наш класс:
public class LockByKey {
private static class LockWrapper {
private final Lock lock = new ReentrantLock();
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
private LockWrapper addThreadInQueue() {
numberOfThreadsInQueue.incrementAndGet();
return this;
}
private int removeThreadFromQueue() {
return numberOfThreadsInQueue.decrementAndGet();
}
}
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
public void lock(String key) {
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
lockWrapper.lock.lock();
}
public void unlock(String key) {
LockWrapper lockWrapper = locks.get(key);
lockWrapper.lock.unlock();
if (lockWrapper.removeThreadFromQueue() == 0) {
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
locks.remove(key, lockWrapper);
}
}
}
Использование очень похоже на то, что у нас было раньше:
String key = "key";
LockByKey lockByKey = new LockByKey();
try {
lockByKey.lock(key);
// insert your code here
} finally { // CRUCIAL
lockByKey.unlock(key);
}
4. Разрешить несколько действий одновременно
И последнее, но не менее важное: давайте рассмотрим другой случай: вместо того, чтобы позволять только одному потоку выполнять действие для данного ключа за раз, мы хотим ограничить количество потоков, которым разрешено одновременно действовать с одним и тем же ключом, до некоторого целого числа n
. Для простоты мы установим n
= 2.
Давайте подробно опишем наш вариант использования:
- первый поток хочет получить блокировку ключа: ему будет разрешено это сделать
- второй поток хочет получить ту же блокировку: это также будет разрешено
- третий поток запрашивает блокировку того же ключа: ему придется стоять в очереди, пока один из первых двух потоков не освободит свою блокировку.
Для этого созданы семафоры . Семафор
— это объект, используемый для ограничения количества потоков, одновременно обращающихся к ресурсу.
Глобальное функционирование и код очень похожи на то, что мы имели с блокировками:
public class SimultaneousEntriesLockByKey {
private static final int ALLOWED_THREADS = 2;
private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
public void lock(String key) {
Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
semaphore.acquireUninterruptibly();
}
public void unlock(String key) {
Semaphore semaphore = semaphores.get(key);
semaphore.release();
if (semaphore.availablePermits() == ALLOWED_THREADS) {
semaphores.remove(key, semaphore);
}
}
}
Использование идентично:
String key = "key";
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
try {
lockByKey.lock(key);
// insert your code here
} finally { // CRUCIAL
lockByKey.unlock(key);
}
5. Вывод
В этой статье мы увидели, как можно заблокировать ключи, чтобы либо полностью воспрепятствовать одновременным действиям, либо ограничить количество одновременных действий до одного (используя блокировки) или более (используя семафоры).
Как всегда, код доступен на GitHub .