delay-queue
delay-queue copied to clipboard
延迟队列,高可用。Delay Queue
delay-queue
Translations:
- 中文文档
____ _ ___
| _ \ ___| | __ _ _ _ / _ \ _ _ ___ _ _ ___
| | | |/ _ \ |/ _` | | | | | | | | | | |/ _ \ | | |/ _ \
| |_| | __/ | (_| | |_| | | |_| | |_| | __/ |_| | __/
|____/ \___|_|\__,_|\__, | \__\_\\__,_|\___|\__,_|\___|
|___/
Introduction
This project is a delay queue. It is based on Youzan 有赞 delay queue. Currently, it is based on Redis for storage.
Highly Available
How to run the delay queue?
# clone project
git clone https://github.com/changsongl/delay-queue.git
# build the project
make
# run the project
bin/delayqueue
# flags
bin/delayqueue -help
-config.file string
config file (default "../../config/config.yaml")
-config.type string
config type: yaml, json
-env string
delay queue env: debug, release (default "release")
-version
display build info
The default configuration file is config/config.yaml.example
.
Usage
-
SDK Link
-
Http
// Push job
POST 127.0.0.1:8000/topic/mytopic/job
body: {"id": "myid1","delay":10, "ttr":4, "body":"body"}
// response
{
"message": "ok",
"success": true
}
// Pop job (timeout: seconds)
GET 127.0.0.1:8000/topic/mytopic/job?timeout=5
// response
{
"message": "ok",
"success": true,
"data": {
"body": "body",
"delay": 10,
"id": "myid1",
"topic": "mytopic",
"ttr": 4
}
}
// Delete job
DELETE 127.0.0.1:8000/topic/mytopic/job/myid1
// response
{
"message": "ok",
"success": true
}
// Delete job
PUT 127.0.0.1:8000/topic/mytopic/job/myid1
// response
{
"message": "ok",
"success": true
}
Designs
Terms
- Job: It is a task to be processed, and it is related to only one topic.
- Topic: It is a set of jobs, it is implemented by a time-sorted queue. All consumers need to choose at least one topic to consume jobs.
Job
Jobs contain many properties like:
- Topic: It could be a service name, users can define it depending on their business.
- ID: it is unique key for inside of a topic. It's used to search job information in a topic. The combination of a topic and an ID should be unique in your business.
- Delay: It defines how many second to be delay for the job. Unit: Second
- TTR(time to run): It is job processing timeout. If consumer process this job more than TTR seconds, it might be sent to other consumer, if a consumer pop the topic.
- Body: It is content of job. It is a string. You can put your json data to it. When you consume the job, you can decode it and run your logic.
Component
There are 4 components in the delay queue.
- Job Pool: It saves all metadata of jobs.
- Delay Bucket: It is a time-sorted queue. It saves jobs that is waiting for being ready. There are more than one Bucket in the delay queue for higher throughput.
- Timer: It is a core component to scan the Delay Bucket. It pops out ready jobs from Buckets and put then inside ready queue.
- Ready Queue: It is a queue for storing all ready jobs, which can be popped now. It is also only store the job id for the consumers.

States
There are four states for jobs in the delay queue. The job can be only in one state at the time.
- Ready: It is ready to be consumed.
- Delay: It is waiting for the delay time, and it can't be consumed.
- reserved: It means the job has consumed by a consumer, but consumer hasn't ack the job. (Call delete、finish).
- Deleted: The job has finished or deleted.

Monitor
This project is using Prometheus as the monitor tool. It exposes the metrics apis to Prometheus. You can use Prometheus and Grafana as the monitor tools.
# HELP delay_queue_in_flight_jobs_numbers_in_bucket Gauge of the number of inflight jobs in each bucket
# TYPE delay_queue_in_flight_jobs_numbers_in_bucket gauge
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_0"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_1"} 3
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_2"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_3"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_4"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_5"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_6"} 0
delay_queue_in_flight_jobs_numbers_in_bucket{bucket="dq_bucket_7"} 0
.
.
.
# HELP delay_queue_in_flight_jobs_numbers_in_queue Gauge of the number of inflight jobs in each queue
# TYPE delay_queue_in_flight_jobs_numbers_in_queue gauge
delay_queue_in_flight_jobs_numbers_in_queue{queue="dq_queue_mytopic"} 1
What's the plan of this project?
I will work on this project all the time! I will add more features and fix bugs, and I will make this project ready to use in production. Star Or Fork it if you like it. I'm very welcome to you for contribution.
How to contribute?
- Level a message in the unsigned issue.
- We will discuss how to do it, and I will assign the issue to you.
- Fork the project, and checkout your branch from "develop" branch.
- Submit the PR to "develop" branch.
- It will be merged after code review.
Stargazers
Reference
Youzan Design Concept Youzan Link