use crate::server::error::QueueError;
use crossbeam_channel::{bounded, Receiver, Sender};
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[derive(Debug, Clone)]
pub(crate) struct Queue<T> {
max_length: usize,
queue_status: Arc<AtomicUsize>,
queue_tx: QueueSender<T>,
queue_rx: QueueReceiver<T>,
}
impl<T> Queue<T> {
pub(crate) fn new(max_length: usize, queue_status: Arc<AtomicUsize>) -> Self {
let (queue_tx, queue_rx) = bounded(max_length);
queue_status.store(0, Ordering::SeqCst);
Queue {
max_length,
queue_status: queue_status.clone(),
queue_tx: QueueSender {
inner: queue_tx,
queue_status: queue_status.clone(),
},
queue_rx: QueueReceiver {
inner: queue_rx,
queue_status,
},
}
}
pub(crate) fn tx(&self) -> QueueSender<T> {
self.queue_tx.clone()
}
pub(crate) fn rx(&self) -> QueueReceiver<T> {
self.queue_rx.clone()
}
pub(crate) fn max_length(&self) -> usize {
self.max_length
}
pub(crate) fn queue_length(&self) -> usize {
self.queue_status.load(Ordering::SeqCst)
}
}
#[derive(Debug)]
pub(crate) struct QueueSender<T> {
inner: Sender<T>,
queue_status: Arc<AtomicUsize>,
}
impl<T> Clone for QueueSender<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
queue_status: Arc::clone(&self.queue_status),
}
}
}
impl<T> QueueSender<T> {
pub(crate) fn try_send(&self, message: T) -> Result<(), QueueError<T>> {
match self.inner.try_send(message) {
Ok(_) => {
self.queue_status.fetch_add(1, Ordering::SeqCst);
Ok(())
}
Err(crossbeam_channel::TrySendError::Full(t)) => Err(QueueError::QueueFull(t)),
Err(crossbeam_channel::TrySendError::Disconnected(_)) => Err(QueueError::QueueClosed),
}
}
pub(crate) fn queue_length(&self) -> usize {
self.queue_status.load(Ordering::SeqCst)
}
}
#[derive(Debug)]
pub(crate) struct QueueReceiver<T> {
inner: Receiver<T>,
queue_status: Arc<AtomicUsize>,
}
impl<T> Clone for QueueReceiver<T> {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
queue_status: Arc::clone(&self.queue_status),
}
}
}
impl<T> QueueReceiver<T> {
pub(crate) fn try_recv(&self) -> Result<T, QueueError<T>> {
match self.inner.try_recv() {
Ok(message) => {
self.queue_status.fetch_sub(1, Ordering::SeqCst);
Ok(message)
}
Err(crossbeam_channel::TryRecvError::Empty) => Err(QueueError::QueueEmpty),
Err(crossbeam_channel::TryRecvError::Disconnected) => Err(QueueError::QueueClosed),
}
}
pub(crate) async fn listen(&self) -> Result<T, QueueError<T>> {
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
loop {
match self.try_recv() {
Ok(message) => {
return Ok(message);
}
Err(QueueError::QueueEmpty) => {
interval.tick().await;
continue;
}
Err(e) => {
return Err(e);
}
}
}
}
pub(crate) fn _queue_length(&self) -> usize {
self.queue_status.load(Ordering::SeqCst)
}
}