-
Notifications
You must be signed in to change notification settings - Fork 670
Expand file tree
/
Copy pathviews.py
More file actions
6039 lines (5363 loc) · 247 KB
/
views.py
File metadata and controls
6039 lines (5363 loc) · 247 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import io
import logging
import re
import pandas as pd
from rest_framework import status
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework.parsers import FileUploadParser
from .serializers import LoadFileSerializer
from core.utils import build_questions_dict
from core.models import (
Actor,
Asset,
Evidence,
Folder,
Perimeter,
RequirementAssessment,
RequirementNode,
RiskMatrix,
AppliedControl,
FindingsAssessment,
RiskScenario,
Policy,
SecurityException,
Incident,
Vulnerability,
)
from core.serializers import (
BaseModelSerializer,
AssetWriteSerializer,
PerimeterWriteSerializer,
AppliedControlWriteSerializer,
ComplianceAssessmentWriteSerializer,
RequirementAssessmentWriteSerializer,
FindingsAssessmentWriteSerializer,
FindingWriteSerializer,
UserWriteSerializer,
RiskAssessmentWriteSerializer,
RiskScenarioWriteSerializer,
ReferenceControlWriteSerializer,
ThreatWriteSerializer,
EvidenceWriteSerializer,
FolderWriteSerializer,
PolicyWriteSerializer,
SecurityExceptionWriteSerializer,
IncidentWriteSerializer,
VulnerabilityWriteSerializer,
)
from ebios_rm.models import (
EbiosRMStudy,
FearedEvent,
RoTo,
Stakeholder,
StrategicScenario,
AttackPath,
ElementaryAction,
KillChain,
)
from ebios_rm.serializers import (
ElementaryActionWriteSerializer,
EbiosRMStudyWriteSerializer,
)
from .ebios_rm_excel_helpers import (
extract_elementary_actions,
process_excel_file as process_ebios_rm_excel,
)
from core.models import Terminology
from data_wizard.arm_helpers import process_arm_file
from tprm.models import Entity, Solution, Contract
from tprm.serializers import (
EntityWriteSerializer,
SolutionWriteSerializer,
ContractWriteSerializer,
)
from resilience.models import (
BusinessImpactAnalysis,
AssetAssessment,
EscalationThreshold,
)
from resilience.serializers import (
BusinessImpactAnalysisWriteSerializer,
AssetAssessmentWriteSerializer,
EscalationThresholdWriteSerializer,
)
from privacy.models import Processing, ProcessingNature
from privacy.serializers import ProcessingWriteSerializer
from iam.models import RoleAssignment, User
from core.models import FilteringLabel
from core.utils import get_global_currency
from uuid import UUID
from django.core.files.uploadedfile import UploadedFile
from django.http import HttpRequest
from datetime import datetime
from typing import Optional, Final, ClassVar
from dataclasses import dataclass, field
from abc import ABC, abstractmethod
import enum
logger = logging.getLogger(__name__)
def get_accessible_folders_map(user: User) -> dict[str, UUID]:
    """
    Return a mapping of lower-cased folder names to folder IDs covering
    every folder the given user is allowed to view.

    Used by the data wizard import flow to validate targets.
    """
    viewable_folder_ids, _, _ = RoleAssignment.get_accessible_object_ids(
        Folder.get_root_folder(), user, Folder
    )
    accessible_folders = Folder.objects.filter(id__in=viewable_folder_ids)
    return {folder.name.lower(): folder.id for folder in accessible_folders}
# XLSX files are ZIP containers; every ZIP local-file header starts with "PK\x03\x04".
ZIP_MAGIC_NUMBER: Final[bytes] = b"PK\x03\x04"


def is_excel_file(file: io.BytesIO) -> bool:
    """Return True when the stream begins with the ZIP magic number.

    The stream position is rewound to the start before returning.
    """
    header = file.read(len(ZIP_MAGIC_NUMBER))
    file.seek(0)
    return header == ZIP_MAGIC_NUMBER
def normalize_datetime_columns(df: pd.DataFrame) -> pd.DataFrame:
    """
    Serialize every datetime column of *df* to ISO strings, in place.

    Midnight timestamps are rendered date-only (YYYY-MM-DD), timestamps with
    a time component as full ISO 8601, and NaT values as empty strings.
    """

    def _render(ts):
        if pd.isna(ts):
            return ""
        # normalize() zeroes the time part; equality means "midnight".
        if ts == ts.normalize():
            return ts.strftime("%Y-%m-%d")
        return ts.isoformat()

    for column in df.select_dtypes(include=["datetime64", "datetimetz"]).columns:
        df[column] = df[column].apply(_render)
    return df
