
| Feature | Bull | Agenda |
|:---------------------------|:---------------:|:------:|
| Backend | redis | mongo |
| Priorities | ✓ | ✓ |
| Concurrency | ✓ | ✓ |
| Delayed jobs | ✓ | ✓ |
| Global events | ✓ | ✓ |
| Rate Limiter | ✓ | |
| Pause/Resume | ✓ | ✓ |
| Sandboxed worker | ✓ | ✓ |
| Repeatable jobs | ✓ | ✓ |
| Atomic ops | ✓ | ~ |
| Persistence | ✓ | ✓ |
| UI | ✓ | ✓ |
| REST API | | ✓ |
| Central (Scalable) Queue | | ✓ |
| Supports long running jobs | | ✓ |
| Optimized for | Jobs / Messages | Jobs |
| Gracefuly stop | | |
| Remote stop | | |
| Multiple worker | | |
流程
- 发布任务
- 幂等
worker状态
- 执行中
worker占用任务成功后立即进入执行中状态, 并设置下次刷新时间
如果距离上次刷新时间超过workertimeoutms, 那么认为是超时状态
- 失败
worker主动设置执行状态为fail. 如果任务失败时worker不在列表中, 状态设置会失败
- 成功
worker主动设置执行状态为success. 如果任务成功时worker不在列表中, 状态设置会失败
- 超时
根据workertimeoutms推导得到的状态
任务状态
任务状态由当前worker列表决定
- 未启动
列表为空
- 执行中
列表中存在执行中worker
- 成功
列表中只有success的worker
- 失败
列表中只有fail的worker
提供功能
- 发布任务(幂等)
查找key对应的没有运行中的任务
没有找到=>setOnInsert
找到了=>set nothing, 按配置决定是否清除失败/成功的worker
- 占用任务(多worker)
查找条件: 判断是否被指定workerid, 任务状态不是成功, 任务没有被当前worker处理过(不能在worker列表中), 可以接受更多的worker
排序条件: priority最高, 优先被指定workerid的
没有找到=>不做操作
找到了=>增加自己的worker对象, 并且过滤worker列表(清除超时的运行中worker)
- 维持任务
查找条件: key相同, worker id相同,
找到了=> 更新对应的超时时间
没找到=> 结束当前任务
- 任务执行成功
尝试更新任务状态为成功
- 任务执行失败
主动返回任务失败, 尝试更新任务状态为失败
- 任务执行异常
按option中设置重试, 并更新重试次数, 如果重试次数超过限制那么更新为任务失败. 重试过程只在本地发生, 所有重试次数失败后更新为失败状态
配置更新后的影响
- specificworkerids变动
如果更新后不允许当前worker执行, 那么结束任务
- pingintervalms变动
下次ping时生效
- 其他参数变动
不影响正在执行中的任务, 变动需要在重新占用任务时体现
具体实现
发布任务
直接发送就行
消费任务
核心问题: 下一次什么时候去占用任务
1. 启动时计算下一次时间nexttrytime
2. 使用changestream实时更新nexttry_time
TODO
-[ ] cleansuccess 暂不实现, maintenance
-[ ] cleanfailed 暂不实现, maintenance