Skip to content

Commit 068907e

Browse files
navinaclaude
andauthored
feat(broker): extract doScatter/doReduce hooks in single-stage broker request handlers (#18316)
Co-authored-by: Claude Sonnet 4.6 <[email protected]>
1 parent 671e1ba commit 068907e

3 files changed

Lines changed: 175 additions & 40 deletions

File tree

pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/BaseBrokerStarter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -279,6 +279,20 @@ protected SingleConnectionBrokerRequestHandler createSingleStageBrokerRequestHan
279279
serverRoutingStatsManager, failureDetector, threadAccountant, multiClusterRoutingContext);
280280
}
281281

282+
/**
283+
* Override to supply a custom {@link GrpcBrokerRequestHandler} subclass.
284+
* The default implementation returns a plain {@link GrpcBrokerRequestHandler}.
285+
*/
286+
protected GrpcBrokerRequestHandler createGrpcBrokerRequestHandler(
287+
PinotConfiguration config, String brokerId, BrokerRequestIdGenerator requestIdGenerator,
288+
RoutingManager routingManager, AccessControlFactory accessControlFactory,
289+
QueryQuotaManager queryQuotaManager, TableCache tableCache, FailureDetector failureDetector,
290+
ThreadAccountant threadAccountant, MultiClusterRoutingContext multiClusterRoutingContext) {
291+
return new GrpcBrokerRequestHandler(config, brokerId, requestIdGenerator, routingManager,
292+
accessControlFactory, queryQuotaManager, tableCache, failureDetector, threadAccountant,
293+
multiClusterRoutingContext);
294+
}
295+
282296
private void setupHelixSystemProperties() {
283297
// NOTE: Helix will disconnect the manager and disable the instance if it detects flapping (too frequent disconnect
284298
// from ZooKeeper). Setting flapping time window to a small value can avoid this from happening. Helix ignores the
@@ -445,7 +459,7 @@ public void start()
445459
BaseSingleStageBrokerRequestHandler singleStageBrokerRequestHandler;
446460
if (brokerRequestHandlerType.equalsIgnoreCase(Broker.GRPC_BROKER_REQUEST_HANDLER_TYPE)) {
447461
singleStageBrokerRequestHandler =
448-
new GrpcBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
462+
createGrpcBrokerRequestHandler(_brokerConf, brokerId, requestIdGenerator, _routingManager,
449463
_accessControlFactory, _queryQuotaManager, _tableCache, _failureDetector, _threadAccountant,
450464
multiClusterRoutingContext);
451465
} else {

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/GrpcBrokerRequestHandler.java

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -59,9 +59,9 @@
5959
public class GrpcBrokerRequestHandler extends BaseSingleStageBrokerRequestHandler {
6060
private static final Logger LOGGER = LoggerFactory.getLogger(GrpcBrokerRequestHandler.class);
6161

62-
private final StreamingReduceService _streamingReduceService;
63-
private final PinotServerStreamingQueryClient _streamingQueryClient;
64-
private final FailureDetector _failureDetector;
62+
protected final StreamingReduceService _streamingReduceService;
63+
protected final PinotServerStreamingQueryClient _streamingQueryClient;
64+
protected final FailureDetector _failureDetector;
6565

6666
// TODO: Support TLS
6767
public GrpcBrokerRequestHandler(PinotConfiguration config, String brokerId,
@@ -94,15 +94,25 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
9494
BrokerRequest serverBrokerRequest, TableRouteInfo route, long timeoutMs,
9595
ServerStats serverStats, RequestContext requestContext)
9696
throws Exception {
97+
// TODO: Add servers queried/responded stats
98+
assert route.getOfflineBrokerRequest() != null || route.getRealtimeBrokerRequest() != null;
99+
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = doScatter(requestId, route,
100+
requestContext);
101+
return doReduce(originalBrokerRequest, responseMap, timeoutMs);
102+
}
103+
104+
/**
105+
* Executes scatter: sends the query to servers and collects per-server streaming response iterators.
106+
* Subclasses may override to replace or augment the scatter step.
107+
*/
108+
protected Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> doScatter(long requestId, TableRouteInfo route,
109+
RequestContext requestContext) {
97110
BrokerRequest offlineBrokerRequest = route.getOfflineBrokerRequest();
98111
BrokerRequest realtimeBrokerRequest = route.getRealtimeBrokerRequest();
99112
// TODO: Routing bases on Map<ServerInstance, SegmentsToQuery> cannot be supported for logical tables.
100113
// The routing will be replaces to support table to segment list map in the future.
101114
Map<ServerInstance, SegmentsToQuery> offlineRoutingTable = route.getOfflineRoutingTable();
102115
Map<ServerInstance, SegmentsToQuery> realtimeRoutingTable = route.getRealtimeRoutingTable();
103-
104-
// TODO: Add servers queried/responded stats
105-
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
106116
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap = new HashMap<>();
107117
if (offlineBrokerRequest != null) {
108118
assert offlineRoutingTable != null;
@@ -114,6 +124,16 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
114124
sendRequest(requestId, TableType.REALTIME, realtimeBrokerRequest, realtimeRoutingTable, responseMap,
115125
requestContext.isSampledRequest());
116126
}
127+
return responseMap;
128+
}
129+
130+
/**
131+
* Executes the reduce step on the given responseMap.
132+
* Subclasses may override to perform custom reduce logic or augment the responseMap.
133+
*/
134+
protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest,
135+
Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> responseMap, long timeoutMs)
136+
throws Exception {
117137
long reduceStartTimeNs = System.nanoTime();
118138
BrokerResponseNative brokerResponse =
119139
_streamingReduceService.reduceOnStreamResponse(originalBrokerRequest, responseMap, timeoutMs, _brokerMetrics);

pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/SingleConnectionBrokerRequestHandler.java

Lines changed: 134 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -66,9 +66,9 @@
6666
public class SingleConnectionBrokerRequestHandler extends BaseSingleStageBrokerRequestHandler {
6767
private static final Logger LOGGER = LoggerFactory.getLogger(SingleConnectionBrokerRequestHandler.class);
6868

69-
private final BrokerReduceService _brokerReduceService;
70-
private final QueryRouter _queryRouter;
71-
private final FailureDetector _failureDetector;
69+
protected final BrokerReduceService _brokerReduceService;
70+
protected final QueryRouter _queryRouter;
71+
protected final FailureDetector _failureDetector;
7272

7373
public SingleConnectionBrokerRequestHandler(PinotConfiguration config, String brokerId,
7474
BrokerRequestIdGenerator requestIdGenerator, RoutingManager routingManager,
@@ -101,36 +101,36 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
101101
BrokerRequest serverBrokerRequest, TableRouteInfo route, long timeoutMs,
102102
ServerStats serverStats, RequestContext requestContext)
103103
throws Exception {
104-
BrokerRequest offlineBrokerRequest = route.getOfflineBrokerRequest();
105-
BrokerRequest realtimeBrokerRequest = route.getRealtimeBrokerRequest();
106-
107-
assert offlineBrokerRequest != null || realtimeBrokerRequest != null;
104+
assert route.getOfflineBrokerRequest() != null || route.getRealtimeBrokerRequest() != null;
108105
if (requestContext.isSampledRequest()) {
109106
serverBrokerRequest.getPinotQuery().putToQueryOptions(CommonConstants.Broker.Request.TRACE, "true");
110107
}
111-
112108
String rawTableName = TableNameBuilder.extractRawTableName(serverBrokerRequest.getQuerySource().getTableName());
113109
long scatterGatherStartTimeNs = System.nanoTime();
114-
AsyncQueryResponse asyncQueryResponse =
115-
_queryRouter.submitQuery(requestId, rawTableName, route, timeoutMs);
110+
ScatterResult scatterResult = doScatter(requestId, rawTableName, route, timeoutMs, serverStats);
111+
return doReduce(originalBrokerRequest, serverBrokerRequest, scatterResult, scatterGatherStartTimeNs, timeoutMs,
112+
rawTableName);
113+
}
114+
115+
/**
116+
* Executes scatter-gather: sends the query to servers and collects per-server DataTables.
117+
* Subclasses may override to replace or augment the scatter step.
118+
*/
119+
protected ScatterResult doScatter(long requestId, String rawTableName, TableRouteInfo route, long timeoutMs,
120+
ServerStats serverStats)
121+
throws Exception {
122+
AsyncQueryResponse asyncQueryResponse = _queryRouter.submitQuery(requestId, rawTableName, route, timeoutMs);
116123
Map<ServerRoutingInstance, ServerResponse> finalResponses = asyncQueryResponse.getFinalResponses();
117-
if (asyncQueryResponse.getStatus() == QueryResponse.Status.TIMED_OUT) {
118-
BrokerMeter meter = QueryOptionsUtils.isSecondaryWorkload(serverBrokerRequest.getPinotQuery().getQueryOptions())
119-
? BrokerMeter.SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS : BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS;
120-
_brokerMetrics.addMeteredTableValue(rawTableName, meter, 1);
121-
}
124+
boolean timedOut = asyncQueryResponse.getStatus() == QueryResponse.Status.TIMED_OUT;
122125
ServerRoutingInstance failedServer = asyncQueryResponse.getFailedServer();
123126
if (failedServer != null) {
124127
_failureDetector.markServerUnhealthy(failedServer.getInstanceId(), failedServer.getHostname());
125128
}
126-
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER,
127-
System.nanoTime() - scatterGatherStartTimeNs);
128129
// TODO Use scatterGatherStats as serverStats
129130
serverStats.setServerStats(asyncQueryResponse.getServerStats());
130131

131-
int numServersQueried = finalResponses.size();
132132
long totalResponseSize = 0;
133-
Map<ServerRoutingInstance, DataTable> dataTableMap = Maps.newHashMapWithExpectedSize(numServersQueried);
133+
Map<ServerRoutingInstance, DataTable> dataTableMap = Maps.newHashMapWithExpectedSize(finalResponses.size());
134134
List<ServerRoutingInstance> serversNotResponded = new ArrayList<>();
135135
for (Map.Entry<ServerRoutingInstance, ServerResponse> entry : finalResponses.entrySet()) {
136136
ServerResponse serverResponse = entry.getValue();
@@ -142,30 +142,48 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
142142
serversNotResponded.add(entry.getKey());
143143
}
144144
}
145-
int numServersResponded = dataTableMap.size();
145+
ScatterResultStats stats = new ScatterResultStats(
146+
dataTableMap.size() + serversNotResponded.size(), dataTableMap.size(), totalResponseSize);
147+
return new ScatterResult(dataTableMap, serversNotResponded, stats, timedOut, asyncQueryResponse.getException());
148+
}
149+
150+
/**
151+
* Executes the reduce step on the scatter result and populates the response with server stats.
152+
* Subclasses may override to perform custom reduce logic, or construct a {@link ScatterResult}
153+
* with a substituted data table map using {@link ScatterResultStats} to preserve server stats.
154+
*/
155+
protected BrokerResponseNative doReduce(BrokerRequest originalBrokerRequest, BrokerRequest serverBrokerRequest,
156+
ScatterResult scatterResult, long scatterGatherStartTimeNs, long timeoutMs, String rawTableName)
157+
throws Exception {
158+
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.SCATTER_GATHER,
159+
System.nanoTime() - scatterGatherStartTimeNs);
160+
161+
if (scatterResult.isTimedOut()) {
162+
BrokerMeter meter = QueryOptionsUtils.isSecondaryWorkload(serverBrokerRequest.getPinotQuery().getQueryOptions())
163+
? BrokerMeter.SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_TIMEOUTS : BrokerMeter.BROKER_RESPONSES_WITH_TIMEOUTS;
164+
_brokerMetrics.addMeteredTableValue(rawTableName, meter, 1);
165+
}
146166

147167
long reduceStartTimeNs = System.nanoTime();
148168
long reduceTimeoutMs = timeoutMs - TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - scatterGatherStartTimeNs);
149169
BrokerResponseNative brokerResponse =
150-
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest, dataTableMap,
151-
reduceTimeoutMs, _brokerMetrics);
170+
_brokerReduceService.reduceOnDataTable(originalBrokerRequest, serverBrokerRequest,
171+
scatterResult.getDataTableMap(), reduceTimeoutMs, _brokerMetrics);
152172
long reduceTimeNanos = System.nanoTime() - reduceStartTimeNs;
153173
_brokerMetrics.addPhaseTiming(rawTableName, BrokerQueryPhase.REDUCE, reduceTimeNanos);
154174

