Home | 简体中文 | 繁体中文 | 杂文 | Github | 知乎专栏 | Facebook | Linkedin | Youtube | 打赏(Donations) | About
知乎专栏

第 83 章 Spring Integration

目录

83.1. Spring Redis Lock
83.1.1. Maven 依赖
83.1.2. 配置锁
83.1.3. 使用方法
83.1.4. Service 中使用方法
83.1.5. 在定时任务中使用
83.1.6. 在Controller中使用
83.1.7. 使用模板方法模式封装
83.1.8. 读取锁过期时间
83.2. MQTT Support
83.2.1. 入站消息通道适配器
83.2.2. 出站通道适配器
83.2.3. @MessagingGateway 定义消息网管接口
83.2.4. 手动 ACK 应答
83.2.5. Spring boot with Mqtt v5
	
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-integration</artifactId>
        </dependency>	
	
	

83.1. Spring Redis Lock

83.1.1. Maven 依赖

			
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>			
			
			

83.1.2. 配置锁

			
package cn.netkiller.config;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.integration.redis.util.RedisLockRegistry;

@Configuration
public class RedisLockRegistryConfiguration {
    @Bean
    public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
        return new RedisLockRegistry(redisConnectionFactory, "netkiller-lock");
    }
}
			
			
			

配置默认超时时间

			
  @Bean(destroyMethod = "destroy")
  public RedisLockRegistry redisLockRegistry(RedisConnectionFactory redisConnectionFactory) {
    return new RedisLockRegistry(redisConnectionFactory, "neo-lock",
        TimeUnit.MINUTES.toMillis(10));
  }			
			
			

83.1.3. 使用方法

通过 Autowired 注解使用 RedisLockRegistry

			
@Autowired
private RedisLockRegistry redisLockRegistry;
			
		Lock lock = redisLockRegistry.obtain(device);
        if (lock.tryLock()) {
            try {
                // manipulate protected state
            } finally {
                lock.unlock();
            }
        } else {
            // perform alternative actions
        }			
			
			
			
@Autowired
private RedisLockRegistry redisLockRegistry;

Lock lock = redisLockRegistry.obtain(key);
boolean locked = false;
try {
  locked = lock.tryLock();
  if (!locked) {
    // 没有获取到锁的逻辑    
  }

  // 获取锁的逻辑
} finally {
  if (locked) {
    lock.unlock();
  }
}			
			
			

如果没有上锁,上锁后返回 true 状态。如果已经上锁阻塞等待10秒,然后再返回锁状态

			
    public boolean isLock(String device) {

        Lock lock = redisLockRegistry.obtain(device);
        boolean status = false;
        try {
            status = lock.tryLock(10, TimeUnit.SECONDS);

        } catch (Exception e) {
            log.info(e.getMessage());
        }
        log.warn("status: {} <<<<<<<<<<", status);
        return status;
    }			
			
			

方法二,通过构造方法使用 RedisLockRegistry

			
@Service
public class DistributedLockService {
    
    private final RedisLockRegistry redisLockRegistry;
    
    public DistributedLockService(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }
    
    public void processWithLock(String lockKey) {
        // 获取锁对象
        Lock lock = redisLockRegistry.obtain(lockKey);
        
        try {
            // 尝试获取锁,最多等待10秒,锁持有30秒
            if (lock.tryLock(10, 30, TimeUnit.SECONDS)) {
                try {
                    // 成功获取锁,执行业务逻辑
                    System.out.println("获得锁,执行业务逻辑");
                    // 模拟业务处理
                    Thread.sleep(5000);
                } finally {
                    // 释放锁
                    lock.unlock();
                    System.out.println("释放锁");
                }
            } else {
                System.out.println("获取锁失败");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("线程被中断");
        }
    }
}			
			
			

83.1.4. Service 中使用方法

			
@Service
public class LockExamples {
    
    private final RedisLockRegistry redisLockRegistry;
    
    public LockExamples(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }
    
    // 方式1:立即尝试获取锁(不等待)
    public boolean tryLockImmediately(String key) {
        Lock lock = redisLockRegistry.obtain(key);
        return lock.tryLock();
    }
    
    // 方式2:带超时的尝试获取锁
    public boolean tryLockWithTimeout(String key) throws InterruptedException {
        Lock lock = redisLockRegistry.obtain(key);
        return lock.tryLock(5, TimeUnit.SECONDS); // 等待5秒
    }
    
