@@ -26,23 +26,36 @@ def initialize(broker)
2626 end
2727
# Prepares the waiter: stores a fresh event queue on the main thread under
# :event_queue, then installs the signal handlers.
def register_handlers
  main_thread = Thread.main
  main_thread[:event_queue] = Queue.new
  register_signal_handlers
end
3532
# Blocks on the event queue and dispatches each popped event until told to stop.
#
# Every event is a [type, data] pair:
#   :signal - forwarded to handle_signal; a falsy result ends the loop
#   :action - forwarded to handle_action
# Any other type raises. The loop also ends if a falsy event is popped.
def wait_until_signaled
  while (message = event_queue.pop)
    kind, payload = message

    if kind == :signal
      # A falsy result from the signal handler requests a graceful shutdown.
      return unless handle_signal(payload)
    elsif kind == :action
      handle_action(payload)
    else
      raise "Assertion failed - unhandled event: #{kind}"
    end
  end
end
4249
# Consumer threads call this to hand work over to the main thread.
#
# Enqueues an [:action, payload] event. The payload carries the action, the
# delivery info, the message properties, the exception (if any) and the
# wall-clock time at which the event was pushed (:pushed_at, float seconds).
def push_action(action, delivery_info, properties, ex)
  queue = event_queue
  payload = {
    action: action,
    delivery_info: delivery_info,
    properties: properties,
    ex: ex,
    pushed_at: Time.now.to_f
  }
  queue << [:action, payload]
end
4760
4861 # return true to continue processing
@@ -65,46 +78,42 @@ def handle_user_signal(sig)
6578
# Handles a shutdown signal: logs it, processes the actions still waiting in
# the queue, and returns false (handlers in this file return true to continue
# processing).
def handle_shutdown_signal(sig)
  logger.info("caught SIG#{sig}, stopping hutch...")
  drain_actions
  false
end
7084
# Performs one queued acknowledgement action.
#
# data - Hash with the keys:
#        :action        - :ack or :nack
#        :delivery_info - object responding to #delivery_tag
#        :properties    - message properties (used for :nack)
#        :ex            - the exception that caused a :nack
#        :pushed_at     - optional enqueue time, float seconds since the epoch
#
# Logs a warning when the entry waited in the queue for more than five
# seconds. Any StandardError raised while handling is logged and re-raised.
#
# Raises RuntimeError when :action is neither :ack nor :nack.
def handle_action(data)
  pushed_at = data[:pushed_at]
  if pushed_at
    # Latency is in milliseconds; entries without a timestamp skip the check.
    latency = (Time.now.to_f - pushed_at) * 1000
    logger.warn "Queue latency exceeded 5 seconds" if latency > 5000
  end

  case data[:action]
  when :ack then broker.ack(data[:delivery_info].delivery_tag)
  when :nack then acknowledge_error(data[:delivery_info], data[:properties], data[:ex])
  else raise "Assertion failed - unhandled action: #{data[:action].inspect}"
  end
rescue => e
  logger.error "Error during #{data[:action]}: #{e.message}"
  # Bare raise re-raises the exception currently being handled.
  raise
end
82101
# Offers a failed delivery to each error-acknowledgement backend in turn.
#
# The configured backends are tried first, followed by a
# NackOnAllFailures instance. Returns the first backend whose #handle
# returns a truthy value (nil if none does).
def acknowledge_error(delivery_info, properties, ex)
  configured = error_acknowledgements
  catch_all = Hutch::Acknowledgements::NackOnAllFailures.new
  (configured + [catch_all]).find { |handler| handler.handle(delivery_info, properties, broker, ex) }
end
90109
91110 private
92111
93- def read_pipe ( pipe )
94- case pipe
95- when sig_read
96- sig = sig_read . gets . chomp
97- handle_signal ( sig )
98- when action_read
99- delivery_tag = action_read . gets . chomp
100- handle_action ( delivery_tag )
101- end
102- end
103-
# Logs, at info level, a header line and a backtrace for every live thread.
# The header line of the main thread is suffixed with 'main'.
def log_thread_backtraces
  logger.info 'Requested a VM-wide thread stack trace dump...'
  Thread.list.each do |thr|
    tid = thr.object_id.to_s(36)
    suffix = thr == Thread.main ? 'main' : ''
    logger.info "Thread TID-#{tid} #{thr['label']} #{suffix}"
    logger.info backtrace_for(thr)
  end
end
@@ -118,18 +127,11 @@ def backtrace_for(thread)
118127 end
119128
120129 attr_reader :broker
121- attr_accessor :sig_read , :sig_write , :action_read , :action_write
122-
123- def wait_for_signal
124- IO . select ( [ sig_read , action_read ] )
125- end
126130
# Installs a trap for every signal in REGISTERED_SIGNALS.
#
# The trap only enqueues a [:signal, name] event on the event queue; it does
# not act on the signal itself.
def register_signal_handlers
  REGISTERED_SIGNALS.each do |signal_name|
    trap(signal_name) { event_queue << [:signal, signal_name] }
  end
end
@@ -139,7 +141,27 @@ def user_signal?(sig)
139141 end
140142
# Returns the value of Hutch::Config[:error_acknowledgements], or an empty
# array when that setting is nil or false.
def error_acknowledgements
  configured = Hutch::Config[:error_acknowledgements]
  return [] unless configured

  configured
end
146+
# Returns the queue stored on the main thread under :event_queue.
#
# Raises RuntimeError when no queue has been stored there.
def event_queue
  Thread.main[:event_queue] || raise('Undefined main thread queue')
end
152+
# Drains the event queue during shutdown.
#
# Pops events without blocking until the queue is empty. :action events are
# passed to handle_action; events of any other type are discarded.
#
# Only the ThreadError raised by a non-blocking pop on an empty queue ends the
# drain quietly. Errors raised by handle_action (including ThreadError)
# propagate to the caller instead of being mistaken for an empty queue.
def drain_actions
  queue = event_queue

  until queue.empty?
    event = begin
      queue.pop(true)
    rescue ThreadError
      # The queue was emptied between the empty? check and the pop.
      break
    end

    type, data = event
    handle_action(data) if type == :action
  end
end
144166 end
145167end
0 commit comments