use http::Uri;
use std::{
net::SocketAddr,
sync::{
atomic::{AtomicBool, AtomicUsize, Ordering},
Arc,
},
};
use crate::server::{
error::{IngestorError, ServerError, WorkerError},
ingestor::TcpIngestor,
queue::Queue,
request::ZingoIndexerRequest,
worker::{WorkerPool, WorkerPoolStatus},
AtomicStatus, StatusType,
};
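
/// Holds the status of the server and all of its components.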
#[derive(Debug, Clone)]
pub struct ServerStatus {
    /// Status of the server.
    pub server_status: AtomicStatus,
    /// Status of the TCP ingestor.
    tcp_ingestor_status: AtomicStatus,
    /// Status of the worker pool.
    workerpool_status: WorkerPoolStatus,
    /// Number of requests currently held in the request queue.
    request_queue_status: Arc<AtomicUsize>,
}
impl ServerStatus {
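    /// Creates a `ServerStatus` with every status set to 5 (offline in the
    /// `StatusType` numbering used by this module) and an empty request
    /// queue. `max_workers` sizes the worker pool status.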
pub fn new(max_workers: u16) -> Self {
ServerStatus {
server_status: AtomicStatus::new(5),
tcp_ingestor_status: AtomicStatus::new(5),
workerpool_status: WorkerPoolStatus::new(max_workers),
request_queue_status: Arc::new(AtomicUsize::new(0)),
}
}
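
    /// Reloads each component's status and returns a clone of the freshly
    /// loaded `ServerStatus`.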
pub fn load(&self) -> ServerStatus {
self.server_status.load();
self.tcp_ingestor_status.load();
self.workerpool_status.load();
self.request_queue_status.load(Ordering::SeqCst);
self.clone()
}
}
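
/// Zingo-Indexer server: accepts `ZingoIndexerRequest`s over TCP and
/// services them with a dynamically sized pool of workers.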
pub struct Server {
    /// TCP ingestor, present when the server listens over TCP.
    tcp_ingestor: Option<TcpIngestor>,
    /// Dynamically sized pool of workers servicing requests.
    worker_pool: WorkerPool,
    /// Queue of requests waiting to be serviced.
    request_queue: Queue<ZingoIndexerRequest>,
    /// Status of the server and its components.
    status: ServerStatus,
    /// Shared flag indicating whether the server should stay online.
    pub online: Arc<AtomicBool>,
}
impl Server {
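    /// Spawns a new `Server`, launching the TCP ingestor and the worker pool.
    ///
    /// Returns a `ServerConfigError` if TCP is not active or no listen
    /// address is supplied.
    ///
    /// A minimal usage sketch; the address, URIs and sizes below are
    /// illustrative placeholders rather than recommended values:
    ///
    /// ```ignore
    /// let status = ServerStatus::new(8);
    /// let online = Arc::new(AtomicBool::new(true));
    /// let server = Server::spawn(
    ///     true,
    ///     Some("127.0.0.1:8080".parse().unwrap()),
    ///     Uri::from_static("http://127.0.0.1:9067"),  // lightwalletd
    ///     Uri::from_static("http://127.0.0.1:18232"), // zebrad
    ///     1024, // max_queue_size
    ///     8,    // max_worker_pool_size
    ///     2,    // idle_worker_pool_size
    ///     status,
    ///     online,
    /// )
    /// .await?;
    /// ```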
#[allow(clippy::too_many_arguments)]
pub async fn spawn(
tcp_active: bool,
tcp_ingestor_listen_addr: Option<SocketAddr>,
lightwalletd_uri: Uri,
zebrad_uri: Uri,
max_queue_size: u16,
max_worker_pool_size: u16,
idle_worker_pool_size: u16,
status: ServerStatus,
online: Arc<AtomicBool>,
) -> Result<Self, ServerError> {
if !tcp_active {
return Err(ServerError::ServerConfigError(
"Cannot start server with no ingestors selected.".to_string(),
));
}
        if tcp_ingestor_listen_addr.is_none() {
return Err(ServerError::ServerConfigError(
"TCP is active but no address provided.".to_string(),
));
}
println!("Launching Server!\n");
        // Mark the server as spawning while its components are launched.
        status.server_status.store(0);
let request_queue: Queue<ZingoIndexerRequest> =
Queue::new(max_queue_size as usize, status.request_queue_status.clone());
status.request_queue_status.store(0, Ordering::SeqCst);
let tcp_ingestor = if tcp_active {
println!("Launching TcpIngestor..");
Some(
TcpIngestor::spawn(
tcp_ingestor_listen_addr
.expect("tcp_ingestor_listen_addr returned none when used."),
request_queue.tx().clone(),
status.tcp_ingestor_status.clone(),
online.clone(),
)
.await?,
)
} else {
None
};
println!("Launching WorkerPool..");
let worker_pool = WorkerPool::spawn(
max_worker_pool_size,
idle_worker_pool_size,
request_queue.rx().clone(),
request_queue.tx().clone(),
lightwalletd_uri,
zebrad_uri,
status.workerpool_status.clone(),
online.clone(),
)
.await;
Ok(Server {
tcp_ingestor,
worker_pool,
request_queue,
status: status.clone(),
online,
})
}
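
    /// Starts the server's main loop on a new task and returns its
    /// `JoinHandle`.
    ///
    /// On each ~50ms tick the loop grows the worker pool while the queue is
    /// at least a quarter full (up to `max_size`), shrinks it back towards
    /// `idle_size` once the queue holds at most one request, refreshes
    /// statuses, and checks for shutdown.
    ///
    /// A sketch of how `serve` is driven and later stopped, assuming the
    /// caller keeps a clone of the shared online flag:
    ///
    /// ```ignore
    /// let online = server.online.clone();
    /// let server_handle = server.serve().await;
    /// // ...later, request shutdown by clearing the shared online flag:
    /// online.store(false, Ordering::SeqCst);
    /// server_handle.await??;
    /// ```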
pub async fn serve(mut self) -> tokio::task::JoinHandle<Result<(), ServerError>> {
tokio::task::spawn(async move {
            // Interval between scaling, status, and shutdown checks.
            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
let mut tcp_ingestor_handle = None;
let mut worker_handles;
if let Some(ingestor) = self.tcp_ingestor.take() {
tcp_ingestor_handle = Some(ingestor.serve().await);
}
            worker_handles = self.worker_pool.clone().serve().await;
            // All components are up; mark the server as live.
            self.status.server_status.store(1);
            loop {
                // Scale the worker pool up while the queue is at least a
                // quarter full and the pool is below its maximum size.
                if self.request_queue.queue_length() >= (self.request_queue.max_length() / 4)
                    && (self.worker_pool.workers() < self.worker_pool.max_size() as usize)
                {
match self.worker_pool.push_worker().await {
Ok(handle) => {
worker_handles.push(handle);
}
Err(_e) => {
eprintln!("WorkerPool at capacity");
}
}
                } else if (self.request_queue.queue_length() <= 1)
                    && (self.worker_pool.workers() > self.worker_pool.idle_size() as usize)
                {
                    // The queue has drained; release the most recently added
                    // worker until the pool is back at its idle size.
                    let worker_index = self.worker_pool.workers() - 1;
let worker_handle = worker_handles.remove(worker_index);
                    if let Err(e) = self.worker_pool.pop_worker(worker_handle).await {
                        eprintln!("Failed to pop worker from pool: {}", e);
                    }
}
self.statuses();
                if self.check_for_shutdown().await {
                    // Mark the server as closing, shut every component down,
                    // then mark it offline before exiting the serve loop.
                    self.status.server_status.store(4);
let worker_handle_options: Vec<
Option<tokio::task::JoinHandle<Result<(), WorkerError>>>,
> = worker_handles.into_iter().map(Some).collect();
self.shutdown_components(tcp_ingestor_handle, worker_handle_options)
.await;
self.status.server_status.store(5);
return Ok(());
}
interval.tick().await;
}
})
}
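
    /// Returns true when the server should shut down: its status has reached
    /// 4 (closing) or the shared online flag has been cleared.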
pub async fn check_for_shutdown(&self) -> bool {
        self.status() >= 4 || !self.check_online()
}
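
    /// Sets the server status to 4 (closing), causing the serve loop to shut
    /// the server down on its next tick.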
pub async fn shutdown(&mut self) {
        self.status.server_status.store(4);
}
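
    /// Shuts down the ingestor and worker pool, awaiting their join handles.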
async fn shutdown_components(
&mut self,
tcp_ingestor_handle: Option<tokio::task::JoinHandle<Result<(), IngestorError>>>,
mut worker_handles: Vec<Option<tokio::task::JoinHandle<Result<(), WorkerError>>>>,
) {
if let Some(handle) = tcp_ingestor_handle {
self.status.tcp_ingestor_status.store(4);
handle.await.ok();
}
self.worker_pool.shutdown(&mut worker_handles).await;
}
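
    /// Returns the server's current status as a `usize`.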
pub fn status(&self) -> usize {
self.status.server_status.load()
}
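
    /// Returns the server's current status as a `StatusType`.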
pub fn statustype(&self) -> StatusType {
StatusType::from(self.status())
}
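
    /// Refreshes the status of the server and each of its components and
    /// returns a clone of the result.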
pub fn statuses(&mut self) -> ServerStatus {
self.status.server_status.load();
self.status.tcp_ingestor_status.load();
self.status
.request_queue_status
.store(self.request_queue.queue_length(), Ordering::SeqCst);
self.worker_pool.status();
self.status.clone()
}
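
    /// Intended to check component statuses and handle errors; currently
    /// unimplemented (`todo!`).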
pub async fn check_statuses(&mut self) {
todo!()
}
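
    /// Returns the current value of the shared online flag.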
fn check_online(&self) -> bool {
self.online.load(Ordering::SeqCst)
}
}