背景
在 Java 中分布式锁的实现框架主要包括基于 数据库、Redis 和 Zookeeper 的实现方式。使用 Redis 实现的组件可以选择 Jedis API 或者 Redisson API,使用 Zookeeper 实现的组件可以选择 Zookeeper API 或者 Curator API。笔者在项目中看到不少这种混用 API 的情况,维护性较差。
目标
封装通用接口,屏蔽 API 细节,基于 Spring Boot 自动装配管理。
实现
首先,定义分布式锁接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public interface DistributedLock {
String lockType();
boolean lock(String key);
boolean lock(String key, int waitTime, TimeUnit timeUnit);
void unlock(String key); }
|
使用 Jedis 实现接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
| @RequiredArgsConstructor @Slf4j public class JedisDistributedLock implements DistributedLock {
private static final String UNLOCK_LUA = "if redis.call(\"get\",KEYS[1]) == ARGV[1] " + "then return redis.call(\"del\",KEYS[1])" + "else return 0 end";
private final TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
private final Jedis jedis;
@Override public String lockType() { return DistributedLockType.JEDIS.name(); }
@Override public boolean lock(@NonNull String key) { log.debug("Jedis create lock '{}'", key); String value = UUID.randomUUID().toString(); SetParams setParams = new SetParams(); setParams.ex(-1); boolean isSuccess; try { String result = jedis.set(key, value, setParams); isSuccess = StringUtils.isNotEmpty(result); } catch (Exception e) { log.error("Jedis create lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockAcquireException(e); } if (isSuccess) { threadLocal.set(value); log.debug("Jedis create lock '{}' successfully", key); } else { log.warn("Jedis create lock '{}' failed", key); } return isSuccess; }
@Override public boolean lock(@NonNull String key, int waitTime, TimeUnit timeUnit) { log.warn("Jedis create lock '{}' not support waitTime", key); return lock(key); }
@Override public void unlock(@NonNull String key) { log.debug("Jedis release lock '{}'", key); String uuid = threadLocal.get(); if (uuid == null) { log.warn("Jedis release lock '{}' failed due to thread local is null", key); return; } List<String> args = Collections.singletonList(uuid); List<String> keys = Collections.singletonList(key); Long result; try { result = (Long) jedis.eval(UNLOCK_LUA, keys, args); } catch (Exception e) { log.error("Jedis release lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockReleaseException(e); } if (result == null || result == 0L) { log.warn("Jedis release lock '{}', but it not work", key); } else { log.debug("Jedis release lock '{}' successfully", key); } } }
|
使用 Redisson 实现接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92
| @RequiredArgsConstructor @Slf4j public class RedissonDistributedLock implements DistributedLock {
private static final TransmittableThreadLocal<RLock> threadLocal = new TransmittableThreadLocal<>();
private final RedissonClient redissonClient;
@Override public String lockType() { return DistributedLockType.REDISSON.name(); }
@Override public boolean lock(@NonNull String key) { log.debug("Redisson create lock '{}'", key); RLock rLock = redissonClient.getFairLock(key); boolean isSuccess; try { isSuccess = rLock.tryLock(); } catch (Exception e) { log.error("Redisson create lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockAcquireException(e); } return isSuccess; }
@Override public boolean lock(@NonNull String key, int waitTime, TimeUnit timeUnit) { log.debug("Redisson create lock '{}' with waitTime '{}'", key, waitTime); RLock rLock = redissonClient.getFairLock(key); boolean isSuccess; try { isSuccess = rLock.tryLock(waitTime, 1, timeUnit); } catch (Exception e) { log.error("Redisson create lock '{}' with waitTime '{}', catch exception: {}", key, waitTime, e.getMessage(), e); throw new DistributedLockTimeoutException(e); } if (isSuccess) { threadLocal.set(rLock); log.debug("Redisson create lock '{}' with waitTime '{}' successfully", key, waitTime); } else { log.warn("Redisson create lock '{}' with waitTime '{}' failed", key, waitTime); } return isSuccess; }
@Override public void unlock(@NonNull String key) { log.debug("Redisson release lock '{}'", key); RLock rLock = threadLocal.get(); if (rLock == null) { log.warn("Redisson release lock '{}' failed due to thread local is null", key); return; } if (!rLock.isHeldByCurrentThread()) { log.warn("Curator release lock '{}' failed that is not held by current thread", key); return; } try { rLock.unlock(); threadLocal.remove(); } catch (Exception e) { log.error("Redisson release lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockReleaseException(e); } log.debug("Redisson release lock '{}' successfully", key); } }
|
使用 Zookeeper 实现接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82
| @RequiredArgsConstructor @Slf4j public class ZookeeperDistributedLock implements DistributedLock {
private static final TransmittableThreadLocal<String> threadLocal = new TransmittableThreadLocal<>();
private static final byte[] EMPTY_DATA = new byte[0];
private final ZooKeeper zooKeeper;
@Override public String lockType() { return DistributedLockType.ZOOKEEPER.name(); }
@Override public boolean lock(@NonNull String key) { log.debug("Zookeeper create lock '{}'", key); boolean isSuccess; String result; try { result = zooKeeper.create(key, EMPTY_DATA, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); isSuccess = StringUtils.isNotEmpty(result); } catch (Exception e) { log.error("Zookeeper create lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockAcquireException(e); } if (isSuccess) { threadLocal.set(result); log.debug("Zookeeper create lock '{}' successfully", key); } else { log.warn("Zookeeper create lock '{}' failed", key); } return isSuccess; }
@Override public boolean lock(@NonNull String key, int waitTime, TimeUnit timeUnit) { log.warn("Zookeeper create lock '{}' not support waitTime", key); return lock(key); }
@Override public void unlock(@NonNull String key) { log.debug("Zookeeper release lock '{}'", key); String result = threadLocal.get(); if (result == null) { log.warn("Zookeeper release lock '{}' failed due to thread local is null", key); return; } try { zooKeeper.delete(key, -1); } catch (Exception e) { log.error("Zookeeper release lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockReleaseException(e); } log.debug("Zookeeper release lock '{}' successfully", key); } }
|
使用 Curator 实现接口。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99
| @RequiredArgsConstructor @Slf4j public class CuratorDistributedLock implements DistributedLock {
private static final TransmittableThreadLocal<InterProcessMutex> threadLocal = new TransmittableThreadLocal<>();
private final CuratorFramework curatorFramework;
@Override public String lockType() { return DistributedLockType.CURATOR.name(); }
@Override public boolean lock(@NonNull String key) { log.debug("Curator create lock '{}'", key); if (!key.startsWith(Strings.SLASH)) { throw new DistributedLockAcquireException("Invalid curator lock: " + key); } InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, key); try { interProcessMutex.acquire(); threadLocal.set(interProcessMutex); } catch (Exception e) { log.error("Curator create lock '{}', catch exception: {}", key, e.getMessage(), e); throw new DistributedLockAcquireException(e); } log.debug("Curator create lock '{}' successfully", key); return true; }
@Override public boolean lock(@NonNull String key, int waitTime, TimeUnit timeUnit) { log.debug("Curator create lock '{}' with waitTime '{}'", key, waitTime); if (!key.startsWith(Strings.SLASH)) { throw new DistributedLockAcquireException("Invalid curator lock: " + key); } InterProcessMutex interProcessMutex = new InterProcessMutex(curatorFramework, key); boolean isSuccess; try { isSuccess = interProcessMutex.acquire(waitTime, timeUnit); } catch (Exception e) { log.error("Curator create lock '{}' with waitTime '{}', catch exception: {}", key, waitTime, e.getMessage(), e); throw new DistributedLockTimeoutException(e); } if (isSuccess) { threadLocal.set(interProcessMutex); log.debug("Curator create lock '{}' with waitTime '{} successfully", key, waitTime); } else { log.warn("Curator create lock '{}' with waitTime '{} failed", key, waitTime); } return isSuccess; }
@Override public void unlock(@NonNull String key) { log.debug("Curator release lock '{}'", key); InterProcessMutex interProcessMutex = threadLocal.get(); if (interProcessMutex == null) { log.warn("Curator release lock '{}' failed due to thread local is null", key); return; } if (!interProcessMutex.isAcquiredInThisProcess()) { log.warn("Curator release lock '{}' failed that is not acquired in process", key); return; } try { interProcessMutex.release(); threadLocal.remove(); } catch (Exception e) { log.error("Curator release lock: {}, catch exception: {}", key, e.getMessage(), e); throw new DistributedLockReleaseException(e.getMessage()); } log.debug("Curator release lock '{}' successfully", key); } }
|
将具体实现放入 Spring Boot 管理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
| @Setter @Getter @ConfigurationProperties(prefix = "distributed-lock") public class DistributedLockProperties {
private boolean enabled;
private String primary;
private final Redisson redisson = new Redisson();
private final Jedis jedis = new Jedis();
private final Curator curator = new Curator();
private final ZooKeeper zooKeeper = new ZooKeeper();
@Setter @Getter public static class Redisson {
private boolean enabled; }
@Setter @Getter public static class Jedis {
private boolean enabled; }
@Setter @Getter public static class Curator {
private boolean enabled; }
@Setter @Getter public static class ZooKeeper {
private boolean enabled; } }
@ConditionalOnProperty(prefix = "distributed-lock.jedis", name = "enabled", havingValue = true) @ConditionalOnBean(Jedis.class) @ConditionalOnClass(Jedis.class) @Slf4j @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Configuration(proxyBeanMethods = false) public class JedisDistributedLockAutoConfiguration {
@Bean public DistributedLock distributedLock(Jedis jedis) { log.debug("Autowired JedisDistributedLock"); return new JedisDistributedLock(jedis); } }
@ConditionalOnProperty(prefix = "distributed-lock.redisson", name = "enabled", havingValue = true) @AutoConfigureAfter(RedissonAutoConfiguration.class) @ConditionalOnBean(RedissonClient.class) @ConditionalOnClass(Redisson.class) @Slf4j @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Configuration(proxyBeanMethods = false) public class RedissonDistributedLockAutoConfiguration {
@Bean public DistributedLock distributedLock(RedissonClient redissonClient) { log.debug("Autowired RedissonDistributedLock"); return new RedissonDistributedLock(redissonClient); } }
@ConditionalOnProperty(prefix = "distributed-lock.zookeeper", name = "enabled", havingValue = true) @ConditionalOnClass(ZooKeeper.class) @ConditionalOnBean(ZookeeperTemplate.class) @Slf4j @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Configuration(proxyBeanMethods = false) public class ZookeeperDistributedLockAutoConfiguration {
@Bean public DistributedLock distributedLock(ZookeeperTemplate zookeeperTemplate) { log.debug("Autowired ZookeeperDistributedLock"); return new ZookeeperDistributedLock(zookeeperTemplate.getZookeeper()); } }
@ConditionalOnProperty(prefix = "distributed-lock.curator", name = "enabled", havingValue = true) @ConditionalOnClass(CuratorFramework.class) @Slf4j @Role(BeanDefinition.ROLE_INFRASTRUCTURE) @Configuration(proxyBeanMethods = false) public class CuratorDistributedLockAutoConfiguration {
@Bean public DistributedLock distributedLock(CuratorFramework curatorFramework) { log.debug("Autowired CuratorDistributedLock"); return new CuratorDistributedLock(curatorFramework); } }
|
当项目配置 distributed-lock.redisson.enabled=true 时,开启 Redisson 作为分布式锁的底层实现。
1 2 3 4 5 6 7 8 9 10 11
| distributed-lock: redisson: enabled: true
spring: redis: password: demo@123 timeout: 5000 database: 1 host: localhost port: 6379
|
当项目配置 distributed-lock.curator.enabled=true 时,开启 Curator 作为分布式锁的底层实现。
1 2 3 4 5 6 7 8 9
| distributed-lock: curator: enabled: true
spring: cloud: zookeeper: enabled: true connectString: localhost:2181
|
业务代码只需要引入 DistributedLock 接口,调用 lock 和 unlock 方法完成分布式场景的加锁和解锁。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| @RequiredArgsConstructor @Slf4j public cliass Demo {
private final DistributedLock distributedLock;
public void test() { String key = "demo"; if (distributedLock.lock(key, 2, TimeUnit.SECONDS)) { distributedLock.unlock(key); } } }
|
产出
通过 API 封装,简化了业务项目对分布式锁的繁琐编码工作,提高开发效率。
本文涉及的代码完全开源,感兴趣的伙伴可以查阅 eden-distributed-lock 和 eden-distributed-lock-spring-boot-starter。