Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 11 additions & 4 deletions crates/iota-indexer/src/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ impl Indexer {
registry: &Registry,
connection_pool: ConnectionPool,
metrics: IndexerMetrics,
cancel: CancellationToken,
) -> Result<(), IndexerError> {
info!(
"IOTA Indexer Reader (version {:?}) started...",
Expand Down Expand Up @@ -184,10 +185,16 @@ impl Indexer {
info!("No config for HistoricalFallbackReader provided, skipping...");
}

let (handle, cancel) =
build_json_rpc_server(store.clone(), registry, read.clone(), config, metrics)
.await
.expect("json rpc server should not run into errors upon start.");
let handle = build_json_rpc_server(
store.clone(),
registry,
read.clone(),
config,
metrics,
cancel.clone(),
)
.await
.expect("json rpc server should not run into errors upon start.");

tracing::info!("Starting watermark background task to track pruning state");
let watermark_task = WatermarkTask::new(store, watermark_cache);
Expand Down
14 changes: 4 additions & 10 deletions crates/iota-indexer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,8 @@ pub async fn build_json_rpc_server(
reader: IndexerReader,
config: &JsonRpcConfig,
metrics: IndexerMetrics,
) -> Result<(ServerHandle, CancellationToken), IndexerError> {
cancel: CancellationToken,
) -> Result<ServerHandle, IndexerError> {
let mut builder =
JsonRpcServerBuilder::new(env!("CARGO_PKG_VERSION"), prometheus_registry, None, None);

Expand All @@ -72,16 +73,9 @@ pub async fn build_json_rpc_server(
.await?,
))?;

let cancel = CancellationToken::new();

let handle = builder
.start(
config.rpc_address,
None,
ServerType::Http,
Some(cancel.clone()),
)
.start(config.rpc_address, None, ServerType::Http, Some(cancel))
.await?;

Ok((handle, cancel))
Ok(handle)
}
28 changes: 27 additions & 1 deletion crates/iota-indexer/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ async fn main() -> Result<(), IndexerError> {
)?;
spawn_connection_pool_metric_collector(indexer_metrics.clone(), connection_pool.clone());

let cancel = CancellationToken::new();
spawn_shutdown_signal_listener(cancel.clone());

match opts.command {
Command::Indexer {
ingestion_config,
Expand Down Expand Up @@ -100,7 +103,7 @@ async fn main() -> Result<(), IndexerError> {
indexer_metrics,
snapshot_config,
retention_config,
CancellationToken::new(),
cancel.clone(),
)
.await?;
}
Expand All @@ -118,6 +121,7 @@ async fn main() -> Result<(), IndexerError> {
&registry,
connection_pool,
indexer_metrics,
cancel.clone(),
)
.await?;
}
Expand All @@ -139,3 +143,25 @@ async fn main() -> Result<(), IndexerError> {

Ok(())
}

fn spawn_shutdown_signal_listener(token: CancellationToken) {
tokio::spawn(async move {
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("cannot listen to SIGTERM signal")
.recv()
.await;
};

#[cfg(not(unix))]
let terminate = std::future::pending::<()>();

tokio::select! {
_ = tokio::signal::ctrl_c() => tracing::info!("shutting down, CTRL+C signal received"),
_ = terminate => tracing::info!("shutting down, SIGTERM signal received"),
};

token.cancel();
});
}
10 changes: 9 additions & 1 deletion crates/iota-indexer/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,15 @@ pub async fn start_test_indexer_impl(
let pool = store.blocking_cp();
let store_clone = store.clone();
tokio::spawn(async move {
Indexer::start_reader(&config, store_clone, &registry, pool, indexer_metrics).await
Indexer::start_reader(
&config,
store_clone,
&registry,
pool,
indexer_metrics,
CancellationToken::new(),
)
.await
})
}
IndexerTypeConfig::Writer {
Expand Down
15 changes: 12 additions & 3 deletions crates/iota-indexer/tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ use tokio::{
sync::{Mutex, OnceCell},
task::JoinHandle,
};
use tokio_util::sync::CancellationToken;

const DEFAULT_DB: &str = "iota_indexer";
const DEFAULT_INDEXER_IP: &str = "127.0.0.1";
Expand Down Expand Up @@ -450,9 +451,17 @@ fn start_indexer_reader(fullnode_rpc_url: impl Into<String>, database_name: Opti

let store = create_pg_store(&db_url, false);

tokio::spawn(
async move { Indexer::start_reader(&config, store, &registry, pool, metrics).await },
);
tokio::spawn(async move {
Indexer::start_reader(
&config,
store,
&registry,
pool,
metrics,
CancellationToken::new(),
)
.await
});
port
}

Expand Down
Loading