def _parse_date(value) -> Optional[str]:
"""Normalize a value to a YYYY-MM-DD string for DRF DateField."""
if not value or value == "":
return None
if isinstance(value, datetime):
return value.strftime("%Y-%m-%d")
if isinstance(value, str) and "T" in value:
return value.split("T")[0]
return value
def _parse_datetime(value) -> Optional[str]:
"""Normalize a value to an ISO datetime string for DRF DateTimeField."""
if not value or value == "":
return None
if isinstance(value, datetime):
return value.isoformat()
return value
def _parse_time_to_seconds(s: str) -> int | None:
"""Parse a time string like '4h', '2h30m', '24h10s', '01m30s' to seconds."""
if not s:
return None
s = s.strip()
try:
return int(s)
except ValueError:
pass
m = re.fullmatch(r"(?:(\d+)h)?(?:(\d+)m)?(?:(\d+)s)?", s)
if not m or not any(m.groups()):
return None
h, mn, sc = (int(x) if x else 0 for x in m.groups())
return h * 3600 + mn * 60 + sc
def _get_security_objective_scale() -> list:
    """Return the active security-objective scale from GlobalSettings.

    Falls back to the "1-4" scale when no "general" settings row exists.
    """
    from global_settings.models import GlobalSettings

    general = GlobalSettings.objects.filter(name="general").first()
    scale_key = "1-4"
    if general is not None:
        scale_key = general.value.get("security_objective_scale", "1-4")
    return Asset.SECURITY_OBJECTIVES_SCALES[scale_key]
def _reverse_scale_value(display_val: str, scale: list) -> int | None:
"""Map a display value back to its raw 0-based index.
The scale list maps raw index → display value. We need the reverse:
find the *first* index whose display value matches.
Supports both numeric scales (1-4, 0-3, …) and string scales (FIPS-199).
"""
# Try numeric comparison first
try:
numeric = int(display_val)
for idx, sv in enumerate(scale):
if isinstance(sv, (int, float)) and int(sv) == numeric:
return idx
except (ValueError, TypeError):
pass
# Fall back to case-insensitive string comparison (e.g. FIPS-199)
display_lower = display_val.strip().lower()
for idx, sv in enumerate(scale):
if str(sv).lower() == display_lower:
return idx
return None
def _parse_security_objectives(raw: str, scale: list | None = None) -> dict:
    """Parse 'confidentiality: 3,integrity: 2' → {key: {"value": int, "is_enabled": True}}.

    Input values are *display* values (scale-mapped); each is reverse-mapped
    back to its raw 0-based index before storage so that an export → import
    round-trip is lossless. Parts that cannot be parsed are skipped.
    """
    objectives: dict = {}
    if not raw:
        return objectives
    effective_scale = scale if scale is not None else _get_security_objective_scale()
    for chunk in str(raw).split(","):
        chunk = chunk.strip()
        if ":" not in chunk:
            continue
        key, _, display = chunk.partition(":")
        index = _reverse_scale_value(display.strip(), effective_scale)
        # Raw indices are constrained to the 0-4 range.
        if index is not None and 0 <= index <= 4:
            objectives[key.strip()] = {"value": index, "is_enabled": True}
    return objectives
def _parse_recovery_objectives(raw: str) -> dict:
    """Parse 'rto: 4h,rpo: 5h,mtd: 6h' → {key: {"value": seconds}}."""
    objectives: dict = {}
    if not raw:
        return objectives
    for chunk in str(raw).split(","):
        chunk = chunk.strip()
        if ":" not in chunk:
            continue
        key, _, duration = chunk.partition(":")
        seconds = _parse_time_to_seconds(duration.strip())
        # Negative or unparseable durations are dropped.
        if seconds is not None and seconds >= 0:
            objectives[key.strip()] = {"value": seconds}
    return objectives
