//! Holds the server ingestor (listener) implementations.
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
};
use tokio::net::TcpListener;
use crate::server::{
error::{IngestorError, QueueError},
queue::QueueSender,
request::ZingoIndexerRequest,
AtomicStatus, StatusType,
};
/// Listens for incoming gRPC requests over HTTP.
pub(crate) struct TcpIngestor {
/// Tcp Listener.
ingestor: TcpListener,
/// Used to send requests to the queue.
queue: QueueSender<ZingoIndexerRequest>,
/// Current status of the ingestor.
status: AtomicStatus,
/// Represents the Online status of the gRPC server.
online: Arc<AtomicBool>,
}
impl TcpIngestor {
/// Creates a Tcp Ingestor, binding a `TcpListener` to `listen_addr`.
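///
/// Illustrative usage sketch (variable names are placeholders, not crate APIs):
/// ```ignore
/// let ingestor = TcpIngestor::spawn(listen_addr, queue_sender, status, online).await?;
/// let serve_handle = ingestor.serve().await;
/// ```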
pub(crate) async fn spawn(
listen_addr: SocketAddr,
queue: QueueSender<ZingoIndexerRequest>,
status: AtomicStatus,
online: Arc<AtomicBool>,
) -> Result<Self, IngestorError> {
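// Numeric status codes map to `StatusType` via `StatusType::from`; 0 marks the ingestor as starting up.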
status.store(0);
let listener = TcpListener::bind(listen_addr).await?;
println!("TcpIngestor listening at: {}.", listen_addr);
Ok(TcpIngestor {
ingestor: listener,
queue,
online,
status,
})
}
/// Starts Tcp service.
pub(crate) async fn serve(self) -> tokio::task::JoinHandle<Result<(), IngestorError>> {
tokio::task::spawn(async move {
// NOTE: This interval may need to be changed or removed / moved once scale testing begins.
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
// TODO Check blockcache sync status and wait on server / node if on hold.
self.status.store(1);
loop {
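// Concurrently poll for a shutdown signal on the interval tick and accept new TCP connections.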
tokio::select! {
_ = interval.tick() => {
if self.check_for_shutdown().await {
self.status.store(5);
return Ok(());
}
}
incoming = self.ingestor.accept() => {
// NOTE: This may need to be removed / moved for scale use.
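// Re-check for shutdown so a closing ingestor does not enqueue work from this accept.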
if self.check_for_shutdown().await {
self.status.store(5);
return Ok(());
}
match incoming {
Ok((stream, _)) => {
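// Wrap the accepted TcpStream in a request and hand it to the request queue without blocking.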
match self.queue.try_send(ZingoIndexerRequest::new_from_grpc(stream)) {
Ok(_) => {
println!("[TEST] Requests in Queue: {}", self.queue.queue_length());
}
Err(QueueError::QueueFull(_request)) => {
eprintln!("Queue Full.");
// TODO: Return a queue-full tonic status over the TcpStream and close it.
}
Err(e) => {
eprintln!("Queue Closed. Failed to send request to queue: {}", e);
// TODO: Handle queue closed error here.
}
}
}
Err(e) => {
eprintln!("Failed to accept connection with client: {}", e);
// TODO: Handle failed connection errors here (count errors and restart ingestor / proxy or initiate shutdown?).
}
}
}
}
}
})
}
/// Checks the indexer's online status and the ingestor's internal status for a closure signal.
pub(crate) async fn check_for_shutdown(&self) -> bool {
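// Status values of 4 and above indicate the ingestor is closing or offline.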
if self.status() >= 4 {
return true;
}
if !self.check_online() {
return true;
}
false
}
/// Sets the ingestor to close gracefully.
pub(crate) async fn _shutdown(&mut self) {
self.status.store(4)
}
/// Returns the ingestor's current status as a usize.
pub(crate) fn status(&self) -> usize {
self.status.load()
}
/// Returns the ingestor's current status as a `StatusType`.
pub(crate) fn _statustype(&self) -> StatusType {
StatusType::from(self.status())
}
fn check_online(&self) -> bool {
self.online.load(Ordering::SeqCst)
}
}