littlebot
Published on 2025-04-09 / 0 Visits
0

【源码】基于Redis和MySQL的分布式异步任务框架——Meepo

项目简介

该项目是基于Go语言、Redis和MySQL构建的分布式异步任务框架。主要用于解决分布式环境下的任务调度、任务处理以及任务失败重试等问题。借助Redis作为消息队列,配合MySQL持久化任务消息,保证在Redis宕机等意外状况下任务消息不会丢失,同时支持任务的重试机制。

项目的主要特性和功能

  1. 支持分布式任务调度,可在分布式环境中创建任务,生产者将任务推送到Redis队列,消费者从队列获取并执行任务。
  2. 具备任务失败重试机制,任务执行失败后能自动或手动重试,确保任务最终完成。
  3. 提供自动与手动提交模式:自动提交模式适合简单任务处理场景,任务执行完成后自动提交状态;手动提交模式适合需要精确控制任务状态的场景,需手动调用 Commit 方法提交状态。
  4. 支持并发处理,可配置消费者并发数量,提高任务处理效率。
  5. 能自动检测并处理长时间未完成的僵尸任务,避免任务因意外情况长时间挂起。
  6. 采用MySQL持久化任务消息,保障在Redis宕机等意外时任务消息不丢失。

安装使用步骤

前提条件

  1. 已安装并配置好MySQL和Redis。
  2. 已安装Go语言环境。
  3. 已下载本项目的源码文件。

使用步骤

  1. 配置数据库和Redis连接:在代码里配置MySQL和Redis的连接信息,保证MySQL数据库有创建表的相应权限。
  2. 初始化任务表:在代码中设置 SetInitDB(true),系统会自动创建任务表;若不需要自动创建,可手动创建,表结构参考默认的 t_common_job_[task_id]
  3. 配置任务:使用 task.MustNewTask 方法创建任务对象,配置任务ID、MySQL连接、Redis客户端、消费者函数等参数,按需选择自动或手动提交模式。
  4. 发送任务:使用 mytask.Push([]byte("hello")) 方法将任务推送到Redis队列。
  5. 消费者处理任务:消费者函数从Redis队列获取任务并执行。自动提交模式下,任务执行完成自动提交状态;手动提交模式下,需手动调用 Commit 方法提交状态。
  6. 处理僵尸任务:系统自动检测并处理超时的僵尸任务,防止任务长时间挂起。

示例代码

自动提交模式

```go mytask := task.MustNewTask(context.Background(), "my_task_0831", // taskID DB, // mysql conn Client, // redis client task.SetJobFunc(MyJob), // 配置消费者函数 task.SetParallelNum(10), // 配置消费者并发数量 task.SetMaxFailedTimes(3), // 自动模式下重试次数 task.SetHandleZombieJobsDstStatus(task.JobStatusTodo), // 处理超时job task.SetHandleZombieJobsPeriod(time.Second20), // 处理超时job轮训周期 task.SetHandleZombieJobsTimeout(time.Second20), // 设置job超时时间 task.SetHandleZombieJobsLimit(1000), // 抽离超时job的limit task.SetTableName("t_my_job_0831"), // 设置db table name task.SetListNamePrefix("l_my_job_0831"), // 设置redis list name的prefix task.SetInitDB(true), // 设置是否初始化DB )

// 发送消息 mytask.Push([]byte("hello")) ```

手动提交模式

```go mytask := task.MustNewTask(context.Background(), "my_task_0831", // taskID DB, // mysql conn Client, // redis client task.SetJobFunc(MyJob), // 配置消费者函数 task.SetParallelNum(10), // 配置消费者并发数量 task.SetMaxFailedTimes(3), // 自动模式下重试次数 task.SetHandleZombieJobsDstStatus(task.JobStatusTodo), // 处理超时job task.SetHandleZombieJobsPeriod(time.Second20), // 处理超时job轮训周期 task.SetHandleZombieJobsTimeout(time.Second20), // 设置job超时时间 task.SetHandleZombieJobsLimit(1000), // 抽离超时job的limit task.SetTableName("t_my_job_0831"), // 设置db table name task.SetListNamePrefix("l_my_job_0831"), // 设置redis list name的prefix task.SetInitDB(true), // 设置是否初始化DB task.SetAutoCommit(false), // 关闭自动commit )

// 发送消息 mytask.Push([]byte("hello"))

// 消费者函数 func MyJob(ctx context.Context, id, fts, cID int64, payload []byte) error { succ := false defer func() { if succ { mytask.Commit(id, cID, task.JobStatusDone) } else { if fts+1 >= 5 { mytask.Commit(id, cID, task.JobStatusFailed) } else { mytask.Commit(id, cID, task.JobStatusTodo) } } }()

fmt.Println("MyJob assigned job: ", id, string(payload))
// 处理任务逻辑
succ = true
return nil

} ```

注意事项

  1. 要确保MySQL和Redis服务正常运行。
  2. 在生产环境中,需使用安全的数据库和Redis连接信息。
  3. 根据业务需求配置任务重试次数和超时时间。

版权声明

本项目遵循MIT开源协议,使用时请遵守相关协议。

下载地址

点击下载 【提取码: 4003】【解压密码: www.makuang.net】