|
| 1 | +""" |
| 2 | +Maps CodeCarbon EmissionsData to BoAmps report format. |
| 3 | +""" |
| 4 | + |
| 5 | +import warnings |
| 6 | +from dataclasses import fields as dataclass_fields |
| 7 | +from dataclasses import replace |
| 8 | +from typing import Optional |
| 9 | + |
| 10 | +from codecarbon.output_methods.boamps.models import ( |
| 11 | + BoAmpsEnvironment, |
| 12 | + BoAmpsHardware, |
| 13 | + BoAmpsHeader, |
| 14 | + BoAmpsInfrastructure, |
| 15 | + BoAmpsMeasure, |
| 16 | + BoAmpsReport, |
| 17 | + BoAmpsSoftware, |
| 18 | + BoAmpsSystem, |
| 19 | + BoAmpsTask, |
| 20 | +) |
| 21 | +from codecarbon.output_methods.emissions_data import EmissionsData |
| 22 | + |
| 23 | +BOAMPS_FORMAT_VERSION = "0.1" |
| 24 | +BOAMPS_FORMAT_SPEC_URI = "https://github.com/Boavizta/BoAmps/tree/main/model" |
| 25 | + |
| 26 | + |
| 27 | +def _to_boamps_datetime(timestamp: str) -> str: |
| 28 | + """Normalize a timestamp to BoAmps format (YYYY-MM-DD HH:MM:SS).""" |
| 29 | + return timestamp.replace("T", " ") if timestamp else timestamp |
| 30 | + |
| 31 | + |
def map_emissions_to_boamps(
    emissions: EmissionsData,
    task: Optional[BoAmpsTask] = None,
    header: Optional[BoAmpsHeader] = None,
    quality: Optional[str] = None,
    infra_overrides: Optional[dict] = None,
    environment_overrides: Optional[dict] = None,
) -> BoAmpsReport:
    """
    Assemble a BoAmps report from a CodeCarbon run.

    Each report section is derived from ``emissions`` by a dedicated builder;
    where the caller supplies context, the caller's values win over the
    auto-detected ones.

    Args:
        emissions: CodeCarbon emissions data from a completed run.
        task: User-provided task context. When omitted, a ``UserWarning`` is
            emitted because the report will not validate against the BoAmps
            schema.
        header: User-provided header values that override the generated ones.
        quality: Quality assessment ("high", "medium", "low").
        infra_overrides: Extra infrastructure fields (cloud_instance,
            cloud_service, infra_type, components).
        environment_overrides: Extra environment fields (location,
            power_supplier_type, power_source, power_source_carbon_intensity).

    Returns:
        A BoAmpsReport populated with auto-detected and user-provided data.
    """
    # A dict literal evaluates its values top to bottom, so the sections are
    # built in a fixed, predictable order before the task check below.
    auto_sections = {
        "header": _build_header(emissions, header),
        "measures": [_build_measure(emissions)],
        "system": _build_system(emissions),
        "software": _build_software(emissions),
        "infrastructure": _build_infrastructure(emissions, infra_overrides),
        "environment": _build_environment(emissions, environment_overrides),
    }

    if task is None:
        # Kept inline (not in a helper) so stacklevel=2 points at our caller.
        warnings.warn(
            "No BoAmps task context provided. The output will be missing required "
            "fields (taskStage, taskFamily, algorithms, dataset) and will not "
            "validate against the BoAmps schema.",
            UserWarning,
            stacklevel=2,
        )

    return BoAmpsReport(task=task, quality=quality, **auto_sections)
| 83 | + |
| 84 | + |
def _build_header(
    emissions: EmissionsData, user_header: Optional[BoAmpsHeader]
) -> BoAmpsHeader:
    """Create the report header, letting user-supplied values win.

    The generated header carries the module's format version and specification
    URI, the run id as report id, and the run timestamp in BoAmps layout.
    When ``user_header`` is given, each merged field takes the user's value if
    it is truthy and the generated (or model-default) value otherwise.
    """
    defaults = BoAmpsHeader(
        format_version=BOAMPS_FORMAT_VERSION,
        format_version_specification_uri=BOAMPS_FORMAT_SPEC_URI,
        report_id=emissions.run_id,
        report_datetime=_to_boamps_datetime(emissions.timestamp),
    )
    if user_header is None:
        return defaults

    def pick(field_name: str):
        # Falsy user values (None, "") fall back to the generated header.
        return getattr(user_header, field_name) or getattr(defaults, field_name)

    merged_fields = (
        "licensing",
        "format_version",
        "format_version_specification_uri",
        "report_id",
        "report_datetime",
        "report_status",
        "publisher",
    )
    return BoAmpsHeader(**{name: pick(name) for name in merged_fields})
| 112 | + |
| 113 | + |
def _build_measure(emissions: EmissionsData) -> BoAmpsMeasure:
    """Translate the energy/duration figures of a run into a BoAmps measure.

    CPU and GPU utilization are stored as fractions (percent / 100, rounded to
    four decimals) and only when the reported percentage is positive; the GPU
    value additionally requires a positive GPU count.
    """
    # Note: emissions.tracking_mode is "process"/"machine" (CodeCarbon's scope),
    # not the CPU/GPU power tracking method (rapl, nvml, etc.) that BoAmps expects
    # for cpuTrackingMode/gpuTrackingMode. Those fields are left out because
    # EmissionsData does not carry the tracker implementation details.
    result = BoAmpsMeasure(
        measurement_method="codecarbon",
        version=emissions.codecarbon_version,
        power_consumption=emissions.energy_consumed,
        measurement_duration=emissions.duration,
        measurement_date_time=_to_boamps_datetime(emissions.timestamp),
    )

    cpu_percent = emissions.cpu_utilization_percent
    if cpu_percent > 0:
        result.average_utilization_cpu = round(cpu_percent / 100.0, 4)

    # GPU utilization is only meaningful when at least one GPU was detected.
    has_gpu = emissions.gpu_count and emissions.gpu_count > 0
    if has_gpu and emissions.gpu_utilization_percent > 0:
        result.average_utilization_gpu = round(
            emissions.gpu_utilization_percent / 100.0, 4
        )

    return result
