11 - 会话与任务管理

一、定位

AgentTaskManager 是 Agent 的中介者,提供:

  • 跨实例的任务注册
  • 任务停止(支持 Redis Pub/Sub 跨实例广播)
  • TTL 自动过期
  • 并发控制(同一会话不能并发执行)

二、核心设计:Redis Pub/Sub 跨实例协调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
Instance A                    Instance B
│ │
├─ registerTask (本地 Map) ──→│
│ │
│ 业务执行中... │
│ │
├─ 收到 stop 请求 ───────────→│
│ 1. 检查本地 Map │
│ 2. 没有 → 检查 Redis │
│ 3. Redis 有 → Pub/Sub 广播 │
│ │
│ ←─── 广播 ───┤
│ │
│ handleRemoteStop()
│ - 从本地 Map 移除
│ - 中断 Disposable
│ - 发送停止消息
│ - 删除 Redis key

三、任务生命周期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
HTTP /agent/chat/stream

1. AgentController.webSearchStream()

2. checkRunningTask() ──→ taskManager.hasRunningTask(conversationId)

3. 构造 Sinks.Many<String> sink

4. registerTask(conversationId, sink, "websearch")
├─→ 本地 taskMap.put(conversationId, TaskInfo)
└─→ Redis: setIfAbsent("agent:task:xxx", instanceId, TTL=30min)

5. Agent 执行(ReAct 循环)

6. doFinally / doOnCancel

7. taskManager.stopTask(conversationId)
├─→ 本地 taskMap.remove(conversationId)
└─→ Redis: bucket.delete()

四、TaskInfo 任务信息

1
2
3
4
5
6
7
8
9
10
11
12
public static class TaskInfo {
private final Sinks.Many<String> sink; // 流式输出 sink
@Setter private Disposable disposable; // Reactor 订阅(可中断)
private final long createTime;
private final String agentType; // websearch/file/pptx/plan-execute/skills

public TaskInfo(Sinks.Many<String> sink, String agentType) {
this.sink = sink;
this.agentType = agentType;
this.createTime = System.currentTimeMillis();
}
}

五、核心方法