def _resolve_filtering_labels(value) -> list[UUID]:
"""Parse pipe- or comma-separated label names and return list of FilteringLabel IDs.
Labels that do not yet exist are created on the fly.
"""
if not value or not isinstance(value, str):
return []
separator = "|" if "|" in value else ","
label_names = [name.strip() for name in value.split(separator) if name.strip()]
label_ids: list[UUID] = []
for label_name in label_names:
label = FilteringLabel.objects.filter(label=label_name).first()
if label is None:
try:
label = FilteringLabel(label=label_name)
label.full_clean()
label.save()
except Exception:
logging.error(f"Failed to save label: {value}")
label_ids.append(label.id)
return label_ids
class RecordFileType(enum.StrEnum):
XLSX = "Excel"
CSV = "CSV"
def get_error(self) -> str:
match self:
case RecordFileType.XLSX:
return "ExcelParsingFailed"
case RecordFileType.CSV:
return "CSVParsingFailed"
case _:
raise NotImplementedError(
f"Unreachable code detected (unknown {type(self).__name__} enum variant)."
)
class ConflictMode(enum.StrEnum):
    """Strategy applied when an imported record matches an existing one."""

    STOP = "stop"  # record the error and abort the remaining import run
    SKIP = "skip"  # keep the existing record, ignore the incoming one
    UPDATE = "update"  # partially update the existing record with provided values
class ModelType(enum.StrEnum):
TPRM = "TPRM"
EBIOS_RM_STUDY_ARM = "EbiosRMStudyARM"
EBIOS_RM_STUDY_EXCEL = "EbiosRMStudyExcel"
ASSET = "Asset"
APPLIED_CONTROL = "AppliedControl"
PERIMETER = "Perimeter"
USER = "User"
COMPLIANCE_ASSESSMENT = "ComplianceAssessment"
FINDINGS_ASSESSMENT = "FindingsAssessment"
RISK_ASSESSMENT = "RiskAssessment"
ELEMENTARY_ACTION = "ElementaryAction"
REFERENCE_CONTROL = "ReferenceControl"
THREAT = "Threat"
PROCESSING = "Processing"
FOLDER = "Folder"
EVIDENCE = "Evidence"
POLICY = "Policy"
SECURITY_EXCEPTION = "SecurityException"
INCIDENT = "Incident"
VULNERABILITY = "Vulnerability"
BUSINESS_IMPACT_ANALYSIS = "BusinessImpactAnalysis"
@staticmethod
def from_string(model_type: str) -> Optional["ModelType"]:
"""Returns a ModelType on success, otherwise return None."""
try:
return ModelType(model_type)
except ValueError:
return
@dataclass(frozen=True)
class Error:
    """An import failure (or non-blocking warning) tied to its source record."""

    record: dict
    error: str
    is_warning: bool = False

    def to_dict(self) -> dict:
        """Serialize for the API response; the warning flag is not exposed."""
        return dict(record=self.record, error=self.error)
@dataclass
class Result:
    """Aggregated counters and error details for one import run."""

    created: int = 0
    updated: int = 0
    skipped: int = 0
    failed: int = 0
    stopped: bool = False
    errors: list[Error] = field(default_factory=list)
    warnings: list[Error] = field(default_factory=list)
    details: dict = field(default_factory=dict)

    @property
    def successful(self) -> int:
        """Records actually persisted: created plus updated."""
        return self.created + self.updated

    def add_created(self):
        self.created += 1

    def add_updated(self):
        self.updated += 1

    def add_skipped(self):
        self.skipped += 1

    def add_error(self, error: Error, fail_count: int = 1):
        """Record an error; fail_count lets one error cover many records."""
        self.failed += fail_count
        self.errors.append(error)

    def to_dict(self) -> dict:
        """Serialize for the API response; warnings/details appear only when non-empty."""
        payload = {
            "successful": self.successful,
            "created": self.created,
            "updated": self.updated,
            "skipped": self.skipped,
            "failed": self.failed,
            "stopped": self.stopped,
            "errors": [error.to_dict() for error in self.errors],
        }
        if self.warnings:
            payload["warnings"] = [warning.to_dict() for warning in self.warnings]
        if self.details:
            payload["details"] = self.details
        return payload
@dataclass(frozen=True)
class BaseContext:
    """Shared, immutable inputs handed to every RecordConsumer.

    ``folders_map`` maps lower-cased folder names to folder IDs; the optional
    ``*_id`` fields carry wizard-level default targets (folder, perimeter,
    matrix, framework) selected by the user before the import.
    """

    request: HttpRequest
    folders_map: dict[str, UUID] = field(default_factory=dict)
    folder_id: Optional[str] = None
    perimeter_id: Optional[str] = None
    matrix_id: Optional[str] = None
    framework_id: Optional[str] = None
    on_conflict: ConflictMode = ConflictMode.STOP
