Skip to content

Commit 967a7bb

Browse files
committed
feat: add example that uses http stream
1 parent 2ae64cf commit 967a7bb

1 file changed

Lines changed: 120 additions & 0 deletions

File tree

examples/pool_manager_http.rs

Lines changed: 120 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
use std::{collections::HashSet, sync::Arc, time::Duration};
2+
3+
use alloy::{
4+
primitives::address,
5+
providers::{Provider, ProviderBuilder}
6+
};
7+
use angstrom_v4::{
8+
pool_providers::{
9+
completed_block_stream::CompletedBlockStream,
10+
pool_update_provider::{PoolUpdateProvider, StateStream}
11+
},
12+
pools::PoolId,
13+
slot0::NoOpSlot0Stream,
14+
uniswap::{
15+
pool_manager_service_builder::PoolManagerServiceBuilder, pool_registry::UniswapPoolRegistry
16+
}
17+
};
18+
use futures::StreamExt;
19+
20+
const BLOCK_REQUERY: Duration = Duration::from_millis(150);
21+
22+
/// Example demonstrating PoolManagerServiceBuilder without slot0 stream
23+
///
24+
/// This example shows how to:
25+
/// 1. Use PoolManagerServiceBuilder for a simpler setup without real-time
26+
/// updates
27+
/// 2. Rely solely on block-based events for pool state changes
28+
/// 3. Create services with different configurations (with/without event stream)
29+
/// 4. Process pool updates through block events only
30+
#[tokio::main]
31+
async fn main() -> eyre::Result<()> {
32+
// Initialize logging
33+
tracing_subscriber::fmt::init();
34+
35+
// Setup provider
36+
let rpc_url = std::env::var("RPC_URL").expect("no rpc url set, must be ws");
37+
38+
let provider = Arc::new(ProviderBuilder::new().connect(&rpc_url).await.unwrap());
39+
40+
// Configuration addresses (replace with actual deployment addresses)
41+
let angstrom_address = address!("0x0000000aa232009084Bd71A5797d089AA4Edfad4");
42+
let controller_address = address!("0x1746484EA5e11C75e009252c102C8C33e0315fD4");
43+
let pool_manager_address = address!("0x000000000004444c5dc75cB358380D2e3dE08A90");
44+
let deploy_block = 22971782;
45+
46+
// Example 1: Basic setup without any streams (static pool state)
47+
48+
// Example 2: Setup with event stream but no slot0 stream
49+
println!("\n📡 Example 1: Building pool manager with event stream only...");
50+
51+
// Create event stream for block-based updates
52+
let pool_registry = UniswapPoolRegistry::default();
53+
let update_provider = PoolUpdateProvider::new(
54+
provider.clone(),
55+
pool_manager_address,
56+
controller_address,
57+
angstrom_address,
58+
pool_registry
59+
)
60+
.await;
61+
62+
let block = provider.get_block_number().await.unwrap();
63+
let prev_block_hash = provider
64+
.get_block_by_number(block.into())
65+
.hashes()
66+
.await
67+
.unwrap()
68+
.unwrap()
69+
.hash();
70+
71+
// Work-around for not having ws.
72+
let mut stream = provider.watch_full_blocks().await.unwrap().full();
73+
stream.set_poll_interval(BLOCK_REQUERY);
74+
let block_stream = stream.into_stream().map(|block| block.unwrap()).boxed();
75+
76+
let block_stream =
77+
CompletedBlockStream::new(prev_block_hash, block, provider.clone(), block_stream);
78+
let event_stream = StateStream::new(update_provider, block_stream);
79+
// Build service with event stream but without slot0 stream
80+
let service = PoolManagerServiceBuilder::<_, _, NoOpSlot0Stream>::new(
81+
provider.clone(),
82+
angstrom_address,
83+
controller_address,
84+
pool_manager_address,
85+
deploy_block,
86+
event_stream
87+
)
88+
.with_initial_tick_range_size(100) // Custom tick range
89+
.with_tick_edge_threshold(30)
90+
.build()
91+
.await?;
92+
93+
println!("✅ Pool service initialized!");
94+
println!("📊 Found {} pools", service.get_pools().len());
95+
println!("🔗 Current block: {}", service.current_block());
96+
97+
// Get all pool IDs to subscribe to slot0 updates
98+
let pool_ids: HashSet<PoolId> = service
99+
.get_pools()
100+
.iter()
101+
.map(|entry| *entry.key())
102+
.collect();
103+
println!("started with pool_ids {pool_ids:#?}");
104+
105+
let pools = service.get_pools();
106+
// spawn the upkeeper service.
107+
tokio::spawn(service);
108+
109+
// Main event loop - process both block events and slot0 updates
110+
println!("🔄 Starting event processing loop...");
111+
println!(" Block events: Pool creations, swaps, mints, burns");
112+
println!(" Slot0 updates: Real-time price, liquidity, and tick changes");
113+
println!("Press Ctrl+C to stop");
114+
115+
loop {
116+
tokio::time::sleep(Duration::from_secs(12)).await;
117+
let updated_to_block = pools.get_block();
118+
println!("pools are updated to block number: {updated_to_block}");
119+
}
120+
}

0 commit comments

Comments
 (0)