Skip to content

Commit 586aedd

Browse files
committed
feat: show example for pool stream
1 parent 9991cf1 commit 586aedd

2 files changed

Lines changed: 24 additions & 11 deletions

File tree

crates/uni-v4-upkeeper/src/pool_manager_service.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,23 @@ where
137137
slot0_stream.subscribe_pools(angstrom_pool_ids);
138138
}
139139

140+
// Send all initialized pools through the channel on startup
141+
if service.update_sender.is_some() {
142+
let initial_pool_updates: Vec<PoolUpdate> = service
143+
.pools
144+
.get_pools()
145+
.iter()
146+
.map(|entry| PoolUpdate::NewPoolState {
147+
pool_id: *entry.key(),
148+
state: entry.value().clone()
149+
})
150+
.collect();
151+
152+
for update in initial_pool_updates {
153+
service.dispatch_update(update);
154+
}
155+
}
156+
140157
Ok(service)
141158
}
142159

examples/pool_manager_initialization_only.rs

Lines changed: 7 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
1-
use std::{sync::Arc, time::Duration};
1+
use std::{collections::HashMap, sync::Arc, time::Duration};
22

33
use alloy::{
44
eips::BlockNumberOrTag,
55
primitives::address,
66
providers::{Provider, ProviderBuilder, WsConnect}
77
};
8-
use angstrom_v4::slot0::Slot0Client;
98
use eyre::Result;
109
use futures::StreamExt;
1110
use jsonrpsee::ws_client::WsClientBuilder;
@@ -16,7 +15,8 @@ use uni_v4_upkeeper::{
1615
pool_providers::{
1716
completed_block_stream::CompletedBlockStream,
1817
pool_update_provider::{PoolUpdateProvider, StateStream}
19-
}
18+
},
19+
slot0::Slot0Client
2020
};
2121

2222
#[tokio::main]
@@ -105,14 +105,13 @@ async fn main() -> Result<()> {
105105
println!(" ❌ Swap and liquidity events will be filtered out\n");
106106

107107
// Create a local pool instance for the receiver
108-
let initial_pools = service.get_pools();
109108

110109
// Spawn the upkeeper service
111110
tokio::spawn(service);
112111

113112
// Spawn a task to receive and process updates
114113
let _update_processor = tokio::spawn(async move {
115-
let local_pools = initial_pools;
114+
let mut local_pools = HashMap::new();
116115
let mut message_count = 0;
117116
let mut filtered_count = 0;
118117

@@ -122,13 +121,10 @@ async fn main() -> Result<()> {
122121
message_count += 1;
123122

124123
// Log the message type
125-
match &msg {
124+
match msg {
126125
PoolUpdate::NewBlock(block) => {
127126
println!("📦 Block #{}: Received NewBlock", block);
128127
}
129-
PoolUpdate::NewPool { pool_id, .. } => {
130-
println!("🆕 Received NewPool config for pool {:?}", pool_id);
131-
}
132128
PoolUpdate::FeeUpdate { pool_id, bundle_fee, swap_fee, protocol_fee, .. } => {
133129
println!(
134130
"💰 Received FeeUpdate for pool {:?} - bundle: {}, swap: {}, protocol: {}",
@@ -141,7 +137,8 @@ async fn main() -> Result<()> {
141137
PoolUpdate::UpdatedSlot0 { pool_id, .. } => {
142138
println!("📊 Received UpdatedSlot0 for pool {:?}", pool_id);
143139
}
144-
PoolUpdate::NewPoolState { pool_id, .. } => {
140+
PoolUpdate::NewPoolState { pool_id, state } => {
141+
local_pools.insert(pool_id, state);
145142
println!("🏊 Received NewPoolState for pool {:?}", pool_id);
146143
}
147144
PoolUpdate::SwapEvent { .. } => {
@@ -160,7 +157,6 @@ async fn main() -> Result<()> {
160157
}
161158

162159
// Apply the update to our local pool instance
163-
local_pools.update_pools(vec![msg]);
164160

165161
// Print stats every 100 messages
166162
if message_count % 100 == 0 {

0 commit comments

Comments
 (0)