@@ -73,30 +73,18 @@ pub type BoxConsumerStream =
7373pub type BoxConsumerStream =
7474 Pin < Box < dyn ConsumerStream < Item = Result < ConsumerRecord , ErrorCode > > + Send + ' static > > ;
7575
76+ type ShararedConsumerStream = Arc < Mutex < BoxConsumerStream > > ;
77+
78+ type ConsumerFutureOutput = (
79+ ShararedConsumerStream ,
80+ Option < Result < ( ConsumerRecord , Option < i64 > ) , ErrorCode > > ,
81+ ) ;
82+
7683/// Type alias to access consume stream as a future.
7784#[ cfg( target_arch = "wasm32" ) ]
78- type BoxConsumerFuture = Pin <
79- Box <
80- dyn Future <
81- Output = (
82- Arc < Mutex < BoxConsumerStream > > ,
83- Option < Result < ( ConsumerRecord , Option < i64 > ) , ErrorCode > > ,
84- ) ,
85- > + ' static ,
86- > ,
87- > ;
85+ type BoxConsumerFuture = Pin < Box < dyn Future < Output = ConsumerFutureOutput > + ' static > > ;
8886#[ cfg( not( target_arch = "wasm32" ) ) ]
89- type BoxConsumerFuture = Pin <
90- Box <
91- dyn Future <
92- Output = (
93- Arc < Mutex < BoxConsumerStream > > ,
94- Option < Result < ( ConsumerRecord , Option < i64 > ) , ErrorCode > > ,
95- ) ,
96- > + Send
97- + ' static ,
98- > ,
99- > ;
87+ type BoxConsumerFuture = Pin < Box < dyn Future < Output = ConsumerFutureOutput > + Send + ' static > > ;
10088
10189/// An interface for consuming events from a particular partition
10290///
@@ -617,7 +605,8 @@ where
617605 & self ,
618606 config : ConsumerConfigExt ,
619607 ) -> Result < SinglePartitionConsumerStream < impl Stream < Item = Result < Record , ErrorCode > > > > {
620- let ( offset, config, consumer_id, strategy, flush_period) = config. into_parts ( ) ;
608+ let ( offset, config, consumer_id, strategy, flush_period, flusher_check_period) =
609+ config. into_parts ( ) ;
621610 let ( stream, start_offset, stream_to_server) = self
622611 . inner_stream_batches_with_config ( offset, config, consumer_id)
623612 . await ?;
@@ -642,6 +631,7 @@ where
642631 flattened,
643632 strategy,
644633 flush_period,
634+ flusher_check_period,
645635 stream_to_server,
646636 ) )
647637 }
0 commit comments