@@ -2,30 +2,34 @@ use std::{
22 collections:: HashSet ,
33 pin:: Pin ,
44 sync:: Arc ,
5- task:: { Context , Poll }
5+ task:: { Context , Poll } ,
66} ;
77
88use alloy:: { primitives:: Address , providers:: Provider } ;
99use futures:: { Future , Stream , StreamExt } ;
1010use thiserror:: Error ;
11- use tokio:: sync:: mpsc;
11+ use tokio:: sync:: mpsc:: { self , UnboundedSender } ;
1212use uni_v4_common:: { PoolId , PoolKey , PoolUpdate , UniswapPools } ;
1313use uni_v4_structure:: BaselinePoolState ;
1414
1515use super :: {
1616 baseline_pool_factory:: { BaselinePoolFactory , BaselinePoolFactoryError , UpdateMessage } ,
17- fetch_pool_keys:: set_controller_address
17+ fetch_pool_keys:: set_controller_address,
18+ } ;
19+ use crate :: {
20+ pool_providers:: PoolEventStream ,
21+ slot0_pool_updater:: { Slot0PoolUpdater , Slot0UpgradeCommand } ,
22+ slot0_stream:: Slot0Stream ,
1823} ;
19- use crate :: { pool_providers:: PoolEventStream , slot0:: Slot0Stream } ;
2024
2125/// Pool information combining BaselinePoolState with token metadata
2226#[ derive( Debug , Clone ) ]
2327pub struct PoolInfo {
24- pub baseline_state : BaselinePoolState ,
25- pub token0 : Address ,
26- pub token1 : Address ,
28+ pub baseline_state : BaselinePoolState ,
29+ pub token0 : Address ,
30+ pub token1 : Address ,
2731 pub token0_decimals : u8 ,
28- pub token1_decimals : u8
32+ pub token1_decimals : u8 ,
2933}
3034
3135#[ derive( Error , Debug ) ]
@@ -37,39 +41,38 @@ pub enum PoolManagerServiceError {
3741 #[ error( "Pool factory error: {0}" ) ]
3842 PoolFactory ( String ) ,
3943 #[ error( "Baseline pool factory error: {0}" ) ]
40- BaselineFactory ( #[ from] BaselinePoolFactoryError )
44+ BaselineFactory ( #[ from] BaselinePoolFactoryError ) ,
4145}
4246
4347/// Service for managing Uniswap V4 pools with real-time block subscription
4448/// updates
45- pub struct PoolManagerService < P , Event , S = ( ) >
49+ pub struct PoolManagerService < P , Event >
4650where
4751 P : Provider + Unpin + Clone + ' static ,
48- Event : PoolEventStream
52+ Event : PoolEventStream ,
4953{
50- pub ( crate ) factory : BaselinePoolFactory < P > ,
51- pub ( crate ) event_stream : Event ,
52- pub ( crate ) pools : UniswapPools ,
53- pub ( crate ) current_block : u64 ,
54+ pub ( crate ) factory : BaselinePoolFactory < P > ,
55+ pub ( crate ) event_stream : Event ,
56+ pub ( crate ) pools : UniswapPools ,
57+ pub ( crate ) current_block : u64 ,
5458 pub ( crate ) auto_pool_creation : bool ,
55- pub ( crate ) slot0_stream : Option < S > ,
59+ pub ( crate ) slot0_stream_updater_tx : Option < UnboundedSender < Slot0UpgradeCommand > > ,
5660 // If we are loading more ticks at a block, we will queue up updates messages here
5761 // so that we don't hit any race conditions.
58- pending_updates : Vec < PoolUpdate > ,
62+ pending_updates : Vec < PoolUpdate > ,
5963 // Channel for sending updates instead of applying them directly
60- update_sender : Option < mpsc:: Sender < PoolUpdate > >
64+ update_sender : Option < mpsc:: Sender < PoolUpdate > > ,
6165}
6266
63- impl < P , Event , S > PoolManagerService < P , Event , S >
67+ impl < P , Event > PoolManagerService < P , Event >
6468where
6569 P : Provider + Clone + Unpin + ' static ,
6670 Event : PoolEventStream ,
6771 BaselinePoolFactory < P > : Stream < Item = UpdateMessage > + Unpin ,
68- S : Slot0Stream
6972{
7073 /// Create a new PoolManagerService
7174 #[ allow( clippy:: too_many_arguments) ]
72- pub async fn new (
75+ pub async fn new < S : Slot0Stream > (
7376 provider : Arc < P > ,
7477 event_stream : Event ,
7578 angstrom_address : Address ,
@@ -80,10 +83,10 @@ where
8083 tick_edge_threshold : Option < u16 > ,
8184 filter_pool_keys : Option < HashSet < PoolKey > > ,
8285 auto_pool_creation : bool ,
83- slot0_stream : Option < S > ,
86+ mut slot0_stream : Option < S > ,
8487 current_block : Option < u64 > ,
8588 ticks_per_batch : Option < usize > ,
86- update_channel : Option < mpsc:: Sender < PoolUpdate > >
89+ update_channel : Option < mpsc:: Sender < PoolUpdate > > ,
8790 ) -> Result < Self , PoolManagerServiceError > {
8891 // Set the controller address for the fetch_pool_keys module
8992 set_controller_address ( controller_address) ;
@@ -105,19 +108,24 @@ where
105108 tick_band,
106109 tick_edge_threshold,
107110 filter_pool_keys,
108- ticks_per_batch
111+ ticks_per_batch,
109112 )
110113 . await ;
111114
115+ let ( slot0_stream_updater_tx, slot0_stream_updater_rx) = slot0_stream
116+ . is_some ( )
117+ . then ( tokio:: sync:: mpsc:: unbounded_channel)
118+ . unzip ( ) ;
119+
112120 let mut service = Self {
113121 event_stream,
114122 factory,
115123 pools : UniswapPools :: new ( pools, current_block) ,
116124 current_block : deploy_block,
117125 auto_pool_creation,
118- slot0_stream ,
126+ slot0_stream_updater_tx ,
119127 pending_updates : Vec :: new ( ) ,
120- update_sender : update_channel
128+ update_sender : update_channel,
121129 } ;
122130
123131 service
@@ -130,10 +138,15 @@ where
130138 }
131139
132140 // Subscribe all initial pools to slot0 stream if present (using angstrom IDs)
133- if let Some ( slot0_stream) = & mut service . slot0_stream {
141+ if let Some ( mut slot0_stream) = slot0_stream. take ( ) {
134142 let angstrom_pool_ids: HashSet < PoolId > =
135143 service. factory . registry ( ) . public_keys ( ) . collect ( ) ;
136144 slot0_stream. subscribe_pools ( angstrom_pool_ids) ;
145+ Slot0PoolUpdater :: < S > :: spawn_new (
146+ slot0_stream,
147+ service. pools . clone ( ) ,
148+ slot0_stream_updater_rx. unwrap ( ) ,
149+ ) ;
137150 }
138151
139152 // Send all initialized pools through the channel on startup
@@ -144,7 +157,7 @@ where
144157 . iter ( )
145158 . map ( |entry| PoolUpdate :: NewPoolState {
146159 pool_id : * entry. key ( ) ,
147- state : entry. value ( ) . clone ( )
160+ state : entry. value ( ) . clone ( ) ,
148161 } )
149162 . collect ( ) ;
150163
@@ -178,14 +191,14 @@ where
178191 block_number : u64 ,
179192 bundle_fee : u32 ,
180193 swap_fee : u32 ,
181- protocol_fee : u32
194+ protocol_fee : u32 ,
182195 ) {
183196 self . factory . queue_pool_creation (
184197 pool_key,
185198 block_number,
186199 bundle_fee,
187200 swap_fee,
188- protocol_fee
201+ protocol_fee,
189202 ) ;
190203 }
191204
@@ -243,7 +256,7 @@ where
243256 swap_fee,
244257 protocol_fee,
245258 tick_spacing,
246- block
259+ block,
247260 } => {
248261 if self . auto_pool_creation {
249262 // Reconstruct pool_key from the NewPool data
@@ -254,7 +267,7 @@ where
254267 * block,
255268 * bundle_fee,
256269 * swap_fee,
257- * protocol_fee
270+ * protocol_fee,
258271 ) ;
259272
260273 tracing:: info!(
@@ -282,8 +295,9 @@ where
282295 self . factory . remove_pool_by_id ( * pool_id) ;
283296
284297 // Unsubscribe pool from slot0 stream (pool_id here is already angstrom ID)
285- if let Some ( slot0_stream) = & mut self . slot0_stream {
286- slot0_stream. unsubscribe_pools ( HashSet :: from ( [ * pool_id] ) ) ;
298+ if let Some ( slot0_stream_tx) = self . slot0_stream_updater_tx . as_ref ( ) {
299+ let _ = slot0_stream_tx
300+ . send ( Slot0UpgradeCommand :: UnsubscribePools ( HashSet :: from ( [ * pool_id] ) ) ) ;
287301 }
288302 }
289303 PoolUpdate :: FeeUpdate { pool_id, bundle_fee, swap_fee, protocol_fee, .. } => {
@@ -316,11 +330,14 @@ where
316330 self . event_stream . start_tracking_pool ( * pool_id) ;
317331
318332 // Subscribe new pool to slot0 stream (using angstrom ID)
319- if let Some ( slot0_stream ) = & mut self . slot0_stream
333+ if let Some ( slot0_stream_tx ) = self . slot0_stream_updater_tx . as_ref ( )
320334 && let Some ( angstrom_pool_id) =
321335 self . factory . registry ( ) . public_key_from_private ( pool_id)
322336 {
323- slot0_stream. subscribe_pools ( HashSet :: from ( [ angstrom_pool_id] ) ) ;
337+ let _ =
338+ slot0_stream_tx. send ( Slot0UpgradeCommand :: SubscribePools ( HashSet :: from ( [
339+ angstrom_pool_id,
340+ ] ) ) ) ;
324341 }
325342
326343 tracing:: info!( "Tracking new pool from factory: {:?}" , pool_id) ;
@@ -334,12 +351,11 @@ where
334351 }
335352}
336353
337- impl < P , Event , S > Future for PoolManagerService < P , Event , S >
354+ impl < P , Event > Future for PoolManagerService < P , Event >
338355where
339356 P : Provider + Clone + Unpin + ' static ,
340357 Event : PoolEventStream ,
341358 BaselinePoolFactory < P > : Stream < Item = UpdateMessage > + Unpin ,
342- S : Slot0Stream
343359{
344360 type Output = ( ) ;
345361
@@ -386,7 +402,7 @@ where
386402 this. factory . check_and_request_ticks_if_needed (
387403 * entry. key ( ) ,
388404 entry. value ( ) ,
389- Some ( this. current_block )
405+ Some ( this. current_block ) ,
390406 ) ;
391407 }
392408 }
@@ -415,7 +431,7 @@ where
415431 this. factory . check_and_request_ticks_if_needed (
416432 * entry. key ( ) ,
417433 entry. value ( ) ,
418- Some ( this. current_block )
434+ Some ( this. current_block ) ,
419435 ) ;
420436 }
421437 }
@@ -424,16 +440,16 @@ where
424440 }
425441 }
426442
427- if let Some ( slot0_stream) = this. slot0_stream . as_mut ( ) {
428- let mut slot0_updates = Vec :: new ( ) ;
429- while let Poll :: Ready ( Some ( update) ) = slot0_stream. poll_next_unpin ( cx) {
430- slot0_updates. push ( update) ;
431- }
432- for update in slot0_updates {
433- let pool_update = PoolUpdate :: Slot0Update ( update) ;
434- this. dispatch_update ( pool_update) ;
435- }
436- }
443+ // if let Some(slot0_stream) = this.slot0_stream.as_mut() {
444+ // let mut slot0_updates = Vec::new();
445+ // while let Poll::Ready(Some(update)) = slot0_stream.poll_next_unpin(cx) {
446+ // slot0_updates.push(update);
447+ // }
448+ // for update in slot0_updates {
449+ // let pool_update = PoolUpdate::Slot0Update(update);
450+ // this.dispatch_update(pool_update);
451+ // }
452+ // }
437453
438454 Poll :: Pending
439455 }
0 commit comments