5.1 registerTask(注册任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
public TaskInfo registerTask(String conversationId, Sinks.Many<String> sink, String agentType) {
// 1. 本地快速检查
TaskInfo existing = taskMap.get(conversationId);
if (existing != null) {
log.warn("会话 {} 本地已有任务在执行,拒绝注册新任务", conversationId);
return null;
}

// 2. Redis 分布式锁
RBucket<String> bucket = getTaskBucket(conversationId);
boolean acquired = bucket.setIfAbsent(instanceId, Duration.ofMinutes(TASK_TTL_MINUTES));
if (!acquired) {
String holder = bucket.get();
log.warn("会话 {} 已在实例 {} 上执行", conversationId, holder, instanceId);
return null;
}

// 3. 本地 Map 注册
TaskInfo taskInfo = new TaskInfo(sink, agentType);
taskMap.put(conversationId, taskInfo);
return taskInfo;
}

Redis Key 格式agent:task:{conversationId} → 值为持有者 instanceId

TTL:30 分钟(可配置)

5.2 stopTask(停止任务)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public boolean stopTask(String conversationId) {
// 1. 本地快速路径
TaskInfo localTask = taskMap.get(conversationId);
if (localTask != null) {
doStopTask(conversationId, localTask);
return true;
}

// 2. Redis 检查
RBucket<String> bucket = getTaskBucket(conversationId);
if (!bucket.isExists()) {
return false; // 任务不存在
}

// 3. 持有者是本实例
String holder = bucket.get();
if (instanceId.equals(holder)) {
return false; // 已经在处理
}

// 4. Pub/Sub 广播
long receivers = stopTopic.publish(conversationId);
return true;
}

5.3 doStopTask(执行停止)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
private void doStopTask(String conversationId, TaskInfo taskInfo) {
try {
// 1. 中断底层 Reactor 订阅
Disposable disposable = taskInfo.getDisposable();
if (disposable != null && !disposable.isDisposed()) {
disposable.dispose();
}

// 2. 发送停止消息
Sinks.Many<String> sink = taskInfo.getSink();
if (sink != null) {
try {
sink.tryEmitNext(createStopMessage());
sink.tryEmitComplete();
} catch (Exception e) {
log.warn("发送停止消息失败", e);
}
}
} finally {
doRemoveTask(conversationId); // 清理
}
}

停止消息格式

1
{"type":"text","content":"⏹ 用户已停止生成\n"}

5.4 handleRemoteStop(Pub/Sub 回调)

1
2
3
4
5
6
7
8
private void handleRemoteStop(String conversationId) {
TaskInfo taskInfo = taskMap.remove(conversationId);
if (taskInfo == null) {
return;
}
log.info("远程停止任务: conversationId={}, instanceId={}", conversationId, instanceId);
doStopTask(conversationId, taskInfo);
}

5.5 TTL 刷新

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@PostConstruct
public void afterPropertiesSet() {
listenerId = stopTopic.addListener(String.class, (channel, conversationId) -> {
handleRemoteStop(conversationId);
});

ttlRefreshScheduler.scheduleAtFixedRate(
this::refreshTaskTtls,
TTL_REFRESH_INTERVAL_MINUTES,
TTL_REFRESH_INTERVAL_MINUTES,
TimeUnit.MINUTES
);
}

private void refreshTaskTtls() {
for (String conversationId : taskMap.keySet()) {
RBucket<String> bucket = getTaskBucket(conversationId);
String holder = bucket.get();
if (instanceId.equals(holder)) {
bucket.expire(Duration.ofMinutes(TASK_TTL_MINUTES)); // 续期
} else {
// 归属变化,清理本地
taskMap.remove(conversationId);
}
}
}

TTL 刷新间隔:5 分钟(任务默认 30 分钟过期,每 5 分钟续期一次)

六、Redis Key 设计

Key 模式类型TTL
agent:task:{conversationId}StringinstanceId30 min
Topic agent:stopPub/SubconversationId-

七、停止 API

1
GET /agent/stop?conversationId=xxx

响应

1
2
3
4
{
"success": true,
"message": "已停止执行"
}

底层调用

1
2
3
4
5
6
7
8
@GetMapping("/stop")
public Map<String, Object> stopAgent(@RequestParam String conversationId) {
boolean success = taskManager.stopTask(conversationId);
Map<String, Object> result = new HashMap<>();
result.put("success", success);
result.put("message", success ? "已停止执行" : "没有找到正在执行的任务或已停止");
return result;
}

八、并发控制

单实例内

  • 同一 conversationId 第二次请求会被 registerTask 拒绝(本地 Map 命中)

跨实例

  • 第二次请求会因 Redis setIfAbsent 失败而被拒绝
  • 错误信息:”该会话正在执行中,请稍后再试”

九、Agent 集成示例

每个 Agent 的 execute() 方法都遵循:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public Flux<String> execute(String conversationId, String question) {
// 1. 检查并发
Flux<String> checkResult = checkRunningTask(conversationId);
if (checkResult != null) return checkResult;

Sinks.Many<String> sink = Sinks.many().unicast().onBackpressureBuffer();

// 2. 注册任务
AgentTaskManager.TaskInfo taskInfo = registerTask(conversationId, sink);
if (taskInfo == null && conversationId != null) {
return Flux.error(new IllegalStateException("该会话正在执行中"));
}

// 3. 业务执行
chatClient.prompt().messages(messages).stream().content()
.publishOn(Schedulers.boundedElastic())
.doOnNext(chunk -> sink.tryEmitNext(chunk))
.doOnError(err -> sink.tryEmitError(err))
.doFinally(signal -> {
// 4. 清理任务
if (taskManager != null) {
taskManager.stopTask(conversationId);
}
})
.subscribe();

// 5. 保存 disposable 用于停止
if (taskManager != null) {
taskManager.setDisposable(conversationId, disposable);
}

return sink.asFlux();
}

十、配置

1
2
3
4
5
6
7
spring:
data:
redis:
host: localhost
port: 6389
database: 0
timeout: 5s
1
2
3
4
private static final String TASK_KEY_PREFIX = "agent:task:";
private static final String STOP_TOPIC_NAME = "agent:stop";
private static final long TASK_TTL_MINUTES = 30;
private static final long TTL_REFRESH_INTERVAL_MINUTES = 5;

十一、生命周期回调

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class AgentTaskManager implements InitializingBean, DisposableBean {

@Override
public void afterPropertiesSet() {
// 订阅停止消息
listenerId = stopTopic.addListener(...);
// 启动 TTL 刷新
ttlRefreshScheduler.scheduleAtFixedRate(...);
}

@Override
public void destroy() {
// 移除监听器
stopTopic.removeListener(listenerId);
// 关闭定时器
ttlRefreshScheduler.shutdown();
// 清理所有任务
for (String conversationId : taskMap.keySet()) {
doRemoveTask(conversationId);
}
}
}

十二、扩展方向

  • 限流:按用户/IP 维度的 QPS 限流
  • 优先级队列:不同 agentType 设置不同优先级
  • 任务监控:暴露 /actuator/agent-tasks 端点查看活跃任务
  • 优雅停机:JVM shutdown 时先 stopTask 再 destroy