1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
//! Zingo-Indexer queue implementation.

use crate::server::error::QueueError;
use crossbeam_channel::{bounded, Receiver, Sender};
use std::sync::{
    atomic::{AtomicUsize, Ordering},
    Arc,
};

/// Bounded message queue with an externally observable length counter.
///
/// Bundles both halves of a bounded channel with a shared atomic counter that
/// the sender increments and the receiver decrements, so the number of queued
/// messages can be read without touching the channel itself.
#[derive(Debug)]
pub(crate) struct Queue<T> {
    /// Max number of messages allowed in the queue.
    max_length: usize,
    /// Shared counter tracking the number of messages currently in the queue.
    queue_status: Arc<AtomicUsize>,
    /// Sending half of the queue.
    queue_tx: QueueSender<T>,
    /// Receiving half of the queue.
    queue_rx: QueueReceiver<T>,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add a
// `T: Clone` bound, but no `T` value is ever cloned here. Every field is a
// handle that is cloneable for any `T`, matching the hand-written `Clone`
// impls on `QueueSender` and `QueueReceiver`.
impl<T> Clone for Queue<T> {
    /// Returns another handle to the same underlying channel and counter.
    fn clone(&self) -> Self {
        Self {
            max_length: self.max_length,
            queue_status: Arc::clone(&self.queue_status),
            queue_tx: self.queue_tx.clone(),
            queue_rx: self.queue_rx.clone(),
        }
    }
}

impl<T> Queue<T> {
    /// Creates a new queue with a maximum size.
    pub(crate) fn new(max_length: usize, queue_status: Arc<AtomicUsize>) -> Self {
        let (queue_tx, queue_rx) = bounded(max_length);
        queue_status.store(0, Ordering::SeqCst);
        Queue {
            max_length,
            queue_status: queue_status.clone(),
            queue_tx: QueueSender {
                inner: queue_tx,
                queue_status: queue_status.clone(),
            },
            queue_rx: QueueReceiver {
                inner: queue_rx,
                queue_status,
            },
        }
    }

    /// Returns a queue transmitter.
    pub(crate) fn tx(&self) -> QueueSender<T> {
        self.queue_tx.clone()
    }

    /// Returns a queue receiver.
    pub(crate) fn rx(&self) -> QueueReceiver<T> {
        self.queue_rx.clone()
    }

    /// Returns the max length of the queue.
    pub(crate) fn max_length(&self) -> usize {
        self.max_length
    }

    /// Returns the current length of the queue.
    pub(crate) fn queue_length(&self) -> usize {
        self.queue_status.load(Ordering::SeqCst)
    }
}

/// Sending half of a queue.
///
/// Wraps a `crossbeam_channel::Sender` together with the shared counter used
/// to track how many messages are currently queued.
#[derive(Debug)]
pub(crate) struct QueueSender<T> {
    /// Underlying `crossbeam_channel` sender.
    inner: Sender<T>,
    /// Shared counter tracking the number of messages currently in the queue.
    queue_status: Arc<AtomicUsize>,
}

impl<T> Clone for QueueSender<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            queue_status: Arc::clone(&self.queue_status),
        }
    }
}

impl<T> QueueSender<T> {
    /// Tries to add a message to the queue without blocking, updating the queue size.
    ///
    /// # Errors
    ///
    /// * `QueueError::QueueFull` (carrying the rejected message) if the queue
    ///   is at capacity.
    /// * `QueueError::QueueClosed` if the channel is disconnected.
    ///
    /// In both error cases the length counter is left unchanged.
    pub(crate) fn try_send(&self, message: T) -> Result<(), QueueError<T>> {
        // Count the message *before* it becomes visible to receivers. If the
        // increment happened after a successful send, a receiver could pop the
        // message and decrement first, wrapping the unsigned counter around to
        // `usize::MAX` until the late increment landed. Counting first means
        // the counter can only over-report by the number of sends in flight
        // (briefly exceeding the queue's capacity at most), never underflow.
        self.queue_status.fetch_add(1, Ordering::SeqCst);
        self.inner.try_send(message).map_err(|err| {
            // The message never entered the queue: undo the reservation.
            self.queue_status.fetch_sub(1, Ordering::SeqCst);
            match err {
                crossbeam_channel::TrySendError::Full(t) => QueueError::QueueFull(t),
                crossbeam_channel::TrySendError::Disconnected(_) => QueueError::QueueClosed,
            }
        })
    }

    /// Returns the current length of the queue, as recorded by the shared counter.
    pub(crate) fn queue_length(&self) -> usize {
        self.queue_status.load(Ordering::SeqCst)
    }
}

/// Receiving half of a queue.
///
/// Wraps a `crossbeam_channel::Receiver` together with the shared counter used
/// to track how many messages are currently queued.
#[derive(Debug)]
pub(crate) struct QueueReceiver<T> {
    /// Underlying `crossbeam_channel` receiver.
    inner: Receiver<T>,
    /// Shared counter tracking the number of messages currently in the queue.
    queue_status: Arc<AtomicUsize>,
}

impl<T> Clone for QueueReceiver<T> {
    fn clone(&self) -> Self {
        Self {
            inner: self.inner.clone(),
            queue_status: Arc::clone(&self.queue_status),
        }
    }
}

impl<T> QueueReceiver<T> {
    /// Tries to receive a message from the queue without blocking, updating the queue size.
    ///
    /// # Errors
    ///
    /// * `QueueError::QueueEmpty` if no message is currently available.
    /// * `QueueError::QueueClosed` if the channel is disconnected.
    pub(crate) fn try_recv(&self) -> Result<T, QueueError<T>> {
        self.inner
            .try_recv()
            .map(|message| {
                // One message left the queue; reflect that in the shared counter.
                self.queue_status.fetch_sub(1, Ordering::SeqCst);
                message
            })
            .map_err(|err| match err {
                crossbeam_channel::TryRecvError::Empty => QueueError::QueueEmpty,
                crossbeam_channel::TryRecvError::Disconnected => QueueError::QueueClosed,
            })
    }

    /// Listens indefinitely for an incoming message on the queue.
    ///
    /// Polls [`Self::try_recv`] in a loop, awaiting a 50 ms interval timer
    /// whenever the queue is empty. Returns the message once one is received,
    /// or the error if the queue is closed.
    pub(crate) async fn listen(&self) -> Result<T, QueueError<T>> {
        // NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
        let mut poll_timer = tokio::time::interval(tokio::time::Duration::from_millis(50));
        loop {
            match self.try_recv() {
                // Nothing queued yet: wait for the next tick, then poll again.
                Err(QueueError::QueueEmpty) => {
                    poll_timer.tick().await;
                }
                // Either a message or a terminal error: hand it straight back.
                outcome => return outcome,
            }
        }
    }

    /// Returns the current length of the queue, as recorded by the shared counter.
    pub(crate) fn _queue_length(&self) -> usize {
        let counter: &AtomicUsize = &self.queue_status;
        counter.load(Ordering::SeqCst)
    }
}