class RecordConsumer[Context](ABC):
    """
    Abstract base for data-wizard record importers.

    Subclasses declare SERIALIZER_CLASS (a DRF write serializer) and implement
    create_context() / prepare_create() to turn raw source records into
    serializer payloads. process_records() then drives deduplication,
    conflict handling (stop/skip/update) and persistence.
    """

    SERIALIZER_CLASS: ClassVar[type[BaseModelSerializer]]
    # Maps record_data keys to possible source record keys when they differ.
    # Override in subclasses that use alternative/aliased column names.
    SOURCE_KEY_MAP: ClassVar[dict[str, tuple[str, ...]]] = {}

    def __init__(self, base_context: BaseContext):
        # Copy the shared wizard context onto the instance for direct access.
        self.request = base_context.request
        self.folders_map = base_context.folders_map
        self.folder_id = base_context.folder_id
        self.perimeter_id = base_context.perimeter_id
        self.matrix_id = base_context.matrix_id
        self.framework_id = base_context.framework_id
        self.on_conflict = base_context.on_conflict

    def __init_subclass__(cls):
        # Fail fast at class-definition time when a subclass forgot to declare
        # a valid SERIALIZER_CLASS.
        # NOTE(review): `assert` is stripped under `python -O`; consider
        # raising TypeError instead if optimized runs are a concern.
        provided_class = getattr(cls, "SERIALIZER_CLASS", None)
        is_defined = provided_class is not None
        is_serializer = is_defined and issubclass(provided_class, BaseModelSerializer)
        assert is_serializer, f"Invalid serializer for class {cls.__name__}"

    @abstractmethod
    def create_context(self) -> tuple[Context, Optional[Error]]:
        """Build the per-run context; return (context, None) or (_, Error) on failure."""
        pass

    @abstractmethod
    def prepare_create(
        self, record: dict, context: Context
    ) -> tuple[dict, Optional[Error]]:
        """Turn one raw source record into serializer data.

        The returned Error may have is_warning=True, in which case the data
        is still usable for creation.
        """
        pass

    def find_existing(self, record_data: dict):
        """Find an existing record matching this data based on the model's fields_to_check."""
        model_class = self.SERIALIZER_CLASS.Meta.model
        fields_to_check = getattr(model_class, "fields_to_check", [])
        if not fields_to_check:
            return None
        query = {}
        for f in fields_to_check:
            value = record_data.get(f)
            if value is None or value == "":
                continue
            if isinstance(value, str):
                # Textual identity fields match case-insensitively.
                query[f"{f}__iexact"] = value
            else:
                query[f] = value
        if not query:
            return None
        # Scope the lookup to the record's folder when the model is folder-bound.
        folder = record_data.get("folder")
        if folder and hasattr(model_class, "folder"):
            query["folder"] = folder
        return model_class.objects.filter(**query).first()

    def _build_update_data(self, record: dict, record_data: dict) -> dict:
        """
        Filter record_data to only include fields that the user actually
        provided with non-empty values in the source record.
        Identity fields (used for matching) are always preserved.
        """
        model_class = self.SERIALIZER_CLASS.Meta.model
        identity_fields = set(getattr(model_class, "fields_to_check", []))
        if hasattr(model_class, "folder"):
            identity_fields.add("folder")
        update_data = {}
        for key, value in record_data.items():
            if key in identity_fields:
                update_data[key] = value
                continue
            # A payload key may originate from any of several source aliases.
            source_keys = self.SOURCE_KEY_MAP.get(key, (key,))
            # For M2M owner, propagate even when blank so UPDATE mode clears stale owners.
            if key == "owner" and any(sk in record for sk in source_keys):
                update_data[key] = value
                continue
            if any(record.get(sk) not in (None, "") for sk in source_keys):
                update_data[key] = value
        return update_data

    def process_records(self, records: list[dict]) -> Result:
        """Import *records* one by one and return aggregated Result counters.

        Per record: prepare_create → dedup (explicit internal_id PK match,
        then find_existing) → conflict policy (skip/stop/update) → create via
        SERIALIZER_CLASS. Under ConflictMode.STOP any hard failure aborts the
        remaining records.
        """
        results = Result()
        context, error = self.create_context()
        if error is not None:
            # Context setup failed: every record counts as failed.
            results.add_error(error, fail_count=len(records))
            return results
        model_class = self.SERIALIZER_CLASS.Meta.model
        (viewable_ids, _, _) = RoleAssignment.get_accessible_object_ids(
            Folder.get_root_folder(), self.request.user, model_class
        )
        viewable_ids = set(viewable_ids)
        for record in records:
            record_data, error = self.prepare_create(record, context)
            if error is not None:
                if error.is_warning:
                    # Warnings are collected but do not block the import of
                    # the record (prepare_create still returned usable data).
                    results.warnings.append(error)
                else:
                    results.add_error(error)
                    if self.on_conflict == ConflictMode.STOP:
                        results.stopped = True
                        break
                    continue
            existing = None
            internal_id = record.get("internal_id")
            if internal_id:
                # Prefer an explicit PK match, restricted to objects the
                # requesting user is allowed to view.
                existing = model_class.objects.filter(
                    pk=internal_id, id__in=viewable_ids
                ).first()
            if existing is None:
                existing = self.find_existing(record_data)
            if existing:
                match self.on_conflict:
                    case ConflictMode.SKIP:
                        results.add_skipped()
                        continue
                    case ConflictMode.STOP:
                        results.add_error(
                            Error(record=record, error="Record already exists")
                        )
                        results.stopped = True
                        break
                    case ConflictMode.UPDATE:
                        # Partial update restricted to explicitly-provided fields.
                        update_data = self._build_update_data(record, record_data)
                        serializer = self.SERIALIZER_CLASS(
                            instance=existing,
                            data=update_data,
                            partial=True,
                            context={"request": self.request},
                        )
                        if serializer.is_valid():
                            try:
                                serializer.save()
                                results.add_updated()
                            except Exception as e:
                                results.add_error(Error(record=record, error=str(e)))
                        else:
                            results.add_error(
                                Error(
                                    record=record,
                                    error=str(serializer.errors),
                                )
                            )
                        continue
            # No existing record (or lookup declined): create a new one.
            serializer = self.SERIALIZER_CLASS(
                data=record_data, context={"request": self.request}
            )
            if serializer.is_valid():
                try:
                    serializer.save()
                    results.add_created()
                except Exception as e:
                    results.add_error(Error(record=record, error=str(e)))
                    if self.on_conflict == ConflictMode.STOP:
                        results.stopped = True
                        break
            else:
                results.add_error(Error(record=record, error=str(serializer.errors)))
                if self.on_conflict == ConflictMode.STOP:
                    results.stopped = True
                    break
        logger.info(
            f"{self.__class__.__name__} record processing complete. "
            f"Created: {results.created}, Updated: {results.updated}, "
            f"Skipped: {results.skipped}, Failed: {results.failed}"
        )
        return results
