@@ -46,8 +46,12 @@ public void handle(HttpExchange t) throws IOException {
4646 t .sendResponseHeaders (200 , 0 );
4747 InputStream is = t .getRequestBody ();
4848 OutputStream os = t .getResponseBody ();
49+ boolean blob = false ;
50+ if (t .getRequestURI ().getRawQuery () != null && t .getRequestURI ().getRawQuery ().contains ("blob" )) {
51+ blob = true ;
52+ }
4953 try {
50- new Parse (is , os , false );
54+ new Parse (is , os , blob );
5155 } catch (Exception e ) {
5256 e .printStackTrace ();
5357 }
@@ -71,25 +75,25 @@ public void handle(HttpExchange t) throws IOException {
7175 try {
7276 Map <String , String > query = splitQuery (t .getRequestURI ());
7377 URI replayUrl = URI .create (query .get ("replay_url" ));
74- // boolean v2 = t.getRequestURI().getRawQuery() != null ?
75- // t.getRequestURI().getRawQuery().contains("v2") : false;
76- // boolean v2 = true;
77-
7878 // Get the replay as a byte[]
79+ long tStart = System .currentTimeMillis ();
7980 HttpClient client = HttpClient .newHttpClient ();
8081 HttpRequest request = HttpRequest .newBuilder ()
8182 .timeout (Duration .ofSeconds (145 ))
8283 .uri (replayUrl )
8384 .build ();
84- HttpResponse <byte []> response = client .send (request , HttpResponse .BodyHandlers .ofByteArray ());
85+ HttpResponse <byte []> response = client .send (request ,
86+ HttpResponse .BodyHandlers .ofByteArray ());
87+ long tEnd = System .currentTimeMillis ();
88+ System .err .format ("download: %dms\n " , tEnd - tStart );
8589
8690 byte [] bzIn = response .body ();
8791 byte [] bzOut = bzIn ;
8892
8993 if (replayUrl .toString ().endsWith (".bz2" )) {
94+ tStart = System .currentTimeMillis ();
9095 // Write byte[] to bunzip, get back decompressed byte[]
9196 Process bz = new ProcessBuilder (new String [] { "bunzip2" }).start ();
92-
9397 // Start separate thread so we can consume output while sending input
9498 Thread thread = new Thread (() -> {
9599 try {
@@ -102,79 +106,78 @@ public void handle(HttpExchange t) throws IOException {
102106 thread .start ();
103107
104108 bzOut = bz .getInputStream ().readAllBytes ();
105- System .err .println (new String (bz .getErrorStream ().readAllBytes ()));
109+ String bzError = new String (bz .getErrorStream ().readAllBytes ());
110+ System .err .println (bzError );
111+ tEnd = System .currentTimeMillis ();
112+ System .err .format ("bunzip2: %dms\n " , tEnd - tStart );
106113 }
107114
108115 // Start parser with input stream created from byte[]
116+ tStart = System .currentTimeMillis ();
109117 ByteArrayOutputStream baos2 = new ByteArrayOutputStream ();
110118 new Parse (new ByteArrayInputStream (bzOut ), baos2 , true );
111119 byte [] parseOut = baos2 .toByteArray ();
120+ tEnd = System .currentTimeMillis ();
121+ System .err .format ("parse: %dms\n " , tEnd - tStart );
112122
113123 t .sendResponseHeaders (200 , parseOut .length );
114124 t .getResponseBody ().write (parseOut );
115125 t .getResponseBody ().close ();
116126 } catch (Exception ex ) {
117- // TODO handle timeouts and corrupted replays (don't retry in those cases)
118127 ex .printStackTrace ();
119128 t .sendResponseHeaders (500 , 0 );
120129 t .getResponseBody ().close ();
121130 }
122-
131+ // long tStart = System.currentTimeMillis();
123132 // String cmd = String.format("""
124- // curl --max-time 145 --fail -L %s | %s | curl -X POST -T - "localhost:5600%s"
125- // %s
126- // """,
127- // replayUrl,
128- // replayUrl.toString().endsWith(".bz2") ? "bunzip2" : "cat",
129- // v2 ? "?blob" : "",
130- // v2 ? "" : " | node processors/createParsedDataBlob.mjs"
131- // );
133+ // curl --max-time 145 --fail -L %s | %s | curl -X POST -T - "localhost:5600?blob"
134+ // """,
135+ // replayUrl.toString(),
136+ // replayUrl.toString().endsWith(".bz2") ? "bunzip2" : "cat");
132137 // System.err.println(cmd);
133- // // Download, unzip, parse, aggregate
134- // Process proc = new ProcessBuilder(new String[] {"bash", "-c", cmd})
135- // .start();
136- // ByteArrayOutputStream output = new ByteArrayOutputStream();
137- // ByteArrayOutputStream error = new ByteArrayOutputStream();
138- // copy(proc.getInputStream(), output);
139- // // Write error to console
140- // copy(proc.getErrorStream(), error);
141- // System.err.println(error.toString());
138+ // Process proc = new ProcessBuilder(new String[] { "bash", "-c", cmd })
139+ // .start();
140+ // byte[] parseOut = proc.getInputStream().readAllBytes();
141+ // String error = new String(proc.getErrorStream().readAllBytes());
142+ // System.err.println(error);
142143 // int exitCode = proc.waitFor();
143- // if (exitCode != 0) {
144- // // We can send 200 status here and no response if expected error (read the
145- // error string)
146- // // Maybe we can pass the specific error info in the response headers
147- // int status = 500;
148- // if (error.toString().contains("curl: (28) Operation timed out")) {
149- // // Parse took too long, maybe China replay?
150- // status = 200;
151- // }
152- // if (error.toString().contains("curl: (22) The requested URL returned error:
153- // 502")) {
154- // // Google-Edge-Cache: origin retries exhausted Error: 2010
155- // // Server error, don't retry
156- // status = 200;
157- // }
158- // if (error.toString().contains("bunzip2: Data integrity error when
159- // decompressing")) {
160- // // Corrupted replay, don't retry
161- // status = 200;
162- // }
163- // if (error.toString().contains("bunzip2: Compressed file ends unexpectedly"))
164- // {
165- // // Corrupted replay, don't retry
166- // status = 200;
167- // }
168- // if (error.toString().contains("bunzip2: (stdin) is not a bzip2 file.")) {
169- // // Tried to unzip a non-bz2 file
170- // status = 200;
171- // }
172- // t.sendResponseHeaders(status, 0);
173- // t.getResponseBody().close();
144+ // long tEnd = System.currentTimeMillis();
145+ // System.err.format("download/bunzip2/parse: %sms\n", tEnd - tStart);
146+ // if (exitCode == 0) {
147+ // t.sendResponseHeaders(200, parseOut.length);
148+ // t.getResponseBody().write(parseOut);
149+ // t.getResponseBody().close();
174150 // } else {
175- // t.sendResponseHeaders(200, output.size());
176- // output.writeTo(t.getResponseBody());
177- // t.getResponseBody().close();
151+ // // We can send 200 status here and no response if expected error
152+ // // Maybe we can pass the specific error info in the response headers
153+ // int status = 500;
154+ // if (error.toString().contains("curl: (28) Operation timed out")) {
155+ // // Parse took too long, maybe China replay?
156+ // status = 200;
157+ // }
158+ // if (error.toString().contains("curl: (22) The requested URL returned error: 502")) {
159+ // // Google-Edge-Cache: origin retries exhausted Error: 2010
160+ // // Server error, don't retry
161+ // status = 200;
162+ // }
163+ // if (error.toString().contains("bunzip2: Data integrity error when decompressing")) {
164+ // // Corrupted replay, don't retry
165+ // status = 200;
166+ // }
167+ // if (error.toString().contains("bunzip2: Compressed file ends unexpectedly")) {
168+ // // Corrupted replay, don't retry
169+ // status = 200;
170+ // }
171+ // if (error.toString().contains("bunzip2: (stdin) is not a bzip2 file.")) {
172+ // // Tried to unzip a non-bz2 file
173+ // status = 200;
174+ // }
175+ // if (status == 200) {
176+ // t.sendResponseHeaders(status, 0);
177+ // t.getResponseBody().close();
178+ // } else {
179+ // throw new Exception("Unexpected error in parse pipeline");
180+ // }
178181 // }
179182 }
180183 }
0 commit comments