1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
//! Holds the server ingestor (listener) implementations.

use std::{
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
};
use tokio::net::TcpListener;

use crate::server::{
    error::{IngestorError, QueueError},
    queue::QueueSender,
    request::ZingoIndexerRequest,
    AtomicStatus, StatusType,
};

/// Listens for incoming gRPC requests over HTTP.
pub(crate) struct TcpIngestor {
    /// Tcp Listener.
    ingestor: TcpListener,
    /// Used to send requests to the queue.
    queue: QueueSender<ZingoIndexerRequest>,
    /// Current status of the ingestor.
    status: AtomicStatus,
    /// Represents the Online status of the gRPC server.
    online: Arc<AtomicBool>,
}

impl TcpIngestor {
    /// Creates a Tcp Ingestor.
    pub(crate) async fn spawn(
        listen_addr: SocketAddr,
        queue: QueueSender<ZingoIndexerRequest>,
        status: AtomicStatus,
        online: Arc<AtomicBool>,
    ) -> Result<Self, IngestorError> {
        status.store(0);
        let listener = TcpListener::bind(listen_addr).await?;
        println!("TcpIngestor listening at: {}.", listen_addr);
        Ok(TcpIngestor {
            ingestor: listener,
            queue,
            online,
            status,
        })
    }

    /// Starts Tcp service.
    pub(crate) async fn serve(self) -> tokio::task::JoinHandle<Result<(), IngestorError>> {
        tokio::task::spawn(async move {
            // NOTE: This interval may need to be changed or removed / moved once scale testing begins.
            let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
            // TODO Check blockcache sync status and wait on server / node if on hold.
            self.status.store(1);
            loop {
                tokio::select! {
                    _ = interval.tick() => {
                        if self.check_for_shutdown().await {
                            self.status.store(5);
                            return Ok(());
                        }
                    }
                    incoming = self.ingestor.accept() => {
                        // NOTE: This may need to be removed / moved for scale use.
                        if self.check_for_shutdown().await {
                            self.status.store(5);
                            return Ok(());
                        }
                        match incoming {
                            Ok((stream, _)) => {
                                match self.queue.try_send(ZingoIndexerRequest::new_from_grpc(stream)) {
                                    Ok(_) => {
                                        println!("[TEST] Requests in Queue: {}", self.queue.queue_length());
                                    }
                                    Err(QueueError::QueueFull(_request)) => {
                                        eprintln!("Queue Full.");
                                        // TODO: Return queue full tonic status over tcpstream and close (that TcpStream..).
                                    }
                                    Err(e) => {
                                        eprintln!("Queue Closed. Failed to send request to queue: {}", e);
                                        // TODO: Handle queue closed error here.
                                    }
                                }
                            }
                            Err(e) => {
                                eprintln!("Failed to accept connection with client: {}", e);
                                // TODO: Handle failed connection errors here (count errors and restart ingestor / proxy or initiate shotdown?)
                            }
                        }
                    }
                }
            }
        })
    }

    /// Checks indexers online status and ingestors internal status for closure signal.
    pub(crate) async fn check_for_shutdown(&self) -> bool {
        if self.status() >= 4 {
            return true;
        }
        if !self.check_online() {
            return true;
        }
        false
    }

    /// Sets the ingestor to close gracefully.
    pub(crate) async fn _shutdown(&mut self) {
        self.status.store(4)
    }

    /// Returns the ingestor current status usize.
    pub(crate) fn status(&self) -> usize {
        self.status.load()
    }

    /// Returns the ingestor current statustype.
    pub(crate) fn _statustype(&self) -> StatusType {
        StatusType::from(self.status())
    }

    fn check_online(&self) -> bool {
        self.online.load(Ordering::SeqCst)
    }
}