diff --git a/binaries/cuprated/src/commands.rs b/binaries/cuprated/src/commands.rs index 89e9cd71c..a142ff99e 100644 --- a/binaries/cuprated/src/commands.rs +++ b/binaries/cuprated/src/commands.rs @@ -1,8 +1,9 @@ //! Commands //! //! `cuprated` [`Command`] definition and handling. -use std::{io, thread::sleep, time::Duration}; +use std::{io, path::PathBuf, thread::sleep, time::Duration}; +use bytes::Bytes; use clap::{builder::TypedValueParser, Parser, ValueEnum}; use tokio::sync::mpsc; use tower::{Service, ServiceExt}; @@ -11,12 +12,14 @@ use tracing::level_filters::LevelFilter; use cuprate_consensus_context::{ BlockChainContextRequest, BlockChainContextResponse, BlockchainContextService, }; +use cuprate_dandelion_tower::TxState; use cuprate_helper::time::secs_to_hms; use crate::{ constants::PANIC_CRITICAL_SERVICE_ERROR, - logging::{self, CupratedTracingFilter}, + logging::{self, eprintln_red, CupratedTracingFilter}, statics, + txpool::{IncomingTxHandler, IncomingTxs}, }; /// A command received from [`io::stdin`]. @@ -50,6 +53,12 @@ pub enum Command { /// Print the height of first block not contained in the fast sync hashes. FastSyncStopHeight, + + /// Broadcast a transaction to the network. + BroadcastTx { + /// The path to the file containing the raw hex string of the tx. + tx_file: PathBuf, + }, } /// The log output target. @@ -91,6 +100,7 @@ pub fn command_listener(incoming_commands: mpsc::Sender) -> ! { pub async fn io_loop( mut incoming_commands: mpsc::Receiver, mut context_service: BlockchainContextService, + mut incoming_tx_handler: IncomingTxHandler, ) { loop { let Some(command) = incoming_commands.recv().await else { @@ -131,6 +141,34 @@ pub async fn io_loop( println!("{stop_height}"); } + Command::BroadcastTx { tx_file } => { + let tx = match std::fs::read_to_string(tx_file) { + Ok(tx) => tx, + Err(e) => { + eprintln_red(&format!("Failed to read file: {e}")); + continue; + } + }; + + let Ok(tx) = hex::decode(tx) else { + eprintln_red("Invalid tx hex"); + continue; + }; + + let res = incoming_tx_handler + .ready() + .await + .unwrap() + .call(IncomingTxs { + txs: vec![Bytes::from(tx)], + state: TxState::Local, + }) + .await; + + if let Err(e) = res { + eprintln_red(&format!("{e}")); + } + } } } } diff --git a/binaries/cuprated/src/main.rs b/binaries/cuprated/src/main.rs index 8dc97e29d..2c8b84a0b 100644 --- a/binaries/cuprated/src/main.rs +++ b/binaries/cuprated/src/main.rs @@ -130,7 +130,7 @@ fn main() { context_svc.clone(), blockchain_read_handle.clone(), ); - if incoming_tx_handler_tx.send(tx_handler).is_err() { + if incoming_tx_handler_tx.send(tx_handler.clone()).is_err() { unreachable!() } @@ -151,7 +151,7 @@ fn main() { std::thread::spawn(|| commands::command_listener(command_tx)); // Wait on the io_loop, spawned on a separate task as this improves performance. - tokio::spawn(commands::io_loop(command_rx, context_svc)) + tokio::spawn(commands::io_loop(command_rx, context_svc, tx_handler)) .await .unwrap(); } else {