class AssetRecordConsumer(RecordConsumer[list]):
    """
    Consumer for importing Asset records.
    Supports parent_assets linking via ref_id in a second pass.
    """

    SERIALIZER_CLASS = AssetWriteSerializer
    # Source column aliases accepted for these payload keys.
    SOURCE_KEY_MAP: ClassVar[dict[str, tuple[str, ...]]] = {
        "reference_link": ("reference_link", "link"),
        "filtering_labels": ("filtering_labels", "labels", "étiquette", "label"),
    }
    # Accepted spellings of the asset type → model type code (PR/SP).
    TYPE_MAP: Final[dict[str, str]] = {
        "primary": "PR",
        "pr": "PR",
        "support": "SP",
        "sp": "SP",
    }

    def create_context(self):
        # The per-run context is the active security-objective scale,
        # resolved once instead of per record.
        return _get_security_objective_scale(), None

    def prepare_create(
        self, record: dict, context: list
    ) -> tuple[dict, Optional[Error]]:
        """Build Asset serializer data; may return a non-blocking parse warning."""
        # Resolve the target folder: explicit "domain" column, else wizard default.
        domain = self.folder_id
        domain_name = record.get("domain")
        if domain_name is not None:
            domain = self.folders_map.get(domain_name.lower(), self.folder_id)
        name = record.get("name")
        if not name:
            return {}, Error(record=record, error="Name field is mandatory")
        # Map type field
        asset_type = record.get("type", "SP")
        if isinstance(asset_type, str):
            asset_type = self.TYPE_MAP.get(
                asset_type.lower().strip(), asset_type.upper()
            )
        data = {
            "ref_id": record.get("ref_id", ""),
            "name": name,
            "type": asset_type,
            "folder": domain,
            "description": record.get("description", ""),
            "business_value": record.get("business_value", ""),
            "reference_link": record.get("reference_link", "")
            or record.get("link", ""),
            "observation": record.get("observation", ""),
        }
        # Labels may come in under several column aliases (see SOURCE_KEY_MAP).
        raw_labels = (
            record.get("filtering_labels")
            or record.get("labels")
            or record.get("étiquette")
            or record.get("label")
        )
        filtering_labels = _resolve_filtering_labels(raw_labels)
        if filtering_labels:
            data["filtering_labels"] = filtering_labels
        # Accept both export column names and native field names for support assets
        raw_sec = record.get("security_objectives") or record.get(
            "security_capabilities", ""
        )
        raw_rec = record.get("disaster_recovery_objectives") or record.get(
            "recovery_capabilities", ""
        )
        sec_objectives = _parse_security_objectives(raw_sec, scale=context)
        rec_objectives = _parse_recovery_objectives(raw_rec)
        # Parse failures become warnings so the asset itself is still imported.
        parse_warning_msgs = []
        if raw_sec and not sec_objectives:
            parse_warning_msgs.append(
                f"Could not parse security_objectives: '{raw_sec}'"
            )
        if raw_rec and not rec_objectives:
            parse_warning_msgs.append(
                f"Could not parse disaster_recovery_objectives: '{raw_rec}'"
            )
        # Primary assets store "objectives"; support assets store "capabilities".
        if asset_type == "PR":
            if sec_objectives:
                data["security_objectives"] = {"objectives": sec_objectives}
            if rec_objectives:
                data["disaster_recovery_objectives"] = {"objectives": rec_objectives}
        else:  # SP (support)
            if sec_objectives:
                data["security_capabilities"] = {"objectives": sec_objectives}
            if rec_objectives:
                data["recovery_capabilities"] = {"objectives": rec_objectives}
        if parse_warning_msgs:
            return data, Error(
                record=record,
                error="; ".join(parse_warning_msgs),
                is_warning=True,
            )
        return data, None

    def process_records(self, records: list[dict]) -> Result:
        """
        Override to add second pass for parent_assets linking.
        """
        # First pass: create all assets
        results = super().process_records(records)
        # Second pass: link parent_assets by ref_id
        for record in records:
            parent_assets_ref = record.get("parent_assets") or record.get(
                "parent_asset_ref_id"
            )
            if not parent_assets_ref:
                continue
            asset_ref_id = record.get("ref_id")
            if not asset_ref_id:
                continue
            # Find the created asset
            # NOTE(review): lookup is by ref_id alone, not scoped to a folder or
            # to the user's viewable set — with duplicate ref_ids across domains
            # this may pick an unintended asset; confirm whether ref_id is
            # globally unique here.
            asset = Asset.objects.filter(ref_id=asset_ref_id).first()
            if not asset:
                continue
            # Parse parent ref_ids (comma or pipe separated)
            if isinstance(parent_assets_ref, str):
                parent_ref_ids = [
                    ref.strip()
                    for ref in parent_assets_ref.replace("|", ",").split(",")
                    if ref.strip()
                ]
            else:
                parent_ref_ids = [str(parent_assets_ref)]
            # Link parent assets
            for parent_ref_id in parent_ref_ids:
                parent_asset = Asset.objects.filter(ref_id=parent_ref_id).first()
                # Guard against self-linking.
                if parent_asset and parent_asset.id != asset.id:
                    asset.parent_assets.add(parent_asset)
        return results
