Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions src/chain/bitcoind_rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ use std::sync::Arc;
pub struct BitcoindRpcClient {
rpc_client: Arc<RpcClient>,
latest_mempool_timestamp: AtomicU64,
mempool_entries_cache: tokio::sync::Mutex<HashMap<Txid, MempoolEntry>>,
mempool_txs_cache: tokio::sync::Mutex<HashMap<Txid, (Transaction, u64)>>,
Comment thread
arik-so marked this conversation as resolved.
}

impl BitcoindRpcClient {
Expand All @@ -42,7 +44,9 @@ impl BitcoindRpcClient {

let latest_mempool_timestamp = AtomicU64::new(0);

Self { rpc_client, latest_mempool_timestamp }
let mempool_entries_cache = tokio::sync::Mutex::new(HashMap::new());
let mempool_txs_cache = tokio::sync::Mutex::new(HashMap::new());
Self { rpc_client, latest_mempool_timestamp, mempool_entries_cache, mempool_txs_cache }
}

pub(crate) fn rpc_client(&self) -> Arc<RpcClient> {
Expand Down Expand Up @@ -160,16 +164,30 @@ impl BitcoindRpcClient {
}
}

pub(crate) async fn get_mempool_entries(&self) -> std::io::Result<Vec<MempoolEntry>> {
pub(crate) async fn update_mempool_entries_cache(&self) -> std::io::Result<()> {
let mempool_txids = self.get_raw_mempool().await?;
let mut mempool_entries = Vec::with_capacity(mempool_txids.len());

let mut mempool_entries_cache = self.mempool_entries_cache.lock().await;
mempool_entries_cache.retain(|txid, _| mempool_txids.contains(txid));

if let Some(difference) = mempool_txids.len().checked_sub(mempool_entries_cache.capacity())
{
mempool_entries_cache.reserve(difference)
}

for txid in mempool_txids {
// Push any entries that haven't been dropped since `getrawmempool`
if mempool_entries_cache.contains_key(&txid) {
continue;
}

if let Some(entry) = self.get_mempool_entry(txid).await? {
mempool_entries.push(entry);
mempool_entries_cache.insert(txid, entry.clone());
}
}
Ok(mempool_entries)

mempool_entries_cache.shrink_to_fit();

Ok(())
}

/// Get mempool transactions, alongside their first-seen unix timestamps.
Expand All @@ -183,10 +201,20 @@ impl BitcoindRpcClient {
let prev_mempool_time = self.latest_mempool_timestamp.load(Ordering::Relaxed);
let mut latest_time = prev_mempool_time;

let mempool_entries = self.get_mempool_entries().await?;
let mut txs_to_emit = Vec::new();
self.update_mempool_entries_cache().await?;

let mempool_entries_cache = self.mempool_entries_cache.lock().await;
let mut mempool_txs_cache = self.mempool_txs_cache.lock().await;
mempool_txs_cache.retain(|txid, _| mempool_entries_cache.contains_key(txid));

if let Some(difference) =
mempool_entries_cache.len().checked_sub(mempool_txs_cache.capacity())
{
mempool_txs_cache.reserve(difference)
}

for entry in mempool_entries {
let mut txs_to_emit = Vec::with_capacity(mempool_entries_cache.len());
for (txid, entry) in mempool_entries_cache.iter() {
if entry.time > latest_time {
latest_time = entry.time;
}
Expand All @@ -202,8 +230,14 @@ impl BitcoindRpcClient {
continue;
}

if let Some((cached_tx, cached_time)) = mempool_txs_cache.get(txid) {
txs_to_emit.push((cached_tx.clone(), *cached_time));
continue;
}

match self.get_raw_transaction(&entry.txid).await {
Ok(Some(tx)) => {
mempool_txs_cache.insert(entry.txid, (tx.clone(), entry.time));
txs_to_emit.push((tx, entry.time));
},
Ok(None) => {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the case of having a txid among the mempool entries but not being able to find its full tx hex be handled somehow?

Copy link
Copy Markdown
Collaborator Author

@tnull tnull Feb 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, we handle it by skipping :)

Note that we used to log/error out in some of these races, but its exactly the point to get rid of that behavior in this PR. In fact, turns out you'll regularly run into these races between getrawmempool/getmempoolentry and getmempoolentry/getrawtransaction, since entries are dropped often from the mempool. So we can just skip processing them and will re-detect them if they would reappear in the mempool.

Expand Down