155-
brokerResponse.setNumServersQueried(numServersQueried);
156-
brokerResponse.setNumServersResponded(numServersResponded);
175+
brokerResponse.setNumServersQueried(scatterResult.getNumServersQueried());
176+
brokerResponse.setNumServersResponded(scatterResult.getNumServersResponded());
157177
brokerResponse.setBrokerReduceTimeMs(TimeUnit.NANOSECONDS.toMillis(reduceTimeNanos));
158178

159-
Exception brokerRequestSendException = asyncQueryResponse.getException();
160-
if (brokerRequestSendException != null) {
161-
brokerResponse.addException(
162-
new QueryProcessingException(QueryErrorCode.BROKER_REQUEST_SEND, brokerRequestSendException.getMessage()));
179+
if (scatterResult.getSendException() != null) {
180+
brokerResponse.addException(new QueryProcessingException(QueryErrorCode.BROKER_REQUEST_SEND,
181+
scatterResult.getSendException().getMessage()));
163182
}
164-
int numServersNotResponded = serversNotResponded.size();
165-
if (numServersNotResponded != 0) {
183+
List<ServerRoutingInstance> serversNotResponded = scatterResult.getServersNotResponded();
184+
if (!serversNotResponded.isEmpty()) {
166185
brokerResponse.addException(new QueryProcessingException(QueryErrorCode.SERVER_NOT_RESPONDING,
167-
String.format("%d servers %s not responded", numServersNotResponded, serversNotResponded)));
168-
186+
String.format("%d servers %s not responded", serversNotResponded.size(), serversNotResponded)));
169187
BrokerMeter meter = QueryOptionsUtils.isSecondaryWorkload(serverBrokerRequest.getPinotQuery().getQueryOptions())
170188
? BrokerMeter.SECONDARY_WORKLOAD_BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED
171189
: BrokerMeter.BROKER_RESPONSES_WITH_PARTIAL_SERVERS_RESPONDED;
@@ -174,11 +192,94 @@ protected BrokerResponseNative processBrokerRequest(long requestId, BrokerReques
174192
if (brokerResponse.getExceptionsSize() > 0) {
175193
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.BROKER_RESPONSES_WITH_PROCESSING_EXCEPTIONS, 1);
176194
}
177-
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.TOTAL_SERVER_RESPONSE_SIZE, totalResponseSize);
195+
_brokerMetrics.addMeteredTableValue(rawTableName, BrokerMeter.TOTAL_SERVER_RESPONSE_SIZE,
196+
scatterResult.getTotalResponseSize());
178197