@dataclass(frozen=True)
class AppliedControlContext:
    """Per-run context for AppliedControl imports; resolves the global currency once."""

    currency: str = field(default_factory=get_global_currency)
class AppliedControlRecordConsumer(RecordConsumer[AppliedControlContext]):
    """
    Consumer for importing AppliedControl records.
    Supports reference_control linking via ref_id and owner resolution
    by user email or team name.
    """

    SERIALIZER_CLASS = AppliedControlWriteSerializer
    # Source column aliases accepted for these payload keys.
    SOURCE_KEY_MAP: ClassVar[dict[str, tuple[str, ...]]] = {
        "control_impact": ("control_impact", "impact"),
        "reference_control": ("reference_control", "reference_control_ref_id"),
        "owner": ("owner",),
    }
    # Textual impact levels → 1-5 numeric scale.
    IMPACT_MAP: Final[dict[str, int]] = {
        "very low": 1,
        "low": 2,
        "medium": 3,
        "high": 4,
        "very high": 5,
    }
    # Accepted spellings of effort → T-shirt size codes.
    EFFORT_MAP: Final[dict[str, str]] = {
        "extra small": "XS",
        "extrasmall": "XS",
        "xs": "XS",
        "small": "S",
        "s": "S",
        "medium": "M",
        "m": "M",
        "large": "L",
        "l": "L",
        "extra large": "XL",
        "extralarge": "XL",
        "xl": "XL",
    }
    # Presence of any non-empty value under these keys triggers the cost payload.
    COST_KEYS: Final[frozenset[str]] = frozenset(
        {
            "amortization_period",
            "build_fixed_cost",
            "build_people_days",
            "run_fixed_cost",
            "run_people_days",
        }
    )

    def create_context(self) -> tuple[AppliedControlContext, Optional[Error]]:
        # Resolve the currency once per run via AppliedControlContext's default.
        return AppliedControlContext(), None

    def prepare_create(
        self, record: dict, context: AppliedControlContext
    ) -> tuple[dict, Optional[Error]]:
        """Build AppliedControl serializer data from one raw source record."""
        # Resolve the target folder: explicit "domain" column, else wizard default.
        domain = self.folder_id
        domain_name = record.get("domain")
        if domain_name is not None:
            domain = self.folders_map.get(domain_name.lower(), self.folder_id)
        name = record.get("name")
        if not name:
            return {}, Error(record=record, error="Name field is mandatory")
        # Parse priority
        priority = record.get("priority")
        if isinstance(priority, (int, float)):
            priority = int(priority)
        elif isinstance(priority, str) and priority.isdigit():
            priority = int(priority)
        else:
            priority = None
        # Parse effort
        effort = record.get("effort")
        if isinstance(effort, str):
            effort = self.EFFORT_MAP.get(effort.lower().strip(), effort.upper())
        if effort not in ("XS", "S", "M", "L", "XL"):
            effort = None
        # Parse control_impact (1-5 scale)
        control_impact = record.get("control_impact") or record.get("impact")
        if isinstance(control_impact, (int, float)):
            control_impact = int(control_impact)
        elif isinstance(control_impact, str):
            if control_impact.isdigit():
                control_impact = int(control_impact)
            else:
                control_impact = self.IMPACT_MAP.get(control_impact.lower().strip())
        if isinstance(control_impact, int) and not (1 <= control_impact <= 5):
            control_impact = None
        # Look up reference_control by ref_id
        reference_control_id = None
        reference_control_ref = record.get("reference_control") or record.get(
            "reference_control_ref_id"
        )
        if reference_control_ref:
            from core.models import ReferenceControl

            ref_control = ReferenceControl.objects.filter(
                ref_id=reference_control_ref
            ).first()
            if ref_control:
                reference_control_id = ref_control.id
        csf_function = record.get("csf_function", "govern")
        if isinstance(csf_function, str):
            csf_function = csf_function.lower()
        category = record.get("category", "")
        if isinstance(category, str):
            category = category.lower()
        data = {
            "ref_id": record.get("ref_id", ""),
            "name": name,
            "description": record.get("description", ""),
            "category": category,
            "folder": domain,
            "status": record.get("status", "to_do"),
            "priority": priority,
            "csf_function": csf_function,
            "effort": effort,
            "control_impact": control_impact,
            "link": record.get("link", ""),
            "eta": _parse_date(record.get("eta")),
            "expiry_date": _parse_date(record.get("expiry_date")),
            "start_date": _parse_date(record.get("start_date")),
            "observation": record.get("observation", ""),
        }
        if reference_control_id:
            data["reference_control"] = reference_control_id
        # Only build the cost payload when at least one cost column is non-empty.
        has_cost_related_key = any(
            key in self.COST_KEYS and record.get(key) not in (None, "")
            for key in record.keys()
        )
        if has_cost_related_key:
            cost = {
                "currency": context.currency,
                "amortization_period": int(record.get("amortization_period") or 1),
                "build": {
                    "fixed_cost": int(record.get("build_fixed_cost") or 0),
                    "people_days": int(record.get("build_people_days") or 0),
                },
                "run": {
                    "fixed_cost": int(record.get("run_fixed_cost") or 0),
                    "people_days": int(record.get("run_people_days") or 0),
                },
            }
            data["cost"] = cost
        filtering_labels = _resolve_filtering_labels(record.get("filtering_labels"))
        if filtering_labels:
            data["filtering_labels"] = filtering_labels
        # Resolve owner field (semicolon-separated user emails or team names).
        # Always set data["owner"] when the column is present, even if blank,
        # so that UPDATE mode can clear the M2M instead of silently preserving it.
        if "owner" in record:
            data["owner"] = self._resolve_owners(record.get("owner"))
        return data, None

    @staticmethod
    def _resolve_owners(value) -> list:
        """Resolve semicolon-separated user emails or team names to Actor IDs.
        Each entry is matched first as a user email, then as a team name.
        Unresolvable entries are silently skipped.
        """
        if not value or not isinstance(value, str):
            return []
        entries = [entry.strip() for entry in value.split(";") if entry.strip()]
        actor_ids = []
        for entry in entries:
            # Try matching as user email first
            actor = Actor.objects.filter(user__email__iexact=entry).first()
            if actor is None:
                # Try matching as team name
                actor = Actor.objects.filter(team__name__iexact=entry).first()
            if actor is not None:
                actor_ids.append(actor.id)
            else:
                logger.warning(
                    "Could not resolve an owner reference to a user email or team name during Applied Control import; skipping."
                )
        return actor_ids
