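11 - Session and Task Management
1. Role
AgentTaskManager is the mediator between Agents and their running tasks. It provides:
- Cross-instance task registration
- Task stopping (with cross-instance broadcast over Redis Pub/Sub)
- Automatic TTL expiry
- Concurrency control (one conversation cannot run two tasks at the same time)
2. Core Design: Cross-Instance Coordination via Redis Pub/Sub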
```
Instance A                          Instance B
    │                                   │
    ├─ registerTask (local Map) ───────→│
    │                                   │
    │  Task running...                  │
    │                                   │
    ├─ stop request received ──────────→│
    │   1. Check the local Map          │
    │   2. Not found → check Redis      │
    │   3. Found in Redis → Pub/Sub     │
    │      broadcast                    │
    │                                   │
    │ ←────────── broadcast ────────────┤
    │                                   │
    │  handleRemoteStop()
    │   - Remove from the local Map
    │   - Dispose the Disposable
    │   - Emit the stop message
    │   - Delete the Redis key
```
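The flow above can be modelled without Redis to see how a stop request reaches the owning instance. The following is a minimal sketch under stated assumptions: `StopRoutingSketch`, its `redisKeys` map and its `stopListeners` list are hypothetical in-memory stand-ins for the `agent:task:*` keys and the `agent:stop` topic. It is not the real `AgentTaskManager` and it does not talk to Redis.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** In-memory model of the cross-instance stop flow (illustration only). */
final class StopRoutingSketch {
    // Stand-in for Redis keys: conversationId -> instanceId of the holder.
    final Map<String, String> redisKeys = new ConcurrentHashMap<>();
    // Stand-in for the stop topic: one listener per instance.
    final List<Consumer<String>> stopListeners = new CopyOnWriteArrayList<>();

    Instance newInstance(String instanceId) {
        return new Instance(instanceId);
    }

    final class Instance {
        final String instanceId;
        final Map<String, String> taskMap = new ConcurrentHashMap<>(); // conversationId -> agentType
        final List<String> stopped = new ArrayList<>();                // conversations stopped here

        Instance(String instanceId) {
            this.instanceId = instanceId;
            stopListeners.add(this::handleRemoteStop); // subscribe to the broadcast
        }

        boolean registerTask(String conversationId, String agentType) {
            if (redisKeys.putIfAbsent(conversationId, instanceId) != null) {
                return false; // another task already holds this conversation
            }
            taskMap.put(conversationId, agentType);
            return true;
        }

        boolean stopTask(String conversationId) {
            if (taskMap.containsKey(conversationId)) {          // 1. check the local Map
                doStop(conversationId);
                return true;
            }
            String holder = redisKeys.get(conversationId);      // 2. check the shared key
            if (holder == null || holder.equals(instanceId)) {
                return false;
            }
            stopListeners.forEach(l -> l.accept(conversationId)); // 3. broadcast
            return true;
        }

        void handleRemoteStop(String conversationId) {
            if (taskMap.containsKey(conversationId)) { // only the owning instance reacts
                doStop(conversationId);
            }
        }

        private void doStop(String conversationId) {
            taskMap.remove(conversationId);
            redisKeys.remove(conversationId, instanceId);
            stopped.add(conversationId);
        }
    }
}
```

With two instances created from the same `StopRoutingSketch`, registering `c1` on A and calling `stopTask("c1")` on B makes A run its stop logic, while B only forwards the request.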
3. Task Lifecycle
```
HTTP /agent/chat/stream
  ↓
1. AgentController.webSearchStream()
  ↓
2. checkRunningTask() ──→ taskManager.hasRunningTask(conversationId)
  ↓
3. Build the Sinks.Many<String> sink
  ↓
4. registerTask(conversationId, sink, "websearch")
     ├─→ Local: taskMap.put(conversationId, TaskInfo)
     └─→ Redis: setIfAbsent("agent:task:xxx", instanceId, TTL=30min)
  ↓
5. Agent execution (ReAct loop)
  ↓
6. doFinally / doOnCancel
  ↓
7. taskManager.stopTask(conversationId)
     ├─→ Local: taskMap.remove(conversationId)
     └─→ Redis: bucket.delete()
```
4. TaskInfo
```java
public static class TaskInfo {
    // Sink that streams output chunks to the client.
    private final Sinks.Many<String> sink;
    // Subscription handle; set after subscribing so that a stop request can dispose it.
    @Setter
    private Disposable disposable;
    private final long createTime;
    private final String agentType;

    public TaskInfo(Sinks.Many<String> sink, String agentType) {
        this.sink = sink;
        this.agentType = agentType;
        this.createTime = System.currentTimeMillis();
    }
}
```
5. Core Methods
5.1 registerTask (register a task)
```java
public TaskInfo registerTask(String conversationId, Sinks.Many<String> sink, String agentType) {
    // Fast path: this instance already runs a task for the conversation.
    TaskInfo existing = taskMap.get(conversationId);
    if (existing != null) {
        log.warn("Conversation {} already has a local task running; rejecting the new task", conversationId);
        return null;
    }

    // Cross-instance guard: only one instance may hold the Redis key.
    RBucket<String> bucket = getTaskBucket(conversationId);
    boolean acquired = bucket.setIfAbsent(instanceId, Duration.ofMinutes(TASK_TTL_MINUTES));
    if (!acquired) {
        String holder = bucket.get();
        // The original log call passed three arguments for two placeholders.
        log.warn("Conversation {} is already running on instance {} (this instance: {})",
                conversationId, holder, instanceId);
        return null;
    }

    TaskInfo taskInfo = new TaskInfo(sink, agentType);
    taskMap.put(conversationId, taskInfo);
    return taskInfo;
}
```
Redis key format: `agent:task:{conversationId}`; the value is the `instanceId` of the holder.
TTL: 30 minutes (configurable).
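The key behaves like a lock with a lease: the first writer wins, later writers are rejected, and the entry disappears on its own once the TTL elapses. The sketch below models only that behaviour in memory, with an explicit clock argument so the expiry is deterministic. `TtlLockSketch`, `holderOf` and the `nowMillis` parameter are hypothetical names introduced for illustration; the real code delegates this to Redis through `RBucket.setIfAbsent`.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

/** In-memory model of "set if absent, with TTL"; not a Redis client. */
final class TtlLockSketch {
    static final String TASK_KEY_PREFIX = "agent:task:";

    private static final class Entry {
        final String holder;
        final long expiresAtMillis;

        Entry(String holder, long expiresAtMillis) {
            this.holder = holder;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Map<String, Entry> store = new HashMap<>();

    static String taskKey(String conversationId) {
        return TASK_KEY_PREFIX + conversationId;
    }

    /** Stores the holder only if no live entry exists; returns true on success. */
    synchronized boolean setIfAbsent(String conversationId, String holder, Duration ttl, long nowMillis) {
        String key = taskKey(conversationId);
        Entry current = store.get(key);
        if (current != null && current.expiresAtMillis > nowMillis) {
            return false; // still held, the caller must back off
        }
        store.put(key, new Entry(holder, nowMillis + ttl.toMillis()));
        return true;
    }

    /** Returns the live holder, or null when the key is absent or expired. */
    synchronized String holderOf(String conversationId, long nowMillis) {
        Entry entry = store.get(taskKey(conversationId));
        return (entry != null && entry.expiresAtMillis > nowMillis) ? entry.holder : null;
    }
}
```

The expiry is what keeps a crashed instance from blocking a conversation forever: once 30 minutes pass without a renewal, another instance can acquire the same key.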
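5.2 stopTask (stop a task)
```java
public boolean stopTask(String conversationId) {
    // Case 1: the task runs on this instance; stop it directly.
    TaskInfo localTask = taskMap.get(conversationId);
    if (localTask != null) {
        doStopTask(conversationId, localTask);
        return true;
    }

    // Case 2: no Redis key, so no instance is running this conversation.
    RBucket<String> bucket = getTaskBucket(conversationId);
    if (!bucket.isExists()) {
        return false;
    }

    // Case 3: the key names this instance, but the local Map has no entry; nothing to stop.
    String holder = bucket.get();
    if (instanceId.equals(holder)) {
        return false;
    }

    // Case 4: another instance holds the task; broadcast the stop request.
    long receivers = stopTopic.publish(conversationId);
    log.info("Stop request for conversation {} published to {} subscriber(s)", conversationId, receivers);
    return true;
}
```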
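The four branches of `stopTask` can be written as a pure decision function, which makes the meaning of the boolean result easier to see. This is a sketch, not part of the source: `StopDecisionSketch`, the `Outcome` enum and its constant names are hypothetical labels for the branches shown above.

```java
import java.util.Objects;

/** Decision table for stopTask, separated from Redis and Reactor (illustration only). */
final class StopDecisionSketch {
    enum Outcome { STOPPED_LOCALLY, NOT_RUNNING, STALE_OWN_KEY, BROADCAST_TO_HOLDER }

    /**
     * @param inLocalMap  whether the local task map contains the conversation
     * @param redisHolder instanceId stored under the Redis key, or null if the key is absent
     * @param self        the id of the instance that received the stop request
     */
    static Outcome decide(boolean inLocalMap, String redisHolder, String self) {
        if (inLocalMap) {
            return Outcome.STOPPED_LOCALLY;
        }
        if (redisHolder == null) {
            return Outcome.NOT_RUNNING;
        }
        if (Objects.equals(redisHolder, self)) {
            return Outcome.STALE_OWN_KEY; // key exists but the task is gone locally
        }
        return Outcome.BROADCAST_TO_HOLDER;
    }

    /** Mirrors the boolean that stopTask returns for each outcome. */
    static boolean reportedSuccess(Outcome outcome) {
        return outcome == Outcome.STOPPED_LOCALLY || outcome == Outcome.BROADCAST_TO_HOLDER;
    }
}
```

Note that in the broadcast case `true` only means the request was published. Whether the holder actually stopped the task is not confirmed to the caller.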
5.3 doStopTask (perform the stop)
```java
private void doStopTask(String conversationId, TaskInfo taskInfo) {
    try {
        // Cancel the upstream subscription first so that no further chunks arrive.
        Disposable disposable = taskInfo.getDisposable();
        if (disposable != null && !disposable.isDisposed()) {
            disposable.dispose();
        }

        // Tell the client that the stream was stopped, then complete it.
        Sinks.Many<String> sink = taskInfo.getSink();
        if (sink != null) {
            try {
                sink.tryEmitNext(createStopMessage());
                sink.tryEmitComplete();
            } catch (Exception e) {
                log.warn("Failed to emit the stop message", e);
            }
        }
    } finally {
        // Always release the local entry and the Redis key.
        doRemoveTask(conversationId);
    }
}
```
Stop message format (the content text reads "Generation stopped by the user"):
```json
{"type":"text","content":"⏹ 用户已停止生成\n"}
```
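The source does not show `createStopMessage()`. One way to produce a message of this shape without a JSON library is sketched below. `StopMessageSketch`, `textMessage` and `escapeJson` are hypothetical helpers, and the real method may well use a JSON mapper instead.

```java
/** Builds a {"type":"text","content":...} message by hand (illustration only). */
final class StopMessageSketch {
    /** Escapes the characters that must not appear raw inside a JSON string. */
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder(s.length() + 8);
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the six-character escape form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    /** Wraps the given text in the message envelope used for streamed text chunks. */
    static String textMessage(String content) {
        return "{\"type\":\"text\",\"content\":\"" + escapeJson(content) + "\"}";
    }
}
```

Passing the stop text with its trailing newline to `textMessage` yields the single-line JSON shown above, with the newline encoded as `\n` rather than as a raw line break.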
5.4 handleRemoteStop (Pub/Sub callback)
```java
private void handleRemoteStop(String conversationId) {
    // Every instance receives the broadcast; only the one that owns the task acts on it.
    TaskInfo taskInfo = taskMap.remove(conversationId);
    if (taskInfo == null) {
        return;
    }
    log.info("Remote stop: conversationId={}, instanceId={}", conversationId, instanceId);
    doStopTask(conversationId, taskInfo);
}
```
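5.5 TTL Refresh
```java
@PostConstruct
public void afterPropertiesSet() {
    // Subscribe to stop requests broadcast by other instances.
    listenerId = stopTopic.addListener(String.class, (channel, conversationId) -> {
        handleRemoteStop(conversationId);
    });

    // Renew the TTL of locally owned tasks at a fixed rate.
    ttlRefreshScheduler.scheduleAtFixedRate(
            this::refreshTaskTtls,
            TTL_REFRESH_INTERVAL_MINUTES,
            TTL_REFRESH_INTERVAL_MINUTES,
            TimeUnit.MINUTES
    );
}

private void refreshTaskTtls() {
    // Removing entries while iterating is safe only if taskMap is a concurrent map.
    for (String conversationId : taskMap.keySet()) {
        RBucket<String> bucket = getTaskBucket(conversationId);
        String holder = bucket.get();
        if (instanceId.equals(holder)) {
            // Still the owner: extend the lease.
            bucket.expire(Duration.ofMinutes(TASK_TTL_MINUTES));
        } else {
            // Key expired or owned by another instance: drop the local entry.
            taskMap.remove(conversationId);
        }
    }
}
```
TTL refresh interval: 5 minutes. A task key expires after 30 minutes by default and is renewed every 5 minutes for as long as the owning instance still tracks the task.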
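The relationship between the two values can be checked with a little arithmetic. After the key is last written, refresh ticks arrive every 5 minutes, so the remaining TTL does not drop below 25 minutes while refreshes run on schedule (ignoring scheduling delay), and five ticks (at 5, 10, 15, 20 and 25 minutes) fall before the 30-minute expiry. The key survives as long as any one of them succeeds. The sketch below encodes this; `TtlRefreshMath` and its method names are hypothetical.

```java
/** Arithmetic on TTL and refresh interval, both in minutes (illustration only). */
final class TtlRefreshMath {
    /** Number of refresh ticks that fall strictly before the key expires. */
    static long refreshChancesBeforeExpiry(long ttlMinutes, long intervalMinutes) {
        requirePositive(ttlMinutes, intervalMinutes);
        // ceil(ttl / interval) - 1: a tick exactly at the expiry time comes too late.
        return (ttlMinutes + intervalMinutes - 1) / intervalMinutes - 1;
    }

    /** Lowest remaining TTL the key reaches while every refresh runs on schedule. */
    static long minRemainingTtlMinutes(long ttlMinutes, long intervalMinutes) {
        requirePositive(ttlMinutes, intervalMinutes);
        return Math.max(0, ttlMinutes - intervalMinutes);
    }

    private static void requirePositive(long ttlMinutes, long intervalMinutes) {
        if (ttlMinutes <= 0 || intervalMinutes <= 0) {
            throw new IllegalArgumentException("ttl and interval must be positive");
        }
    }
}
```

A result of zero chances, for example when the interval equals the TTL, would mean the key can expire before the first refresh, so the interval should stay well below the TTL.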
6. Redis Key Design
| Key pattern | Type | Value | TTL |
|---|---|---|---|
| agent:task:{conversationId} | String | instanceId | 30 min |
| Topic agent:stop | Pub/Sub | conversationId | - |
7. Stop API
```
GET /agent/stop?conversationId=xxx
```
Response (the message text means "Execution stopped"):
```json
{
  "success": true,
  "message": "已停止执行"
}
```
Underlying controller method:
```java
@GetMapping("/stop")
public Map<String, Object> stopAgent(@RequestParam String conversationId) {
    boolean success = taskManager.stopTask(conversationId);
    Map<String, Object> result = new HashMap<>();
    result.put("success", success);
    // Messages: "Execution stopped" / "No running task found, or it has already stopped".
    result.put("message", success ? "已停止执行" : "没有找到正在执行的任务或已停止");
    return result;
}
```
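From a Java client, the stop request can be built with the JDK HTTP client. The sketch below only constructs the request and URL-encodes the conversation id. `StopRequestSketch`, `buildStopRequest` and the base URL passed to it are assumptions for illustration; the path and the parameter name come from the endpoint above.

```java
import java.net.URI;
import java.net.URLEncoder;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.time.Duration;

/** Builds the GET request for the stop endpoint (illustration only). */
final class StopRequestSketch {
    static HttpRequest buildStopRequest(String baseUrl, String conversationId) {
        // Encode the id so that characters such as '&' cannot break the query string.
        String encoded = URLEncoder.encode(conversationId, StandardCharsets.UTF_8);
        URI uri = URI.create(baseUrl + "/agent/stop?conversationId=" + encoded);
        return HttpRequest.newBuilder(uri)
                .timeout(Duration.ofSeconds(5))
                .GET()
                .build();
    }
}
```

The request can then be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())`. A `success` value of `false` in the response means no running task was found for that conversation.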
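8. Concurrency Control
Within a single instance:
- A second request for the same `conversationId` is rejected by `registerTask` (local Map hit).
Across instances:
- A second request is rejected because the Redis `setIfAbsent` call fails.
- Error message: "该会话正在执行中,请稍后再试" ("This conversation is already running, please try again later").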
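Both rules rest on the same idea: an atomic "insert if absent" lets exactly one contender through. The sketch below demonstrates this with several threads racing on a `ConcurrentHashMap`; `SingleWinnerSketch` and `race` are hypothetical names. In `registerTask` the local check is a separate `get` followed by `put`, so this sketch shows the atomic variant rather than the code from section 5.1, where the Redis `setIfAbsent` call is what closes that gap.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Several requests race to register one conversation (illustration only). */
final class SingleWinnerSketch {
    /** Returns how many of the contenders managed to register "conv-1". */
    static int race(int contenders) {
        ConcurrentMap<String, String> taskMap = new ConcurrentHashMap<>();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(contenders);
        try {
            for (int i = 0; i < contenders; i++) {
                String requestId = "request-" + i;
                pool.execute(() -> {
                    try {
                        start.await(); // release all contenders at once
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                    // putIfAbsent is atomic: exactly one caller sees null.
                    if (taskMap.putIfAbsent("conv-1", requestId) == null) {
                        winners.incrementAndGet();
                    }
                });
            }
            start.countDown();
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("contenders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return winners.get();
    }
}
```

However many contenders take part, one registration succeeds and the rest are rejected, which is the behaviour the error message above reports to the caller.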
9. Agent Integration Example
Every Agent's `execute()` method follows this pattern:
```java
@Override
public Flux<String> execute(String conversationId, String question) {
    // Reject early if the conversation already has a running task.
    Flux<String> checkResult = checkRunningTask(conversationId);
    if (checkResult != null) return checkResult;

    Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

    AgentTaskManager.TaskInfo taskInfo = registerTask(conversationId, sink);
    if (taskInfo == null && conversationId != null) {
        // Message: "This conversation is already running".
        return Flux.error(new IllegalStateException("该会话正在执行中"));
    }

    // messages: the prompt messages for this question (construction not shown in the source).
    // The original snippet never assigned the subscription, so `disposable` was undefined below.
    Disposable disposable = chatClient.prompt().messages(messages).stream().content()
            .publishOn(Schedulers.boundedElastic())
            .doOnNext(chunk -> sink.tryEmitNext(chunk))
            .doOnError(err -> sink.tryEmitError(err))
            .doFinally(signal -> {
                if (taskManager != null) {
                    taskManager.stopTask(conversationId);
                }
            })
            .subscribe();

    // Hand the subscription to the task manager so that a stop request can dispose it.
    if (taskManager != null) {
        taskManager.setDisposable(conversationId, disposable);
    }

    return sink.asFlux();
}
```
10. Configuration
```yaml
spring:
  data:
    redis:
      host: localhost
      port: 6389
      database: 0
      timeout: 5s
```
```java
private static final String TASK_KEY_PREFIX = "agent:task:";
private static final String STOP_TOPIC_NAME = "agent:stop";
private static final long TASK_TTL_MINUTES = 30;
private static final long TTL_REFRESH_INTERVAL_MINUTES = 5;
```
11. Lifecycle Callbacks
```java
@Component
public class AgentTaskManager implements InitializingBean, DisposableBean {

    @Override
    public void afterPropertiesSet() {
        // Start-up: subscribe to the stop topic and start the TTL refresh schedule.
        listenerId = stopTopic.addListener(...);
        ttlRefreshScheduler.scheduleAtFixedRate(...);
    }

    @Override
    public void destroy() {
        // Shutdown: unsubscribe, stop the scheduler, then release every remaining task.
        stopTopic.removeListener(listenerId);
        ttlRefreshScheduler.shutdown();
        for (String conversationId : taskMap.keySet()) {
            doRemoveTask(conversationId);
        }
    }
}
```
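The start-up and shutdown pair can be tried in isolation with a plain `ScheduledExecutorService`. The sketch below is a simplified stand-in, not the real class: `LifecycleSketch`, `start`, `released` and `isStopped` are hypothetical, and the refresh body is left empty. It adds two details that are assumptions rather than source behaviour: the refresh is wrapped in a try/catch, because `scheduleAtFixedRate` suppresses all later runs once one run throws, and `destroy` waits briefly for the scheduler and iterates over a snapshot of the keys.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** Start-up and shutdown of a periodic refresh, without Spring or Redis (illustration only). */
final class LifecycleSketch {
    final Map<String, String> taskMap = new ConcurrentHashMap<>(); // conversationId -> agentType
    final List<String> released = new ArrayList<>();               // tasks released on shutdown
    private final ScheduledExecutorService ttlRefreshScheduler =
            Executors.newSingleThreadScheduledExecutor();

    /** Start-up: schedule the periodic refresh. */
    void start(long intervalMillis) {
        ttlRefreshScheduler.scheduleAtFixedRate(
                this::refreshSafely, intervalMillis, intervalMillis, TimeUnit.MILLISECONDS);
    }

    private void refreshSafely() {
        try {
            // The TTL renewal for each locally owned task would go here.
        } catch (RuntimeException e) {
            // Swallow and log: an exception escaping here would cancel all future runs.
        }
    }

    /** Shutdown: stop the scheduler first, then release every task still registered. */
    void destroy() {
        ttlRefreshScheduler.shutdown();
        try {
            if (!ttlRefreshScheduler.awaitTermination(5, TimeUnit.SECONDS)) {
                ttlRefreshScheduler.shutdownNow();
            }
        } catch (InterruptedException e) {
            ttlRefreshScheduler.shutdownNow();
            Thread.currentThread().interrupt();
        }
        for (String conversationId : new ArrayList<>(taskMap.keySet())) { // iterate over a snapshot
            taskMap.remove(conversationId);
            released.add(conversationId);
        }
    }

    boolean isStopped() {
        return ttlRefreshScheduler.isTerminated();
    }
}
```

Stopping the scheduler before releasing tasks avoids a refresh tick running against entries that are being removed at the same moment.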
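12. Possible Extensions
- Rate limiting: QPS limits per user or per IP
- Priority queue: different priorities for different `agentType` values
- Task monitoring: expose an `/actuator/agent-tasks` endpoint to list active tasks
- Graceful shutdown: on JVM shutdown, call `stopTask` first and then `destroy`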