1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106
//! Request types.
use crate::server::error::RequestError;
use std::time::SystemTime;
use tokio::net::TcpStream;
/// Requests queuing metadata.
#[derive(Debug, Clone)]
struct QueueData {
// / Exclusive request id.
// request_id: u64, // TODO: implement with request queue (implement exlusive request_id generator in queue object).
/// Time which the request was received.
time_received: SystemTime,
/// Number of times the request has been requeued.
requeue_attempts: u32,
}
impl QueueData {
/// Returns a new instance of QueueData.
fn new() -> Self {
QueueData {
time_received: SystemTime::now(),
requeue_attempts: 0,
}
}
/// Increases the requeue attempts for the request.
pub fn increase_requeues(&mut self) {
self.requeue_attempts += 1;
}
/// Returns the duration sunce the request was received.
fn duration(&self) -> Result<std::time::Duration, RequestError> {
self.time_received.elapsed().map_err(RequestError::from)
}
/// Returns the number of times the request has been requeued.
fn requeues(&self) -> u32 {
self.requeue_attempts
}
}
/// TcpStream holing an incoming gRPC request.
#[derive(Debug)]
pub struct TcpRequest(TcpStream);
impl TcpRequest {
/// Returns the underlying TcpStream help by the request
pub fn get_stream(self) -> TcpStream {
self.0
}
}
/// Requests originating from the Tcp server.
#[derive(Debug)]
pub struct TcpServerRequest {
queuedata: QueueData,
request: TcpRequest,
}
impl TcpServerRequest {
/// Returns the underlying request.
pub fn get_request(self) -> TcpRequest {
self.request
}
}
/// Zingo-Indexer request, used by request queue.
#[derive(Debug)]
pub enum ZingoIndexerRequest {
/// Requests originating from the gRPC server.
TcpServerRequest(TcpServerRequest),
}
impl ZingoIndexerRequest {
/// Creates a ZingoIndexerRequest from a gRPC service call, recieved by the gRPC server.
///
/// TODO: implement proper functionality along with queue.
pub fn new_from_grpc(stream: TcpStream) -> Self {
ZingoIndexerRequest::TcpServerRequest(TcpServerRequest {
queuedata: QueueData::new(),
request: TcpRequest(stream),
})
}
/// Increases the requeue attempts for the request.
pub fn increase_requeues(&mut self) {
match self {
ZingoIndexerRequest::TcpServerRequest(ref mut req) => req.queuedata.increase_requeues(),
}
}
/// Returns the duration sunce the request was received.
pub fn duration(&self) -> Result<std::time::Duration, RequestError> {
match self {
ZingoIndexerRequest::TcpServerRequest(ref req) => req.queuedata.duration(),
}
}
/// Returns the number of times the request has been requeued.
pub fn requeues(&self) -> u32 {
match self {
ZingoIndexerRequest::TcpServerRequest(ref req) => req.queuedata.requeues(),
}
}
}