|
1 | 1 | from __future__ import annotations |
2 | | -from typing import Dict |
| 2 | + |
3 | 3 | from urllib.parse import parse_qs, urlparse |
| 4 | + |
4 | 5 | from .audit import InMemoryAuditRecorder |
5 | 6 | from .exceptions import SecretPolicyError, SecretProviderError, UnsupportedProviderAction |
6 | 7 | from .models import SecretRef, SecretValue, SecretVersionInfo, StoreSecretRequest |
7 | 8 | from .policy import SecretPolicyEngine |
8 | 9 | from .providers.base import SecretProvider |
9 | 10 |
|
| 11 | + |
10 | 12 | class SecretsBroker: |
11 | | - def __init__(self, providers: Dict[str, SecretProvider], policy: SecretPolicyEngine, audit: InMemoryAuditRecorder | None = None) -> None: |
| 13 | + def __init__( |
| 14 | + self, |
| 15 | + providers: dict[str, SecretProvider], |
| 16 | + policy: SecretPolicyEngine, |
| 17 | + audit: InMemoryAuditRecorder | None = None, |
| 18 | + ) -> None: |
12 | 19 | self.providers = providers |
13 | 20 | self.policy = policy |
14 | 21 | self.audit = audit or InMemoryAuditRecorder() |
| 22 | + |
15 | 23 | def parse_ref(self, uri: str) -> SecretRef: |
16 | | - parsed = urlparse(uri); params = parse_qs(parsed.query) |
17 | | - return SecretRef(provider=parsed.scheme, vault=parsed.netloc or None, path=parsed.path.lstrip("/"), version=params.get("version", [None])[0]) |
| 24 | + parsed = urlparse(uri) |
| 25 | + params = parse_qs(parsed.query) |
| 26 | + return SecretRef( |
| 27 | + provider=parsed.scheme, |
| 28 | + vault=parsed.netloc or None, |
| 29 | + path=parsed.path.lstrip("/"), |
| 30 | + version=params.get("version", [None])[0], |
| 31 | + ) |
| 32 | + |
18 | 33 | def resolve(self, ref: SecretRef, *, caller: str) -> SecretValue: |
19 | 34 | decision = self.policy.decide(ref, action="resolve", caller=caller) |
20 | | - self.audit.record(action="resolve", actor=caller, ref=ref, allowed=decision.allowed, provider=ref.provider, reason=decision.reason) |
21 | | - if not decision.allowed: raise SecretPolicyError(decision.reason) |
| 35 | + self.audit.record( |
| 36 | + action="resolve", |
| 37 | + actor=caller, |
| 38 | + ref=ref, |
| 39 | + allowed=decision.allowed, |
| 40 | + provider=ref.provider, |
| 41 | + reason=decision.reason, |
| 42 | + ) |
| 43 | + if not decision.allowed: |
| 44 | + raise SecretPolicyError(decision.reason) |
22 | 45 | provider = self._provider(ref.provider) |
23 | | - if not provider.capabilities().supports_read: raise UnsupportedProviderAction(f"provider does not support resolve: {ref.provider}") |
| 46 | + if not provider.capabilities().supports_read: |
| 47 | + raise UnsupportedProviderAction(f"provider does not support resolve: {ref.provider}") |
24 | 48 | return provider.resolve(ref) |
| 49 | + |
25 | 50 | def store(self, request: StoreSecretRequest, *, caller: str) -> SecretVersionInfo: |
26 | | - decision = self.policy.decide(request.ref, action="store", caller=caller, overwrite=request.allow_overwrite) |
27 | | - self.audit.record(action="store", actor=caller, ref=request.ref, allowed=decision.allowed, provider=request.ref.provider, reason=decision.reason) |
28 | | - if not decision.allowed: raise SecretPolicyError(decision.reason) |
29 | | - provider = self._provider(request.ref.provider); caps = provider.capabilities() |
30 | | - if not caps.supports_write: raise UnsupportedProviderAction(f"provider does not support store: {request.ref.provider}") |
31 | | - if request.tags and not caps.supports_tagging: raise UnsupportedProviderAction(f"provider does not support tags: {request.ref.provider}") |
| 51 | + decision = self.policy.decide( |
| 52 | + request.ref, action="store", caller=caller, overwrite=request.allow_overwrite |
| 53 | + ) |
| 54 | + self.audit.record( |
| 55 | + action="store", |
| 56 | + actor=caller, |
| 57 | + ref=request.ref, |
| 58 | + allowed=decision.allowed, |
| 59 | + provider=request.ref.provider, |
| 60 | + reason=decision.reason, |
| 61 | + ) |
| 62 | + if not decision.allowed: |
| 63 | + raise SecretPolicyError(decision.reason) |
| 64 | + provider = self._provider(request.ref.provider) |
| 65 | + caps = provider.capabilities() |
| 66 | + if not caps.supports_write: |
| 67 | + raise UnsupportedProviderAction( |
| 68 | + f"provider does not support store: {request.ref.provider}" |
| 69 | + ) |
| 70 | + if request.tags and not caps.supports_tagging: |
| 71 | + raise UnsupportedProviderAction( |
| 72 | + f"provider does not support tags: {request.ref.provider}" |
| 73 | + ) |
32 | 74 | return provider.store(request) |
| 75 | + |
33 | 76 | def checkout(self, ref: SecretRef, *, caller: str): |
34 | 77 | decision = self.policy.decide(ref, action="checkout", caller=caller) |
35 | | - self.audit.record(action="checkout", actor=caller, ref=ref, allowed=decision.allowed, provider=ref.provider, reason=decision.reason) |
36 | | - if not decision.allowed: raise SecretPolicyError(decision.reason) |
| 78 | + self.audit.record( |
| 79 | + action="checkout", |
| 80 | + actor=caller, |
| 81 | + ref=ref, |
| 82 | + allowed=decision.allowed, |
| 83 | + provider=ref.provider, |
| 84 | + reason=decision.reason, |
| 85 | + ) |
| 86 | + if not decision.allowed: |
| 87 | + raise SecretPolicyError(decision.reason) |
37 | 88 | provider = self._provider(ref.provider) |
38 | | - if not provider.capabilities().supports_checkout: raise UnsupportedProviderAction(f"provider does not support checkout: {ref.provider}") |
| 89 | + if not provider.capabilities().supports_checkout: |
| 90 | + raise UnsupportedProviderAction(f"provider does not support checkout: {ref.provider}") |
39 | 91 | return provider.checkout(ref) |
| 92 | + |
40 | 93 | def _provider(self, name: str) -> SecretProvider: |
41 | | - if name not in self.providers: raise SecretProviderError(f"unknown provider: {name}") |
| 94 | + if name not in self.providers: |
| 95 | + raise SecretProviderError(f"unknown provider: {name}") |
42 | 96 | return self.providers[name] |
0 commit comments