@@ -2,7 +2,7 @@ use std::{
22 collections:: HashSet ,
33 pin:: Pin ,
44 sync:: Arc ,
5- task:: { Context , Poll }
5+ task:: { Context , Poll } ,
66} ;
77
88use alloy:: { primitives:: Address , providers:: Provider } ;
@@ -14,18 +14,18 @@ use uni_v4_structure::BaselinePoolState;
1414
1515use super :: {
1616 baseline_pool_factory:: { BaselinePoolFactory , BaselinePoolFactoryError , UpdateMessage } ,
17- fetch_pool_keys:: set_controller_address
17+ fetch_pool_keys:: set_controller_address,
1818} ;
1919use crate :: { pool_providers:: PoolEventStream , slot0:: Slot0Stream } ;
2020
2121/// Pool information combining BaselinePoolState with token metadata
2222#[ derive( Debug , Clone ) ]
2323pub struct PoolInfo {
24- pub baseline_state : BaselinePoolState ,
25- pub token0 : Address ,
26- pub token1 : Address ,
24+ pub baseline_state : BaselinePoolState ,
25+ pub token0 : Address ,
26+ pub token1 : Address ,
2727 pub token0_decimals : u8 ,
28- pub token1_decimals : u8
28+ pub token1_decimals : u8 ,
2929}
3030
3131#[ derive( Error , Debug ) ]
@@ -37,35 +37,35 @@ pub enum PoolManagerServiceError {
3737 #[ error( "Pool factory error: {0}" ) ]
3838 PoolFactory ( String ) ,
3939 #[ error( "Baseline pool factory error: {0}" ) ]
40- BaselineFactory ( #[ from] BaselinePoolFactoryError )
40+ BaselineFactory ( #[ from] BaselinePoolFactoryError ) ,
4141}
4242
4343/// Service for managing Uniswap V4 pools with real-time block subscription
4444/// updates
4545pub struct PoolManagerService < P , Event , S = ( ) >
4646where
4747 P : Provider + Unpin + Clone + ' static ,
48- Event : PoolEventStream
48+ Event : PoolEventStream ,
4949{
50- pub ( crate ) factory : BaselinePoolFactory < P > ,
51- pub ( crate ) event_stream : Event ,
52- pub ( crate ) pools : UniswapPools ,
53- pub ( crate ) current_block : u64 ,
50+ pub ( crate ) factory : BaselinePoolFactory < P > ,
51+ pub ( crate ) event_stream : Event ,
52+ pub ( crate ) pools : UniswapPools ,
53+ pub ( crate ) current_block : u64 ,
5454 pub ( crate ) auto_pool_creation : bool ,
55- pub ( crate ) slot0_stream : Option < S > ,
55+ pub ( crate ) slot0_stream : Option < S > ,
5656 // If we are loading more ticks at a block, we will queue up updates messages here
5757 // so that we don't hit any race conditions.
58- pending_updates : Vec < PoolUpdate > ,
58+ pending_updates : Vec < PoolUpdate > ,
5959 // Channel for sending updates instead of applying them directly
60- update_sender : Option < mpsc:: Sender < PoolUpdate > >
60+ update_sender : Option < mpsc:: Sender < PoolUpdate > > ,
6161}
6262
6363impl < P , Event , S > PoolManagerService < P , Event , S >
6464where
6565 P : Provider + Clone + Unpin + ' static ,
6666 Event : PoolEventStream ,
6767 BaselinePoolFactory < P > : Stream < Item = UpdateMessage > + Unpin ,
68- S : Slot0Stream
68+ S : Slot0Stream ,
6969{
7070 /// Create a new PoolManagerService
7171 #[ allow( clippy:: too_many_arguments) ]
8383 slot0_stream : Option < S > ,
8484 current_block : Option < u64 > ,
8585 ticks_per_batch : Option < usize > ,
86- update_channel : Option < mpsc:: Sender < PoolUpdate > >
86+ update_channel : Option < mpsc:: Sender < PoolUpdate > > ,
8787 ) -> Result < Self , PoolManagerServiceError > {
8888 // Set the controller address for the fetch_pool_keys module
8989 set_controller_address ( controller_address) ;
@@ -105,7 +105,7 @@ where
105105 tick_band,
106106 tick_edge_threshold,
107107 filter_pool_keys,
108- ticks_per_batch
108+ ticks_per_batch,
109109 )
110110 . await ;
111111
@@ -117,7 +117,7 @@ where
117117 auto_pool_creation,
118118 slot0_stream,
119119 pending_updates : Vec :: new ( ) ,
120- update_sender : update_channel
120+ update_sender : update_channel,
121121 } ;
122122
123123 service
@@ -144,7 +144,7 @@ where
144144 . iter ( )
145145 . map ( |entry| PoolUpdate :: NewPoolState {
146146 pool_id : * entry. key ( ) ,
147- state : entry. value ( ) . clone ( )
147+ state : entry. value ( ) . clone ( ) ,
148148 } )
149149 . collect ( ) ;
150150
@@ -178,14 +178,14 @@ where
178178 block_number : u64 ,
179179 bundle_fee : u32 ,
180180 swap_fee : u32 ,
181- protocol_fee : u32
181+ protocol_fee : u32 ,
182182 ) {
183183 self . factory . queue_pool_creation (
184184 pool_key,
185185 block_number,
186186 bundle_fee,
187187 swap_fee,
188- protocol_fee
188+ protocol_fee,
189189 ) ;
190190 }
191191
@@ -243,7 +243,7 @@ where
243243 swap_fee,
244244 protocol_fee,
245245 tick_spacing,
246- block
246+ block,
247247 } => {
248248 if self . auto_pool_creation {
249249 // Reconstruct pool_key from the NewPool data
@@ -254,7 +254,7 @@ where
254254 * block,
255255 * bundle_fee,
256256 * swap_fee,
257- * protocol_fee
257+ * protocol_fee,
258258 ) ;
259259
260260 tracing:: info!(
@@ -275,6 +275,13 @@ where
275275 pool_id
276276 ) ;
277277 }
278+
279+ if let Some ( slot0_stream) = & mut self . slot0_stream
280+ && let Some ( angstrom_pool_id) =
281+ self . factory . registry ( ) . public_key_from_private ( pool_id)
282+ {
283+ slot0_stream. subscribe_pools ( HashSet :: from ( [ angstrom_pool_id] ) ) ;
284+ }
278285 }
279286 PoolUpdate :: PoolRemoved { pool_id, .. } => {
280287 tracing:: info!( "Pool removed: {:?}" , pool_id) ;
@@ -339,7 +346,7 @@ where
339346 P : Provider + Clone + Unpin + ' static ,
340347 Event : PoolEventStream ,
341348 BaselinePoolFactory < P > : Stream < Item = UpdateMessage > + Unpin ,
342- S : Slot0Stream
349+ S : Slot0Stream ,
343350{
344351 type Output = ( ) ;
345352
@@ -386,7 +393,7 @@ where
386393 this. factory . check_and_request_ticks_if_needed (
387394 * entry. key ( ) ,
388395 entry. value ( ) ,
389- Some ( this. current_block )
396+ Some ( this. current_block ) ,
390397 ) ;
391398 }
392399 }
@@ -415,7 +422,7 @@ where
415422 this. factory . check_and_request_ticks_if_needed (
416423 * entry. key ( ) ,
417424 entry. value ( ) ,
418- Some ( this. current_block )
425+ Some ( this. current_block ) ,
419426 ) ;
420427 }
421428 }
@@ -427,8 +434,11 @@ where
427434 if let Some ( slot0_stream) = this. slot0_stream . as_mut ( ) {
428435 let mut slot0_updates = Vec :: new ( ) ;
429436 while let Poll :: Ready ( Some ( update) ) = slot0_stream. poll_next_unpin ( cx) {
437+ tracing:: error!( "got slot0 update" ) ;
430438 slot0_updates. push ( update) ;
431439 }
440+
441+ tracing:: error!( "LENGTH WHEN DISPATCHING: {}" , slot0_updates. len( ) ) ;
432442 for update in slot0_updates {
433443 let pool_update = PoolUpdate :: Slot0Update ( update) ;
434444 this. dispatch_update ( pool_update) ;
0 commit comments