| 142 | + |
| 143 | + |
def _build_system(emissions: EmissionsData) -> BoAmpsSystem:
    """Wrap the operating system recorded in EmissionsData in a BoAmpsSystem."""
    detected_os = emissions.os
    return BoAmpsSystem(os=detected_os)
| 147 | + |
| 148 | + |
def _build_software(emissions: EmissionsData) -> BoAmpsSoftware:
    """Describe the runtime as Python at the version recorded in EmissionsData."""
    python_version = emissions.python_version
    return BoAmpsSoftware(language="python", version=python_version)
| 155 | + |
| 156 | + |
def _detect_components(emissions: EmissionsData) -> list:
    """Return the auto-detected hardware list: CPU, GPU (if any), then RAM."""
    # emissions.cpu_count is logical thread count. BoAmps nbComponent expects
    # physical cores. Standard SMT/HT uses 2 threads per core.
    cpu_cores = max(1, int(emissions.cpu_count) // 2) if emissions.cpu_count else 1
    components = [
        BoAmpsHardware(
            component_type="cpu",
            component_name=emissions.cpu_model,
            nb_component=cpu_cores,
        )
    ]

    # GPU component (only if present)
    if emissions.gpu_count and emissions.gpu_count > 0:
        components.append(
            BoAmpsHardware(
                component_type="gpu",
                component_name=emissions.gpu_model if emissions.gpu_model else None,
                nb_component=int(emissions.gpu_count),
            )
        )

    # RAM component (always present)
    components.append(
        BoAmpsHardware(
            component_type="ram",
            nb_component=1,
            memory_size=emissions.ram_total_size,
        )
    )
    return components


def _merge_components(auto_components: list, user_components: list) -> list:
    """Merge user-supplied components with auto-detected ones by component_type.

    User components come first, in the order given. A user component whose type
    was also auto-detected has its ``None`` fields filled from the detected
    component (user values always win). Auto-detected components whose type
    the user did not mention are appended afterwards. Neither input list nor
    any of its elements is mutated.
    """
    auto_by_type = {c.component_type: c for c in auto_components}
    merged = []
    used_types = set()
    for user_comp in user_components:
        auto = auto_by_type.get(user_comp.component_type)
        if auto is not None:
            # Build a merged copy rather than mutating the caller's object.
            fill = {
                f.name: getattr(auto, f.name)
                for f in dataclass_fields(user_comp)
                if f.name != "component_type"
                and getattr(user_comp, f.name) is None
            }
            if fill:
                user_comp = replace(user_comp, **fill)
            used_types.add(user_comp.component_type)
        merged.append(user_comp)

    # Keep auto-detected components that the user didn't override
    merged.extend(c for c in auto_components if c.component_type not in used_types)
    return merged


def _build_infrastructure(
    emissions: EmissionsData, overrides: Optional[dict] = None
) -> BoAmpsInfrastructure:
    """Build the infrastructure section from EmissionsData hardware fields.

    Args:
        emissions: CodeCarbon emissions data providing CPU/GPU/RAM and cloud
            information.
        overrides: Optional mapping of user-provided values. The keys
            ``cloud_instance``, ``cloud_service`` and ``infra_type`` are set
            on the result as-is; ``components`` is a list of hardware
            components merged with the auto-detected ones by component type.
            A missing, ``None`` or empty ``components`` entry leaves the
            auto-detected list untouched.

    Returns:
        A BoAmpsInfrastructure with auto-detected values and overrides applied.
    """
    components = _detect_components(emissions)

    # emissions.on_cloud can be "N" even on public cloud (the tracker clears
    # cloud_provider/region for some providers). Use cloud_provider as a
    # secondary signal to avoid misreporting cloud runs as on-premise.
    is_cloud = emissions.on_cloud == "Y" or bool(emissions.cloud_provider)
    infra = BoAmpsInfrastructure(
        infra_type="publicCloud" if is_cloud else "onPremise",
        cloud_provider=(
            emissions.cloud_provider if is_cloud and emissions.cloud_provider else None
        ),
        components=components,
    )

    if not overrides:
        return infra

    # Apply scalar overrides from the context file
    for attr in ("cloud_instance", "cloud_service", "infra_type"):
        if attr in overrides:
            setattr(infra, attr, overrides[attr])

    # A context file may carry an empty `components:` key, which arrives here
    # as None; treat that the same as "no user components" instead of failing.
    user_components = overrides.get("components")
    if user_components:
        infra.components = _merge_components(infra.components, user_components)

    return infra
| 238 | + |
| 239 | + |
def _build_environment(
    emissions: EmissionsData, overrides: Optional[dict] = None
) -> BoAmpsEnvironment:
    """Build the environment section from EmissionsData location fields.

    Country, latitude and longitude come from ``emissions``. When ``overrides``
    is given, the keys ``location``, ``power_supplier_type``, ``power_source``
    and ``power_source_carbon_intensity`` are copied onto the result if present.
    """
    environment = BoAmpsEnvironment(
        country=emissions.country_name,
        latitude=emissions.latitude,
        longitude=emissions.longitude,
    )
    if not overrides:
        return environment

    overridable = (
        "location",
        "power_supplier_type",
        "power_source",
        "power_source_carbon_intensity",
    )
    for name in overridable:
        if name in overrides:
            setattr(environment, name, overrides[name])

    return environment
0 commit comments