    // 方式3:带超时和租期的获取锁
    public boolean tryLockWithLease(String key) throws InterruptedException {
        Lock lock = redisLockRegistry.obtain(key);
        return lock.tryLock(10, 30, TimeUnit.SECONDS); // 等待10秒,租期30秒
    }
    
    // 方式4:阻塞获取锁(不推荐,可能导致死锁)
    public void lockBlockingly(String key) {
        Lock lock = redisLockRegistry.obtain(key);
        lock.lock(); // 一直等待直到获取锁
        try {
            // 业务逻辑
        } finally {
            lock.unlock();
        }
    }
}			
			
			

83.1.5. 在定时任务中使用

			
@Component
public class ScheduledTask {
    
    private final RedisLockRegistry redisLockRegistry;
    
    public ScheduledTask(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }
    
    @Scheduled(cron = "0 */5 * * * ?") // 每5分钟执行一次
    public void executeScheduledTask() {
        String lockKey = "scheduled-task-lock";
        Lock lock = redisLockRegistry.obtain(lockKey);
        
        try {
            if (lock.tryLock(0, 10, TimeUnit.MINUTES)) { // 不等待,锁10分钟
                try {
                    // 确保只有一个实例执行定时任务
                    System.out.println("开始执行定时任务...");
                    // 任务逻辑
                } finally {
                    lock.unlock();
                }
            } else {
                System.out.println("其他实例正在执行定时任务");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}			
			
			

83.1.6. 在Controller中使用

			
@RestController
@RequestMapping("/api")
public class LockController {
    
    private final RedisLockRegistry redisLockRegistry;
    
    public LockController(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }
    
    @PostMapping("/process/{resourceId}")
    public ResponseEntity<String> processResource(@PathVariable String resourceId) {
        String lockKey = "resource-lock:" + resourceId;
        Lock lock = redisLockRegistry.obtain(lockKey);
        
        try {
            if (lock.tryLock(3, 15, TimeUnit.SECONDS)) {
                try {
                    // 处理资源
                    return ResponseEntity.ok("处理成功: " + resourceId);
                } finally {
                    lock.unlock();
                }
            } else {
                return ResponseEntity.status(HttpStatus.CONFLICT)
                    .body("资源正在被处理,请稍后重试");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return ResponseEntity.status(HttpStatus.INTERNAL_SERVER_ERROR)
                .body("处理中断");
        }
    }
}			
			
			

83.1.7. 使用模板方法模式封装

			
@Component
public class DistributedLockTemplate {
    
    private final RedisLockRegistry redisLockRegistry;
    
    public DistributedLockTemplate(RedisLockRegistry redisLockRegistry) {
        this.redisLockRegistry = redisLockRegistry;
    }
    
    public <T> T executeWithLock(String lockKey, Supplier<T> supplier) {
        return executeWithLock(lockKey, 5, 30, supplier);
    }
    
    public <T> T executeWithLock(String lockKey, long waitTime, long leaseTime, Supplier<T> supplier) {
        Lock lock = redisLockRegistry.obtain(lockKey);
        
        try {
            if (lock.tryLock(waitTime, leaseTime, TimeUnit.SECONDS)) {
                try {
                    return supplier.get();
                } finally {
                    lock.unlock();
                }
            } else {
                throw new RuntimeException("获取分布式锁失败: " + lockKey);
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("获取锁时被中断", e);
        }
    }
    
    // 无返回值的版本
    public void executeWithLock(String lockKey, Runnable runnable) {
        executeWithLock(lockKey, () -> {
            runnable.run();
            return null;
        });
    }
}

// 使用示例
@Service
public class BusinessService {
    
    private final DistributedLockTemplate lockTemplate;
    
    public BusinessService(DistributedLockTemplate lockTemplate) {
        this.lockTemplate = lockTemplate;
    }
    
    public void updateInventory(String productId, int quantity) {
        lockTemplate.executeWithLock("inventory:" + productId, () -> {
            // 更新库存逻辑
            System.out.println("更新产品 " + productId + " 的库存: " + quantity);
            return null;
        });
    }
}			
			
			

83.1.8. 读取锁过期时间

			
package cn.netkiller.elastolink.controller;

import cn.netkiller.elastolink.domain.device.Settings;
import cn.netkiller.elastolink.repository.SettingsRepository;
import cn.netkiller.elastolink.service.MenuService;
import cn.netkiller.elastolink.utils.responses.JsonResponse;
import lombok.extern.slf4j.Slf4j;
import org.slf4j.Marker;
import org.slf4j.MarkerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.http.MediaType;
import org.springframework.integration.redis.util.RedisLockRegistry;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;

import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;

@RestController
@Slf4j
@RequestMapping("/menu/{appId}/{device}")
public class MenuController {


    private final RedisLockRegistry redisLockRegistry;
    private final RedisConnectionFactory redisConnectionFactory;
    private final StringRedisTemplate stringRedisTemplate;
    @Autowired
    private MenuService menuService;

    @Autowired
    private SettingsRepository settingsRepository;

    public MenuController(RedisConnectionFactory redisConnectionFactory, StringRedisTemplate stringRedisTemplate) {
        this.redisConnectionFactory = redisConnectionFactory;
        this.stringRedisTemplate = stringRedisTemplate;
        redisLockRegistry = new RedisLockRegistry(
                this.redisConnectionFactory,
                "menu",
                1000 * 60 * 5 // 过期时间(毫秒)
        );
    }

    @GetMapping("item")
    public JsonResponse item(@PathVariable String device) {
        Optional<Settings> optional = settingsRepository.findByDevice(device).filter(settings -> Boolean.TRUE.equals(settings.getMenu()));
        if (optional.isPresent()) {
            return JsonResponse.data(menuService.item());
        }
        return JsonResponse.builder().status(false).code(JsonResponse.Code.AccessDenied.name()).data(null).reason("无权限").build();
    }


    @GetMapping("content")
    public JsonResponse content(
            @PathVariable String appId, @PathVariable String device,
            @RequestParam("uuid") String uuid, @RequestParam("taskId") String taskId) {
        Optional<String> data = menuService.content(uuid, device, taskId);
        return JsonResponse.data(data);
    }

    @PostMapping("agent")
    public JsonResponse agent(
            @PathVariable String appId, @PathVariable String device,
            @RequestParam("uuid") String uuid, @RequestParam("taskId") String taskId,
            @RequestParam("content") String content) {
        Optional<String> optional = menuService.content(uuid, device, taskId, content);
        log.debug(optional.toString());
        if (optional.isPresent()) {
            return JsonResponse.data(optional.get());
        } else {
            return JsonResponse.builder().status(true).code(JsonResponse.Code.PENDING.name()).reason("服务器已接受请求").build();
        }
    }

    @PutMapping(path = "content", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<String> contentStream(
            @PathVariable String appId, @PathVariable String device,
            @RequestParam("uuid") String uuid, @RequestParam("taskId") String taskId,
            @RequestParam("content") String content) {
        final Marker marker = MarkerFactory.getMarker(taskId);
        Lock lock = redisLockRegistry.obtain(taskId);
        Flux<String> flux = Flux.empty();
        try {
            Long expire = stringRedisTemplate.getExpire("menu:".concat(taskId), TimeUnit.SECONDS);
            log.debug("lock expire={}", expire);
            boolean status = lock.tryLock(0, TimeUnit.SECONDS);
            log.debug(marker, "agent lock {} status: {}", redisLockRegistry, status);
            if (status) {

                flux = menuService.stream(uuid, device, taskId, content);
                flux.log();
                flux.doOnComplete(() -> {
                            System.out.println("✅ SSE Flux 正常完成!");
                        })

                        // ✅ 【异常完成】发生错误时触发
                        .doOnError(ex -> {
                            System.out.println("❌ SSE Flux 异常结束:" + ex.getMessage());
                        })

                        // ✅ 【最终完成】无论正常/异常/客户端断开,都会进这里!⭐⭐⭐
                        .doFinally(signalType -> {
                            System.out.println("🔚 SSE 流最终关闭:" + signalType);
                            lock.unlock();
                            log.debug(marker, "agent unlock {}", taskId);
                            // 你可以在这里:释放锁、关闭资源、记录日志
                        });
                return flux;
            } else {
                return Flux.just("请求频率太快");
            }
        } catch (Exception e) {
            log.error(marker, "agent Exception {}", e.getMessage());
        }
        return flux;
    }
}
			
			
			

注意细节,定义前缀是 menu 但是读取 key 是 "menu:"

			
redisLockRegistry = new RedisLockRegistry(
                this.redisConnectionFactory,
                "menu",
                1000 * 60 * 5 // 过期时间(毫秒)
        );

Lock lock = redisLockRegistry.obtain(taskId);

Long expire = stringRedisTemplate.getExpire("menu:".concat(taskId), TimeUnit.SECONDS);