179198
return brokerResponse;
180199
}
181200

201+
/**
202+
* Snapshot of server-side scatter statistics. Passed to {@link ScatterResult} so that server
203+
* counts are always derived from the live scatter, not from a data table map that may have been
204+
* augmented by a subclass.
205+
*/
206+
public static final class ScatterResultStats {
207+
private final int _numServersQueried;
208+
private final int _numServersResponded;
209+
private final long _totalResponseSize;
210+
211+
public ScatterResultStats(int numServersQueried, int numServersResponded, long totalResponseSize) {
212+
_numServersQueried = numServersQueried;
213+
_numServersResponded = numServersResponded;
214+
_totalResponseSize = totalResponseSize;
215+
}
216+
217+
public int getNumServersQueried() {
218+
return _numServersQueried;
219+
}
220+
221+
public int getNumServersResponded() {
222+
return _numServersResponded;
223+
}
224+
225+
public long getTotalResponseSize() {
226+
return _totalResponseSize;
227+
}
228+
}
229+
230+
/**
231+
* Carries the scatter-gather result before the reduce step.
232+
*/
233+
public static final class ScatterResult {
234+
private final Map<ServerRoutingInstance, DataTable> _dataTableMap;
235+
private final List<ServerRoutingInstance> _serversNotResponded;
236+
private final long _totalResponseSize;
237+
private final boolean _timedOut;
238+
private final Exception _sendException;
239+
private final int _numServersQueried;
240+
private final int _numServersResponded;
241+
242+
public ScatterResult(Map<ServerRoutingInstance, DataTable> dataTableMap,
243+
List<ServerRoutingInstance> serversNotResponded, ScatterResultStats stats,
244+
boolean timedOut, Exception sendException) {
245+
_dataTableMap = dataTableMap;
246+
_serversNotResponded = serversNotResponded;
247+
_totalResponseSize = stats.getTotalResponseSize();
248+
_timedOut = timedOut;
249+
_sendException = sendException;
250+
_numServersQueried = stats.getNumServersQueried();
251+
_numServersResponded = stats.getNumServersResponded();
252+
}
253+
254+
public Map<ServerRoutingInstance, DataTable> getDataTableMap() {
255+
return _dataTableMap;
256+
}
257+
258+
public List<ServerRoutingInstance> getServersNotResponded() {
259+
return _serversNotResponded;
260+
}
261+
262+
public int getNumServersQueried() {
263+
return _numServersQueried;
264+
}
265+
266+
public int getNumServersResponded() {
267+
return _numServersResponded;
268+
}
269+
270+
public long getTotalResponseSize() {
271+
return _totalResponseSize;
272+
}
273+
274+
public boolean isTimedOut() {
275+
return _timedOut;
276+
}
277+
278+
public Exception getSendException() {
279+
return _sendException;
280+
}
281+
}
282+
182283
/**
183284
* Check if a server that was previously detected as unhealthy is now healthy.
184285
*/

0 commit comments

Comments
 (0)