@@ -10,12 +10,11 @@ import nextflow.co2footprint.FileCreation.TraceFileCreator
1010import nextflow.co2footprint.Records.CO2Record
1111import nextflow.co2footprint.Records.CO2RecordTree
1212import nextflow.co2footprint.Records.CiRecordCollector
13- import nextflow.processor.TaskHandler
1413import nextflow.processor.TaskId
15- import nextflow.processor.TaskProcessor
1614import nextflow.script.WorkflowMetadata
17- import nextflow.trace.TraceObserver
15+ import nextflow.trace.TraceObserverV2
1816import nextflow.trace.TraceRecord
17+ import nextflow.trace.event.TaskEvent
1918
2019import java.util.concurrent.ConcurrentHashMap
2120
@@ -30,7 +29,7 @@ import java.util.concurrent.ConcurrentHashMap
3029 * Josua Carl <josua.carl@uni-tuebingen.de>
3130 */
3231@Slf4j
33- class CO2FootprintObserver implements TraceObserver {
32+ class CO2FootprintObserver implements TraceObserverV2 {
3433 // Holds workflow session
3534 Session session
3635
@@ -97,20 +96,15 @@ class CO2FootprintObserver implements TraceObserver {
9796 boolean enableMetrics () { return true }
9897
9998 // ------ HELPER METHODS ------
100-
99+
101100 /**
102101 * Records the start of a task by storing its {@link TraceRecord}.
103102 *
104103 * @param trace the TraceRecord of the task that just started
105104 */
106- synchronized void recordStarted (TraceRecord traceRecord ) {
105+ synchronized void recordStartedTask (TraceRecord traceRecord ) {
107106 // Keep started tasks
108107 runningTasks[traceRecord. taskId] = traceRecord
109-
110- // Add a process node under the workflow if it doesn’t exist yet
111- if (! workflowStats. getChild(traceRecord. processName)) {
112- workflowStats. addChild(new CO2RecordTree (traceRecord. processName, [level : ' process' ]))
113- }
114108 }
115109
116110 /**
@@ -136,8 +130,13 @@ class CO2FootprintObserver implements TraceObserver {
136130 // Optionally write to trace file
137131 this . traceFile. write(co2Record)
138132
139- // Add a task node with its CO2Record to the corresponding process
133+ // Add a process node under the workflow if it doesn’t exist yet
140134 CO2RecordTree processNode = workflowStats. getChild(traceRecord. processName)
135+ if (! processNode) {
136+ processNode = workflowStats. addChild(new CO2RecordTree (traceRecord. processName, [level : ' process' ]))
137+ }
138+
139+ // Add a task node with its CO2Record to the corresponding process
141140 processNode. addChild(new CO2RecordTree (traceRecord. taskId, [level : ' task' ], co2Record))
142141
143142 return co2Record
@@ -155,15 +154,14 @@ class CO2FootprintObserver implements TraceObserver {
155154 co2RecordTree. collectAdditionalMetrics()
156155
157156 // Create report and summary if any content exists to write to the file
158- if (workflowStats ) {
157+ if (co2RecordTree ) {
159158 summaryFile. create()
160159 summaryFile. write(co2RecordTree, co2FootprintCalculator, config)
161160
162161 reportFile. create()
163162 reportFile. addEntries(co2RecordTree, co2FootprintCalculator, config, timeCiRecordCollector, workflowMetadata)
164163 reportFile. write()
165- }
166- if (co2RecordTree) {
164+
167165 provenanceFile. create()
168166 provenanceFile. write(co2RecordTree)
169167 }
@@ -226,77 +224,68 @@ class CO2FootprintObserver implements TraceObserver {
226224 )
227225 }
228226
229-
230- // ---- PROCESS LEVEL ----
227+ // ---- TASK LEVEL ----
231228
232229 /**
233- * This method is invoked when a process is created
230+ * Invoked when a task is submitted by an executor to the
231+ * underlying execution backend.
234232 *
235- * @param process The process created ({@link nextflow.processor.TaskProcessor})
233+ * @param event A task event containing the ({@link nextflow.processor.TaskHandler})
234+ * and ({@link nextflow.trace.TraceRecord}) for the task
236235 */
237236 @Override
238- void onProcessCreate (TaskProcessor process ) {}
237+ void onTaskSubmit (TaskEvent event ) {
238+ log. trace(" Trace report - submit process > ${ event.handler} " )
239239
240- /**
241- * This method is invoked before a process run is going to be submitted.
242- *
243- * @param handler The task handler ({@link nextflow.processor.TaskHandler})
244- * @param trace The trace record for the task ({@link nextflow.trace.TraceRecord})
245- */
246- @Override
247- void onProcessSubmit (TaskHandler handler , TraceRecord trace ) {
248- log. trace(" Trace report - submit process > ${ handler} " )
249-
250- recordStarted(trace)
240+ recordStartedTask(event. trace)
251241 }
252242
253243 /**
254- * This method is invoked when a process run is going to start .
244+ * Invoked when a task is running in the underlying execution backend .
255245 *
256- * @param handler The task handler ({@link nextflow.processor.TaskHandler})
257- * @param trace The trace record for the task ({@link nextflow.trace.TraceRecord})
246+ * @param event A task event containing the ({@link nextflow.processor.TaskHandler})
247+ * and ({@link nextflow.trace.TraceRecord}) for the task
258248 */
259- @Override
260- void onProcessStart (TaskHandler handler , TraceRecord trace ) {
261- log. trace(" Trace report - start process > ${ handler} " )
249+ void onTaskStart (TaskEvent event ) {
250+ log. trace(" Trace report - start process > ${ event.handler} " )
262251
263- recordStarted( trace)
252+ recordStartedTask(event . trace)
264253 }
265254
266255 /**
267- * This method is invoked when a process run completes.
256+ * Invoked when a task completes.
268257 *
269- * @param handler The task handler ({@link nextflow.processor.TaskHandler})
270- * @param trace The trace record for the task ({@link nextflow.trace.TraceRecord})
258+ * @param event A task event containing the ({@link nextflow.processor.TaskHandler})
259+ * and ({@link nextflow.trace.TraceRecord}) for the task
271260 */
272261 @Override
273- void onProcessComplete ( TaskHandler handler , TraceRecord trace ) {
274- log. trace(" Trace report - complete process > ${ handler} " )
262+ void onTaskComplete ( TaskEvent event ) {
263+ log. trace(" Trace report - complete process > ${ event. handler} " )
275264
276265 // Ensure the presence of a Trace BaseRecord
277- if (! trace) {
278- log. warn(" Unable to find TraceRecord for task with id: ${ handler.task.id} " )
266+ if (! event . trace) {
267+ log. warn(" Unable to find TraceRecord for task with id: ${ event. handler.task.id} " )
279268 return
280269 }
281270
282- aggregateRecords(trace)
271+ aggregateRecords(event . trace)
283272 }
284273
285274 /**
286- * This method is invoked when a process was cached.
275+ * Invoked when a task execution is skipped because the result is cached (already computed)
276+ * or stored (using the `storeDir` directive).
287277 *
288- * @param handler The task handler ({@link nextflow.processor.TaskHandler})
289- * @param trace The trace record for the task ({@link nextflow.trace.TraceRecord})
278+ * @param event A task event containing the ({@link nextflow.processor.TaskHandler})
279+ * and ({@link nextflow.trace.TraceRecord}) for the task
290280 */
291281 @Override
292- void onProcessCached ( TaskHandler handler , TraceRecord trace ) {
293- log. trace(" Trace report - cached process > ${ handler} " )
282+ void onTaskCached ( TaskEvent event ) {
283+ log. trace(" Trace report - cached process > ${ event. handler} " )
294284
295285 // Event was triggered by a stored task, ignore it
296- if (trace == null ) { return }
297-
298- recordStarted(trace) // add also cashed tasks to the runningTasks to be able to report them in the output files
299- aggregateRecords(trace)
300- }
286+ if (event. trace == null ) { return }
301287
288+ recordStartedTask(event. trace) // add also cashed tasks to the runningTasks to be able to report them in the output files
289+ aggregateRecords(event. trace)
290+ }
302291}
0 commit comments