gores
:construction_worker: Redis-backed library for creating background jobs in Go. Place jobs in multiple queues and process them asynchronously later.
Gores
An asynchronous job execution system based on Redis
Installation
Get the package
$ go get github.com/wang502/gores/gores
Import the package
import "github.com/wang502/gores/gores"
Usage
Start local Redis server
$ git clone git@github.com:antirez/redis.git
$ cd redis
$ make
$ ./src/redis-server
Configuration
Add a config.json in your project folder
{
"RedisURL": "127.0.0.1:6379",
"RedisPassword": "mypassword",
"BlpopMaxBlockTime" : 1,
"MaxWorkers": 2,
"Queues": ["queue1", "queue2"],
"DispatcherTimeout": 5,
"WorkerTimeout": 5
}
- RedisURL: Redis server address. If you run a local Redis server, the default address is 127.0.0.1:6379.
- RedisPassword: Redis password. If the server has no password set, this can be any string.
- BlpopMaxBlockTime: Maximum time to block when calling the Redis BLPOP command.
- MaxWorkers: Maximum number of concurrent workers. Each worker is a separate goroutine that executes a specific task on the fetched item.
- Queues: Array of queue names on the Redis message broker.
- DispatcherTimeout: How long the dispatcher waits for a new job to dispatch before quitting.
- WorkerTimeout: How long a worker waits for a new job to process before quitting.
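To make MaxWorkers and WorkerTimeout more concrete, here is a minimal sketch of a goroutine pool in which each worker quits after waiting too long for a new job. It is an illustration only: `runWorkers` is a hypothetical helper, the jobs are plain strings on a channel rather than items fetched from Redis, and none of this is taken from the gores source.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// runWorkers starts maxWorkers goroutines that read jobs from the channel.
// A worker quits when it has waited longer than timeout for a new job,
// or when the channel is closed. It returns the number of processed jobs.
// Hypothetical helper for illustration; not part of the gores API.
func runWorkers(jobs <-chan string, maxWorkers int, timeout time.Duration) int {
	var (
		wg        sync.WaitGroup
		mu        sync.Mutex
		processed int
	)
	for i := 0; i < maxWorkers; i++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			for {
				select {
				case job, ok := <-jobs:
					if !ok {
						return // channel closed, nothing left to do
					}
					mu.Lock()
					processed++
					mu.Unlock()
					fmt.Printf("worker %d processed %s\n", id, job)
				case <-time.After(timeout):
					return // no new job arrived within the timeout
				}
			}
		}(i)
	}
	wg.Wait()
	return processed
}

func main() {
	jobs := make(chan string, 3)
	for _, j := range []string{"job1", "job2", "job3"} {
		jobs <- j
	}
	// Two workers, with the timeout shortened to 100ms for the demo.
	n := runWorkers(jobs, 2, 100*time.Millisecond)
	fmt.Println("processed:", n)
}
```

Which worker picks up which job varies between runs, but the program always ends once both workers have been idle for the timeout.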
Initialize config
configPath := flag.String("c", "config.json", "path to configuration file")
flag.Parse()
config, err := gores.InitConfig(*configPath)
if err != nil {
	log.Fatalf("Cannot read config file: %v", err)
}
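As a rough picture of what loading such a file involves, the sketch below decodes the same keys with the standard encoding/json package. The `Config` struct and `parseConfig` function are hypothetical and written for this example; the actual type returned by gores.InitConfig may look different.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"strings"
)

// Config is a hypothetical struct mirroring the keys of config.json.
// The real gores configuration type may differ.
type Config struct {
	RedisURL          string
	RedisPassword     string
	BlpopMaxBlockTime int
	MaxWorkers        int
	Queues            []string
	DispatcherTimeout int
	WorkerTimeout     int
}

// parseConfig decodes a JSON document into Config and rejects unknown keys,
// so a misspelled setting is reported instead of silently ignored.
func parseConfig(data string) (*Config, error) {
	dec := json.NewDecoder(strings.NewReader(data))
	dec.DisallowUnknownFields()
	var c Config
	if err := dec.Decode(&c); err != nil {
		return nil, err
	}
	return &c, nil
}

func main() {
	const sample = `{
		"RedisURL": "127.0.0.1:6379",
		"RedisPassword": "mypassword",
		"BlpopMaxBlockTime": 1,
		"MaxWorkers": 2,
		"Queues": ["queue1", "queue2"],
		"DispatcherTimeout": 5,
		"WorkerTimeout": 5
	}`
	c, err := parseConfig(sample)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(c.RedisURL, c.MaxWorkers, c.Queues)
	// prints: 127.0.0.1:6379 2 [queue1 queue2]
}
```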
Enqueue job to Redis queue
A job is a Go map that must contain the following keys:
- Name: name of the item to enqueue. Items with different names are mapped to different tasks.
- Queue: name of the queue you want to put the item in.
- Args: the required arguments that you need in order for the workers to execute those tasks.
- Enqueue_timestamp: the Unix timestamp of when the item is enqueued.
gores := gores.NewGores(config)
job := map[string]interface{}{
	"Name":  "Rectangle",
	"Queue": "TestJob",
	"Args": map[string]interface{}{
		"Length": 10,
		"Width":  10,
	},
	"Enqueue_timestamp": time.Now().Unix(),
}

err = gores.Enqueue(job)
if err != nil {
	// include the underlying error so the failure can be diagnosed
	log.Fatalf("ERROR Enqueue item to Gores: %v", err)
}
$ go run main.go -c ./config.json -o produce
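Because a job is an untyped map, a missing key only shows up at run time. One way to catch it early is to check the four required keys before enqueueing. The sketch below assumes a hypothetical `validateJob` helper; it is not part of gores.

```go
package main

import (
	"fmt"
	"time"
)

// requiredKeys lists the keys the README says every job must have.
var requiredKeys = []string{"Name", "Queue", "Args", "Enqueue_timestamp"}

// validateJob reports the first required key missing from the job map.
// Hypothetical helper for illustration; not part of the gores API.
func validateJob(job map[string]interface{}) error {
	for _, key := range requiredKeys {
		if _, ok := job[key]; !ok {
			return fmt.Errorf("job is missing required key %q", key)
		}
	}
	return nil
}

func main() {
	job := map[string]interface{}{
		"Name": "Rectangle",
		"Args": map[string]interface{}{"Length": 10, "Width": 10},
		"Enqueue_timestamp": time.Now().Unix(),
	}
	// "Queue" was left out on purpose.
	fmt.Println(validateJob(job))
	// prints: job is missing required key "Queue"
}
```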
Define tasks
package tasks

import (
	"errors"
	"fmt"
)

// CalculateArea is the task for items with 'Name' = 'Rectangle'.
// It calculates the area of a rectangle by multiplying Length by Width.
func CalculateArea(args map[string]interface{}) error {
	length := args["Length"]
	width := args["Width"]
	if length == nil || width == nil {
		return errors.New("Map has no required attributes")
	}
	// The values are asserted as float64, the type encoding/json uses
	// for numbers decoded into interface{}.
	fmt.Printf("The area is %d\n", int(length.(float64))*int(width.(float64)))
	return nil
}
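The job above was enqueued with integer arguments, yet CalculateArea asserts float64. That is what you would expect if the job passes through JSON on its way to and from Redis, which is an assumption here rather than something this README states. The sketch below shows the effect with the standard library only; `roundTrip` is a hypothetical helper.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// roundTrip encodes args to JSON and decodes them back into a generic map,
// imitating a job that is serialized by a producer and read by a worker.
// Hypothetical helper for illustration; not part of the gores API.
func roundTrip(args map[string]interface{}) (map[string]interface{}, error) {
	data, err := json.Marshal(args)
	if err != nil {
		return nil, err
	}
	var out map[string]interface{}
	if err := json.Unmarshal(data, &out); err != nil {
		return nil, err
	}
	return out, nil
}

func main() {
	args := map[string]interface{}{"Length": 10, "Width": 10}
	decoded, err := roundTrip(args)
	if err != nil {
		log.Fatal(err)
	}
	_, isInt := decoded["Length"].(int)
	length, isFloat := decoded["Length"].(float64)
	fmt.Println(isInt, isFloat, int(length))
	// prints: false true 10
}
```

A task that asserts `.(int)` on such a value would panic, so converting from float64 inside the task is the safer pattern under this assumption.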
Launch workers to consume items and execute tasks
tasks := map[string]interface{}{
	"Item":      tasks.PrintItem,
	"Rectangle": tasks.CalculateArea,
}
gores.Launch(config, &tasks)
$ go run main.go -c ./config.json -o consume
The output will be:
The area is 100
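The task map ties a job's Name to the function that handles it. As a sketch of how such a lookup could work, the hypothetical `dispatch` function below finds the task by name and calls it with the job's Args. How gores.Launch does this internally is not shown in this README, so treat this as an illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

// dispatch looks up the task registered under the job's Name and runs it
// with the job's Args. Hypothetical helper; not part of the gores API.
func dispatch(tasks map[string]interface{}, job map[string]interface{}) error {
	name, ok := job["Name"].(string)
	if !ok {
		return errors.New("job has no string Name")
	}
	// A missing entry yields a nil interface, so the assertion fails cleanly.
	task, ok := tasks[name].(func(map[string]interface{}) error)
	if !ok {
		return fmt.Errorf("no task registered for %q", name)
	}
	args, ok := job["Args"].(map[string]interface{})
	if !ok {
		return fmt.Errorf("job %q has no Args map", name)
	}
	return task(args)
}

// calculateArea is a local stand-in for the Rectangle task.
func calculateArea(args map[string]interface{}) error {
	length, okL := args["Length"].(float64)
	width, okW := args["Width"].(float64)
	if !okL || !okW {
		return errors.New("Map has no required attributes")
	}
	fmt.Printf("The area is %d\n", int(length)*int(width))
	return nil
}

func main() {
	tasks := map[string]interface{}{"Rectangle": calculateArea}
	job := map[string]interface{}{
		"Name": "Rectangle",
		"Args": map[string]interface{}{"Length": 10.0, "Width": 10.0},
	}
	if err := dispatch(tasks, job); err != nil {
		fmt.Println("error:", err)
	}
	// prints: The area is 100
}
```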
Info about processed/failed job
gores := gores.NewGores(config)
if gores == nil {
	log.Fatalf("gores is nil")
}

info, err := gores.Info()
if err != nil {
	log.Fatal(err)
}

for k, v := range info {
	switch v.(type) {
	case string:
		fmt.Printf("%s : %s\n", k, v)
	case int:
		fmt.Printf("%s : %d\n", k, v)
	case int64:
		fmt.Printf("%s : %d\n", k, v)
	}
}
The output will be:
Gores Info:
queues : 2
workers : 0
failed : 0
host : 127.0.0.1:6379
pending : 0
processed : 1
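Go does not guarantee map iteration order, so the lines above may appear in a different order from run to run. If a stable order matters, for example when comparing output in a script, the keys can be sorted first. The sketch below uses a hypothetical `formatInfo` helper and a hand-written map standing in for the result of Info().

```go
package main

import (
	"fmt"
	"sort"
)

// formatInfo renders each entry as "key : value", ordered by key.
// Hypothetical helper for illustration; not part of the gores API.
func formatInfo(info map[string]interface{}) []string {
	keys := make([]string, 0, len(info))
	for k := range info {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	lines := make([]string, 0, len(keys))
	for _, k := range keys {
		// %v prints strings and integers alike, so no type switch is needed.
		lines = append(lines, fmt.Sprintf("%s : %v", k, info[k]))
	}
	return lines
}

func main() {
	// Sample values copied from the output shown above.
	info := map[string]interface{}{
		"queues":    2,
		"workers":   0,
		"failed":    0,
		"host":      "127.0.0.1:6379",
		"pending":   0,
		"processed": int64(1),
	}
	for _, line := range formatInfo(info) {
		fmt.Println(line)
	}
}
```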
Contribution
Please feel free to suggest new features. Pull requests are also welcome!