MIT licensed Build Status

| Feature | Bull | Agenda | |:---------------------------|:---------------:|:------:| | Backend | redis | mongo | | Priorities | ✓ | ✓ | | Concurrency | ✓ | ✓ | | Delayed jobs | ✓ | ✓ | | Global events | ✓ | ✓ | | Rate Limiter | ✓ | | | Pause/Resume | ✓ | ✓ | | Sandboxed worker | ✓ | ✓ | | Repeatable jobs | ✓ | ✓ | | Atomic ops | ✓ | ~ | | Persistence | ✓ | ✓ | | UI | ✓ | ✓ | | REST API | | ✓ | | Central (Scalable) Queue | | ✓ | | Supports long running jobs | | ✓ | | Optimized for | Jobs / Messages | Jobs | | Gracefuly stop | | | | Remote stop | | | | Multiple worker | | |

流程

  1. 发布任务
    1. 幂等

worker状态

  1. 执行中 worker占用任务成功后立即进入执行中状态, 并设置下次刷新时间 如果距离上次刷新时间超过workertimeoutms, 那么认为是超时状态
  2. 失败 worker主动设置执行状态为fail. 如果任务失败时worker不在列表中, 状态设置会失败
  3. 成功 worker主动设置执行状态为success. 如果任务成功时worker不在列表中, 状态设置会失败
  4. 超时 根据workertimeoutms推导得到的状态

任务状态

任务状态由当前worker列表决定

  1. 未启动 列表为空
  2. 执行中 列表中存在执行中worker
  3. 成功 列表中只有success的worker
  4. 失败 列表中只有fail的worker

提供功能

  1. 发布任务(幂等) 查找key对应的没有运行中的任务 没有找到=>setOnInsert 找到了=>set nothing, 按配置决定是否清除失败/成功的worker
  2. 占用任务(多worker) 查找条件: 判断是否被指定workerid, 任务状态不是成功, 任务没有被当前worker处理过(不能在worker列表中), 可以接受更多的worker 排序条件: priority最高, 优先被指定workerid的 没有找到=>不做操作 找到了=>增加自己的worker对象, 并且过滤worker列表(清除超时的运行中worker)
  3. 维持任务 查找条件: key相同, worker id相同, 找到了=> 更新对应的超时时间 没找到=> 结束当前任务
  4. 任务执行成功 尝试更新任务状态为成功
  5. 任务执行失败 主动返回任务失败, 尝试更新任务状态为失败
  6. 任务执行异常 按option中设置重试, 并更新重试次数, 如果重试次数超过限制那么更新为任务失败. 重试过程只在本地发生, 所有重试次数失败后更新为失败状态

配置更新后的影响

  1. specificworkerids变动 如果更新后不允许当前worker执行, 那么结束任务
  2. pingintervalms变动 下次ping时生效
  3. 其他参数变动 不影响正在执行中的任务, 变动需要在重新占用任务时体现

具体实现

发布任务

直接发送就行

消费任务

核心问题: 下一次什么时候去占用任务 1. 启动时计算下一次时间nexttrytime 2. 使用changestream实时更新nexttry_time

TODO

-[ ] cleansuccess 暂不实现, maintenance -[ ] cleanfailed 暂不实现, maintenance