| 知乎专栏 |
目录
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-integration</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-redis</artifactId>
</dependency>
package cn.netkiller.config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;
@Configuration
public class RedisLockRegistryConfiguration {
@Bean
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "netkiller-lock");
}
}
配置默认超时时间
@Bean(destroyMethod = "destroy")
public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
return new RedisLockRegistry(redisConnectionFactory, "neo-lock",
TimeUnit.MINUTES.toMillis(10));
}
通过 Autowired 注解使用 RedisLockRegistry
@Autowired
private RedisLockRegistry redisLockRegistry;
Lock lock = redisLockRegistry.obtain(device);
if (lock.tryLock()) {
try {
// manipulate protected state
} finally {
lock.unlock();
}
} else {
// perform alternative actions
}
@Autowired
private RedisLockRegistry redisLockRegistry;
Lock lock = redisLockRegistry.obtain(key);
boolean locked = false;
try {
locked = lock.tryLock();
if (!locked) {
// 没有获取到锁的逻辑
}
// 获取锁的逻辑
} finally {
if (locked) {
lock.unlock();
}
}
如果没有上锁,上锁后返回 true 状态。如果已经上锁阻塞等待10秒,然后再返回锁状态
public boolean isLock(String device) {
Lock lock = redisLockRegistry.obtain(device);
boolean status = false;
try {
status = lock.tryLock(10, TimeUnit.SECONDS);
} catch (Exception e) {
log.info(e.getMessage());
}
log.warn("status: {} <<<<<<<<<<", status);
return status;
}
方法二,通过构造方法使用 RedisLockRegistry
@Service
public class DistributedLockService {
private final RedisLockRegistry redisLockRegistry;
public DistributedLockService(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
public void processWithLock(String lockKey) {
// 获取锁对象
Lock lock = redisLockRegistry.obtain(lockKey);
try {
// 尝试获取锁,最多等待10秒,锁持有30秒
if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
try {
// 成功获取锁,执行业务逻辑
System.out.println("获得锁,执行业务逻辑");
// 模拟业务处理
Thread.sleep(5000);
} finally {
// 释放锁
lock.unlock();
System.out.println("释放锁");
}
} else {
System.out.println("获取锁失败");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("线程被中断");
}
}
}
@Service
public class LockExamples {
private final RedisLockRegistry redisLockRegistry;
public LockExamples(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
// 方式1:立即尝试获取锁(不等待)
public boolean tryLockImmediately(String key) {
Lock lock = redisLockRegistry.obtain(key);
return lock.tryLock();
}
// 方式2:带超时的尝试获取锁
public boolean tryLockWithTimeout(String key) throws InterruptedException {
Lock lock = redisLockRegistry.obtain(key);
return lock.tryLock(5, TimeUnit.SECONDS); // 等待5秒
}
// 方式3:带超时和租期的获取锁
public boolean tryLockWithLease(String key) throws InterruptedException {
Lock lock = redisLockRegistry.obtain(key);
return lock.tryLock(10, 30, TimeUnit.SECONDS); // 等待10秒,租期30秒
}
// 方式4:阻塞获取锁(不推荐,可能导致死锁)
public void lockBlockingly(String key) {
Lock lock = redisLockRegistry.obtain(key);
lock.lock(); // 一直等待直到获取锁
try {
// 业务逻辑
} finally {
lock.unlock();
}
}
}
@Component
public class ScheduledTask {
private final RedisLockRegistry redisLockRegistry;
public ScheduledTask(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
@Scheduled(cron = "0 */5 * * * ?") // 每5分钟执行一次
public void executeScheduledTask() {
String lockKey = "scheduled-task-lock";
Lock lock = redisLockRegistry.obtain(lockKey);
try {
if (lock.tryLock(0, 10, TimeUnit.MINUTES)) { // 不等待,锁10分钟
try {
// 确保只有一个实例执行定时任务
System.out.println("开始执行定时任务...");
// 任务逻辑
} finally {
lock.unlock();
}
} else {
System.out.println("其他实例正在执行定时任务");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
@RestController
@RequestMapping("/api")
public class LockController {
private final RedisLockRegistry redisLockRegistry;
public LockController(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
@PostMapping("/process/{resourceId}")
public ResponseEntity<String> processResource(@PathVariable String resourceId) {
String lockKey = "resource-lock:" + resourceId;
Lock lock = redisLockRegistry.obtain(lockKey);
try {
if (lock.tryLock(3, 15, TimeUnit.SECONDS)) {
try {
// 处理资源
return ResponseEntity.ok("处理成功: " + resourceId);
} finally {
lock.unlock();
}
} else {
return ResponseEntity.status(HttpStatus.CONFLICT)
.body("资源正在被处理,请稍后重试");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
.body("处理中断");
}
}
}
@Component
public class DistributedLockTemplate {
private final RedisLockRegistry redisLockRegistry;
public DistributedLockTemplate(RedisLockRegistry redisLockRegistry) {
this.redisLockRegistry = redisLockRegistry;
}
public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
return executeWithLock(lockKey, 5, 30, supplier);
}
public <T> T executeWithLock(String lockKey, long waitTime, long leaseTime, Supplier<T> supplier) {
Lock lock = redisLockRegistry.obtain(lockKey);
try {
if (lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS)) {
try {
return supplier.get();
} finally {
lock.unlock();
}
} else {
throw new RuntimeException("获取分布式锁失败: " + lockKey);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("获取锁时被中断", e);
}
}
// 无返回值的版本
public void executeWithLock(String lockKey, Runnable runnable) {
executeWithLock(lockKey, () -> {
runnable.run();
return null;
});
}
}
// 使用示例
@Service
public class BusinessService {
private final DistributedLockTemplate lockTemplate;
public BusinessService(DistributedLockTemplate lockTemplate) {
this.lockTemplate = lockTemplate;
}
public void updateInventory(String productId, int quantity) {
lockTemplate.executeWithLock("inventory:" + productId, () -> {
// 更新库存逻辑
System.out.println("更新产品 " + productId + " 的库存: " + quantity);
return null;
});
}
}
package cn.netkiller.elastolink.controller;
import cn.netkiller.elastolink.domain.device.Settings;
import cn.netkiller.elastolink.repository.SettingsRepository;
import cn.netkiller.elastolink.service.MenuService;
import cn.netkiller.elastolink.utils.responses.JsonResponse;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.MediaType;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
@RestController
@Slf4j
@RequestMapping("/menu/{appId}/{device}")
public class MenuController {
private final RedisLockRegistry redisLockRegistry;
private final RedisConnectionFactory redisConnectionFactory;
private final StringRedisTemplate stringRedisTemplate;
@Autowired
private MenuService menuService;
@Autowired
private SettingsRepository settingsRepository;
public MenuController(RedisConnectionFactory redisConnectionFactory, StringRedisTemplate stringRedisTemplate) {
this.redisConnectionFactory = redisConnectionFactory;
this.stringRedisTemplate = stringRedisTemplate;
redisLockRegistry = new RedisLockRegistry(
this.redisConnectionFactory,
"menu",
1000 * 60 * 5 // 过期时间(毫秒)
);
}
@GetMapping("item")
public JsonResponse item(@PathVariable String device) {
Optional<Settings> optional = settingsRepository.findByDevice(device).filter(settings -> Boolean.TRUE.equals(settings.getMenu()));
if (optional.isPresent()) {
return JsonResponse.data(menuService.item());
}
return JsonResponse.builder().status(false).code(JsonResponse.Code.AccessDenied.name()).data(null).reason("无权限").build();
}
@GetMapping("content")
public JsonResponse content(
@PathVariable String appId, @PathVariable String device,
@RequestParam("uuid") String uuid, @RequestParam("taskId") String taskId) {
Optional<String> data = menuService.content(uuid, device, taskId);
return JsonResponse.data(data);
}
@PostMapping("agent")
public JsonResponse agent(
@PathVariable String appId, @PathVariable String device,
@RequestParam("uuid") String uuid, @RequestParam("taskId") String taskId,
@RequestParam("content") String content) {
Optional<String> optional = menuService.content(uuid, device, taskId, content);
log.debug(optional.toString());
if (optional.isPresent()) {
return JsonResponse.data(optional.get());
} else {
return JsonResponse.builder().status(true).code(JsonResponse.Code.PENDING.name()).reason("服务器已接受请求").build();
}
}
@PutMapping(path = "content", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> contentStream(
@PathVariable String appId, @PathVariable String device,
@RequestParam("uuid") String uuid, @RequestParam("taskId") String taskId,
@RequestParam("content") String content) {
final Marker marker = MarkerFactory.getMarker(taskId);
Lock lock = redisLockRegistry.obtain(taskId);
Flux<String> flux = Flux.empty();
try {
Long expire = stringRedisTemplate.getExpire("menu:".concat(taskId), TimeUnit.SECONDS);
log.debug("lock expire={}", expire);
boolean status = lock.tryLock(0, TimeUnit.SECONDS);
log.debug(marker, "agent lock {} status: {}", redisLockRegistry, status);
if (status) {
flux = menuService.stream(uuid, device, taskId, content);
flux.log();
flux.doOnComplete(() -> {
System.out.println("✅ SSE Flux 正常完成!");
})
// ✅ 【异常完成】发生错误时触发
.doOnError(ex -> {
System.out.println("❌ SSE Flux 异常结束:" + ex.getMessage());
})
// ✅ 【最终完成】无论正常/异常/客户端断开,都会进这里!⭐⭐⭐
.doFinally(signalType -> {
System.out.println("🔚 SSE 流最终关闭:" + signalType);
lock.unlock();
log.debug(marker, "agent unlock {}", taskId);
// 你可以在这里:释放锁、关闭资源、记录日志
});
return flux;
} else {
return Flux.just("请求频率太快");
}
} catch (Exception e) {
log.error(marker, "agent Exception {}", e.getMessage());
}
return flux;
}
}
注意细节,定义前缀是 menu 但是读取 key 是 "menu:"
redisLockRegistry = new RedisLockRegistry(
this.redisConnectionFactory,
"menu",
1000 * 60 * 5 // 过期时间(毫秒)
);
Lock lock = redisLockRegistry.obtain(taskId);
Long expire = stringRedisTemplate.getExpire("menu:".concat(taskId), TimeUnit.SECONDS);