ConcurrentPriorityQueue
A thread-safe generic first-in, first-out (FIFO) collection with support for priority queuing.
NuGet: https://www.nuget.org/packages/ConcurrentPriorityQueue
Features
- Thread-safe.
- Manages items according to a first-in, first-out policy, with priority on top of that.
- Implements the IProducerConsumerCollection<T> interface.
- Extends to a BlockingCollection<T>.
- Supports multiple target frameworks: net48, netstandard2.0, net6.0, net8.0.
Examples:
Items in the collection must implement the generic interface IHavePriority<T>, where T implements IEquatable<T> and IComparable<T> and overrides Object.GetHashCode():
// Simplest implementation of IHavePriority<T>.
public class SomeClass : IHavePriority<int> {
    // Must be public to implement the interface member.
    public int Priority { get; set; }
}
Simple flow for creating a Priority-By-Integer queue and adding an item:
// Create a new prioritized item.
var itemWithPriority = new SomeClass { Priority = 0 };
// Initialize an unbounded priority-by-integer queue.
var priorityQueue = new ConcurrentPriorityQueue<IHavePriority<int>, int>();
// Enqueue item and handle result.
Result result = priorityQueue.Enqueue(itemWithPriority);
Use the ConcurrentPriorityByIntegerQueue implementation to simplify the above example:
var priorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>();
Consume items by priority/first-in-first-out policy, using Dequeue() and Peek():
// Lower value -> Higher priority.
var item1 = new SomeClass { Priority = 1 };
var item2 = new SomeClass { Priority = 0 };
var item3 = new SomeClass { Priority = 0 };
priorityQueue.Enqueue(item1);
priorityQueue.Enqueue(item2);
priorityQueue.Enqueue(item3);
var first = priorityQueue.Dequeue(); // item2
var second = priorityQueue.Dequeue(); // item3
var third = priorityQueue.Dequeue(); // item1
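The ordering rule above (lower value first, FIFO among equal priorities) can be illustrated with a small stand-alone model. This is only a sketch of the semantics, not the library's implementation: it is not thread-safe, and the names PriorityFifoModel, TryDequeue and ModelDemo are hypothetical and do not belong to the package.

```csharp
using System;
using System.Collections.Generic;

// Illustrative model only: NOT thread-safe and NOT the library's implementation.
public class PriorityFifoModel<T>
{
    // One FIFO queue per priority; SortedDictionary iterates keys in ascending order,
    // so the lowest value (highest priority) is visited first.
    private readonly SortedDictionary<int, Queue<T>> _buckets =
        new SortedDictionary<int, Queue<T>>();

    public void Enqueue(int priority, T item)
    {
        if (!_buckets.TryGetValue(priority, out var bucket))
        {
            bucket = new Queue<T>();
            _buckets[priority] = bucket;
        }
        bucket.Enqueue(item);
    }

    // Returns the oldest item of the highest-priority non-empty bucket, if any.
    public bool TryDequeue(out T item)
    {
        foreach (var pair in _buckets)
        {
            if (pair.Value.Count > 0)
            {
                item = pair.Value.Dequeue();
                return true;
            }
        }
        item = default;
        return false;
    }
}

public static class ModelDemo
{
    // Enqueues the same three items as the example above and drains the model.
    public static List<string> Run()
    {
        var model = new PriorityFifoModel<string>();
        model.Enqueue(1, "item1");
        model.Enqueue(0, "item2");
        model.Enqueue(0, "item3");

        var order = new List<string>();
        while (model.TryDequeue(out var next))
        {
            order.Add(next);
        }
        return order;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // item2, item3, item1
    }
}
```

Grouping items into one queue per priority keeps the FIFO order within a priority without needing a timestamp or sequence number on each item.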
Iterating over the collection will yield items according to their priority and position (FIFO):
var item1 = new SomeClass { Priority = 1 };
var item2 = new SomeClass { Priority = 0 };
var item3 = new SomeClass { Priority = 0 };
priorityQueue.Enqueue(item1);
priorityQueue.Enqueue(item2);
priorityQueue.Enqueue(item3);
foreach (var item in priorityQueue) {
    // Iteration 1 -> item2
    // Iteration 2 -> item3
    // Iteration 3 -> item1
}
ConcurrentPriorityQueue supports generic priorities.
Implement your own business priority type and configure the queue to handle it:
// TimeToProcess class implements IEquatable<T>, IComparable<T> and overrides Object.GetHashCode().
public class TimeToProcess : IEquatable<TimeToProcess>, IComparable<TimeToProcess> {
    public decimal TimeInMilliseconds { get; set; }
    public int CompareTo(TimeToProcess other) =>
        TimeInMilliseconds.CompareTo(other.TimeInMilliseconds);
    public bool Equals(TimeToProcess other) =>
        TimeInMilliseconds.Equals(other.TimeInMilliseconds);
    public override int GetHashCode() => TimeInMilliseconds.GetHashCode();
}
// BusinessPriorityItem implements IHavePriority<T>.
public class BusinessPriorityItem : IHavePriority<TimeToProcess> {
    // Must be public to implement the interface member.
    public TimeToProcess Priority { get; set; }
}
// Create a new prioritized item.
var item = new BusinessPriorityItem { Priority = new TimeToProcess { TimeInMilliseconds = 0.25M } };
// Initialize an unbounded priority-by-TimeToProcess queue.
var priorityQueue = new ConcurrentPriorityQueue<IHavePriority<TimeToProcess>, TimeToProcess>();
// Enqueue item and handle result.
Result result = priorityQueue.Enqueue(item);
ConcurrentPriorityQueue<T> can be bounded to a fixed number of priorities:
// Create a bounded ConcurrentPriorityQueue that supports a fixed number of priorities.
var maxAmountOfPriorities = 2;
var priorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>(maxAmountOfPriorities);
Result result = priorityQueue.Enqueue(new SomeClass { Priority = 0 }); // result.OK
result = priorityQueue.Enqueue(new SomeClass { Priority = 1 }); // result.OK
result = priorityQueue.Enqueue(new SomeClass { Priority = 2 }); // result.Fail -> Queue supports [0, 1]
result = priorityQueue.Enqueue(new SomeClass { Priority = 0 }); // result.OK
ConcurrentPriorityQueue<T> can be extended to a BlockingCollection<T> using the ToBlockingCollection<T> extension method:
var blockingPriorityQueue = new ConcurrentPriorityByIntegerQueue<IHavePriority<int>>()
    .ToBlockingCollection();
foreach (var item in blockingPriorityQueue.GetConsumingEnumerable()) {
    // Do something...
    // Blocks until signaled on completion.
}
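To show what "blocks until signaled on completion" means in practice, here is a minimal producer/consumer sketch that uses only the .NET base class library. It assumes a plain ConcurrentQueue<T> as a stand-in backing store, so it demonstrates the BlockingCollection<T> mechanics (Add, CompleteAdding, GetConsumingEnumerable) rather than this package's priority ordering; BlockingDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BlockingDemo
{
    // Runs one producer and one consumer; returns items in the order they were consumed.
    public static List<string> Run()
    {
        // Stand-in backing store (assumption): a plain FIFO ConcurrentQueue<T>.
        // BlockingCollection<T> accepts any IProducerConsumerCollection<T>.
        using var collection = new BlockingCollection<string>(new ConcurrentQueue<string>());
        var consumed = new List<string>();

        var consumer = Task.Run(() =>
        {
            // Blocks while the collection is empty; the loop ends once
            // CompleteAdding() has been called and all items are drained.
            foreach (var item in collection.GetConsumingEnumerable())
            {
                consumed.Add(item);
            }
        });

        collection.Add("first");
        collection.Add("second");
        collection.CompleteAdding(); // Signals completion so the consumer can exit.

        consumer.Wait(); // Wait for the consumer before reading its results.
        return consumed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run())); // first, second
    }
}
```

Without the call to CompleteAdding(), the consuming loop would wait indefinitely for more items, so producers should always signal completion when they are done.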