项目简介
该项目是基于Go语言、Redis和MySQL构建的分布式异步任务框架。主要用于解决分布式环境下的任务调度、任务处理以及任务失败重试等问题。借助Redis作为消息队列,配合MySQL持久化任务消息,保证在Redis宕机等意外状况下任务消息不会丢失,同时支持任务的重试机制。
项目的主要特性和功能
- 支持分布式任务调度,可在分布式环境中创建任务,生产者将任务推送到Redis队列,消费者从队列获取并执行任务。
- 具备任务失败重试机制,任务执行失败后能自动或手动重试,确保任务最终完成。
- 提供自动与手动提交模式:自动提交模式适合简单任务处理场景,任务执行完成后自动提交状态;手动提交模式适合需要精确控制任务状态的场景,需手动调用
Commit
方法提交状态。 - 支持并发处理,可配置消费者并发数量,提高任务处理效率。
- 能自动检测并处理长时间未完成的僵尸任务,避免任务因意外情况长时间挂起。
- 采用MySQL持久化任务消息,保障在Redis宕机等意外时任务消息不丢失。
安装使用步骤
前提条件
- 已安装并配置好MySQL和Redis。
- 已安装Go语言环境。
- 已下载本项目的源码文件。
使用步骤
- 配置数据库和Redis连接:在代码里配置MySQL和Redis的连接信息,保证MySQL数据库有创建表的相应权限。
- 初始化任务表:在代码中设置
SetInitDB(true)
,系统会自动创建任务表;若不需要自动创建,可手动创建,表结构参考默认的t_common_job_[task_id]
。 - 配置任务:使用
task.MustNewTask
方法创建任务对象,配置任务ID、MySQL连接、Redis客户端、消费者函数等参数,按需选择自动或手动提交模式。 - 发送任务:使用
mytask.Push([]byte("hello"))
方法将任务推送到Redis队列。 - 消费者处理任务:消费者函数从Redis队列获取任务并执行。自动提交模式下,任务执行完成自动提交状态;手动提交模式下,需手动调用
Commit
方法提交状态。 - 处理僵尸任务:系统自动检测并处理超时的僵尸任务,防止任务长时间挂起。
示例代码
自动提交模式
```go mytask := task.MustNewTask(context.Background(), "my_task_0831", // taskID DB, // mysql conn Client, // redis client task.SetJobFunc(MyJob), // 配置消费者函数 task.SetParallelNum(10), // 配置消费者并发数量 task.SetMaxFailedTimes(3), // 自动模式下重试次数 task.SetHandleZombieJobsDstStatus(task.JobStatusTodo), // 处理超时job task.SetHandleZombieJobsPeriod(time.Second20), // 处理超时job轮训周期 task.SetHandleZombieJobsTimeout(time.Second20), // 设置job超时时间 task.SetHandleZombieJobsLimit(1000), // 抽离超时job的limit task.SetTableName("t_my_job_0831"), // 设置db table name task.SetListNamePrefix("l_my_job_0831"), // 设置redis list name的prefix task.SetInitDB(true), // 设置是否初始化DB )
// 发送消息 mytask.Push([]byte("hello")) ```
手动提交模式
```go mytask := task.MustNewTask(context.Background(), "my_task_0831", // taskID DB, // mysql conn Client, // redis client task.SetJobFunc(MyJob), // 配置消费者函数 task.SetParallelNum(10), // 配置消费者并发数量 task.SetMaxFailedTimes(3), // 自动模式下重试次数 task.SetHandleZombieJobsDstStatus(task.JobStatusTodo), // 处理超时job task.SetHandleZombieJobsPeriod(time.Second20), // 处理超时job轮训周期 task.SetHandleZombieJobsTimeout(time.Second20), // 设置job超时时间 task.SetHandleZombieJobsLimit(1000), // 抽离超时job的limit task.SetTableName("t_my_job_0831"), // 设置db table name task.SetListNamePrefix("l_my_job_0831"), // 设置redis list name的prefix task.SetInitDB(true), // 设置是否初始化DB task.SetAutoCommit(false), // 关闭自动commit )
// 发送消息 mytask.Push([]byte("hello"))
// 消费者函数 func MyJob(ctx context.Context, id, fts, cID int64, payload []byte) error { succ := false defer func() { if succ { mytask.Commit(id, cID, task.JobStatusDone) } else { if fts+1 >= 5 { mytask.Commit(id, cID, task.JobStatusFailed) } else { mytask.Commit(id, cID, task.JobStatusTodo) } } }()
fmt.Println("MyJob assigned job: ", id, string(payload))
// 处理任务逻辑
succ = true
return nil
} ```
注意事项
- 要确保MySQL和Redis服务正常运行。
- 在生产环境中,需使用安全的数据库和Redis连接信息。
- 根据业务需求配置任务重试次数和超时时间。
版权声明
本项目遵循MIT开源协议,使用时请遵守相关协议。
下载地址
点击下载 【提取码: 4003】【解压密码: www.makuang.net】