class EvidenceRecordConsumer(RecordConsumer[None]):
    """Imports Evidence records; only the name column is mandatory."""

    SERIALIZER_CLASS = EvidenceWriteSerializer

    def create_context(self):
        # Evidences need no per-run context.
        return None, None

    def prepare_create(
        self, record: dict, context: None
    ) -> tuple[dict, Optional[Error]]:
        """Build Evidence serializer data; errors when the name is missing."""
        name = record.get("name")
        if not name:
            return {}, Error(record=record, error="Name field is mandatory")
        # Explicit "domain" column wins over the wizard-level default folder.
        folder = self.folder_id
        domain_name = record.get("domain")
        if domain_name is not None:
            folder = self.folders_map.get(domain_name.lower(), self.folder_id)
        payload = {
            "name": name,
            "description": record.get("description", ""),
            "ref_id": record.get("ref_id", ""),
            "folder": folder,
        }
        labels = _resolve_filtering_labels(record.get("filtering_labels"))
        if labels:
            payload["filtering_labels"] = labels
        return payload, None
class UserRecordConsumer(RecordConsumer[None]):
    """Imports users; existing accounts are matched case-insensitively by email."""

    SERIALIZER_CLASS = UserWriteSerializer

    def find_existing(self, record_data: dict):
        """Users are matched by email rather than by the model's fields_to_check."""
        email = record_data.get("email")
        if not email:
            return None
        return User.objects.filter(email__iexact=email).first()

    def create_context(self):
        # Users need no per-run context.
        return None, None

    def prepare_create(
        self, record: dict, context: None
    ) -> tuple[dict, Optional[Error]]:
        """Build User serializer data; errors when the email is missing."""
        email = record.get("email")
        if email is None:
            return {}, Error(record=record, error="email field is mandatory")
        payload = {
            "email": email,
            "first_name": record.get("first_name"),
            "last_name": record.get("last_name"),
        }
        return payload, None
class PerimeterRecordConsumer(RecordConsumer[None]):
    """Imports Perimeter records; only the name column is mandatory."""

    SERIALIZER_CLASS = PerimeterWriteSerializer

    def create_context(self):
        # Perimeters need no per-run context.
        return None, None

    def prepare_create(
        self, record: dict, context: None
    ) -> tuple[dict, Optional[Error]]:
        """Build Perimeter serializer data; errors when the name is missing."""
        name = record.get("name")
        if not name:
            return {}, Error(record=record, error="Name field is mandatory")
        # Explicit "domain" column wins over the wizard-level default folder.
        folder = self.folder_id
        domain_name = record.get("domain")
        if domain_name is not None:
            folder = self.folders_map.get(domain_name.lower(), self.folder_id)
        payload = {
            "name": name,
            "folder": folder,
            "ref_id": record.get("ref_id", ""),
            "description": record.get("description", ""),
            "status": record.get("status"),
        }
        return payload, None