1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
//! Zingo-Indexer mempool state functionality.

use std::{collections::HashSet, time::SystemTime};
use tokio::sync::{Mutex, RwLock};

use crate::{chain::error::MempoolError, jsonrpc::connector::JsonRpcConnector};

/// Mempool state information.
pub struct Mempool {
    /// Txids currently in the mempool.
    txids: RwLock<Vec<String>>,
    /// Txids that have already been added to Zingo-Indexer's mempool.
    txids_seen: Mutex<HashSet<String>>,
    /// System time when the mempool was last updated.
    last_sync_time: Mutex<SystemTime>,
    /// Blockchain data, used to check when a new block has been mined.
    best_block_hash: RwLock<Option<zebra_chain::block::Hash>>,
}

impl Default for Mempool {
    fn default() -> Self {
        Self::new()
    }
}

impl Mempool {
    /// Returns an empty mempool.
    pub fn new() -> Self {
        Mempool {
            txids: RwLock::new(Vec::new()),
            txids_seen: Mutex::new(HashSet::new()),
            last_sync_time: Mutex::new(SystemTime::now()),
            best_block_hash: RwLock::new(None),
        }
    }

    /// Updates the mempool, returns true if the current block in the mempool has been mined.
    pub async fn update(&self, zebrad_uri: &http::Uri) -> Result<bool, MempoolError> {
        self.update_last_sync_time().await?;
        let mined = self.check_and_update_best_block_hash(zebrad_uri).await?;
        if mined {
            self.reset_txids().await?;
            self.update_txids(zebrad_uri).await?;
            Ok(true)
        } else {
            self.update_txids(zebrad_uri).await?;
            Ok(false)
        }
    }

    /// Updates the txids in the mempool.
    async fn update_txids(&self, zebrad_uri: &http::Uri) -> Result<(), MempoolError> {
        let node_txids = JsonRpcConnector::new(
            zebrad_uri.clone(),
            Some("xxxxxx".to_string()),
            Some("xxxxxx".to_string()),
        )
        .await?
        .get_raw_mempool()
        .await?
        .transactions;
        let mut txids_seen = self.txids_seen.lock().await;
        let mut txids = self.txids.write().await;
        for txid in node_txids {
            if !txids_seen.contains(&txid) {
                txids.push(txid.clone());
            }
            txids_seen.insert(txid);
        }
        Ok(())
    }

    /// Updates the system last sync time.
    async fn update_last_sync_time(&self) -> Result<(), MempoolError> {
        let mut last_sync_time = self.last_sync_time.lock().await;
        *last_sync_time = SystemTime::now();
        Ok(())
    }

    /// Updates the mempool blockchain info, returns true if the current block in the mempool has been mined.
    async fn check_and_update_best_block_hash(
        &self,
        zebrad_uri: &http::Uri,
    ) -> Result<bool, MempoolError> {
        let node_best_block_hash = JsonRpcConnector::new(
            zebrad_uri.clone(),
            Some("xxxxxx".to_string()),
            Some("xxxxxx".to_string()),
        )
        .await?
        .get_blockchain_info()
        .await?
        .best_block_hash;

        let mut last_best_block_hash = self.best_block_hash.write().await;

        if let Some(ref last_hash) = *last_best_block_hash {
            if node_best_block_hash == *last_hash {
                return Ok(false);
            }
        }

        *last_best_block_hash = Some(node_best_block_hash);
        Ok(true)
    }

    /// Clears the txids currently held in the mempool.
    async fn reset_txids(&self) -> Result<(), MempoolError> {
        let mut txids = self.txids.write().await;
        txids.clear();
        let mut txids_seen = self.txids_seen.lock().await;
        txids_seen.clear();
        Ok(())
    }

    /// Returns the txids currently in the mempool.
    pub async fn get_mempool_txids(&self) -> Result<Vec<String>, MempoolError> {
        let txids = self.txids.read().await;
        Ok(txids.clone())
    }

    /// Returns the txids currently in the mempool, filtered by exclude_txids.
    ///
    /// NOTE: THIS impl is inefficient and should be refactored with the addition of the internal mempool.
    pub async fn get_filtered_mempool_txids(
        &self,
        exclude_txids: Vec<String>,
    ) -> Result<Vec<String>, MempoolError> {
        let mempool_txids = self.txids.read().await.clone();

        let mut txids_to_exclude: HashSet<String> = HashSet::new();
        for exclude_txid in &exclude_txids {
            let matching_txids: Vec<&String> = mempool_txids
                .iter()
                .filter(|txid| txid.starts_with(exclude_txid))
                .collect();

            if matching_txids.len() == 1 {
                txids_to_exclude.insert(matching_txids[0].clone());
            }
        }

        let filtered_txids: Vec<String> = mempool_txids
            .into_iter()
            .filter(|txid| !txids_to_exclude.contains(txid))
            .collect();

        Ok(filtered_txids)
    }

    /// Returns the hash of the block currently in the mempool.
    pub async fn get_best_block_hash(
        &self,
    ) -> Result<Option<zebra_chain::block::Hash>, MempoolError> {
        let best_block_hash = self.best_block_hash.read().await;
        Ok(*best_block_hash)
    }
}