Skip to content

Commit e331a8d

Browse files
authored
Add error logs subsystem (#117)
* Add error logs subsystem, hidden behind errorlogs flag * Update monitor config to include cloudflare api token * Switch branch for sdk for testing * Add support for wrangler file bindings (#118) * Add support for wrangler file bindings * Add local wrangler files to gitignore * Add support for routes * PR comment updates * Update test to match config change * Bump version to 0.3.1 (#119) * Fix Wrangler path issue (#121) * Fix Wrangler path issue * Add tests * Bump version to v0.3.2 (#122) * Limit files that get uploaded to Cloudflare (#123) * Limit files that get uploaded to CF and add tests * Remove outdated test * Fix another outdated tests * Add error logs subsystem, hidden behind errorlogs flag * Update monitor config to include cloudflare api token * Switch branch for sdk for testing * Handle cancel canary * Add rest of code * Update cargo toml
1 parent 3a2f3b1 commit e331a8d

File tree

11 files changed

+994
-610
lines changed

11 files changed

+994
-610
lines changed

Cargo.lock

Lines changed: 660 additions & 603 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ tempfile = "3.8"
9191

9292
[features]
9393
proxy = ["dep:pingora"]
94+
errorlogs = []
9495

9596
# The profile that 'dist' will build with
9697
[profile.dist]

src/adapters/backend/mod.rs

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use crate::{fs::Session, metrics::ResponseStatusCode, utils::circuit_breaker::Ht
88
use bon::Builder;
99
use chrono::DateTime;
1010
use miette::{IntoDiagnostic, Result, bail};
11-
use multitool_sdk::models::Rollout;
11+
use multitool_sdk::models::{CreateErrorRequest, Rollout};
1212
use multitool_sdk::{
1313
apis::{Api, ApiClient, configuration::Configuration},
1414
models::{
@@ -348,6 +348,30 @@ impl BackendClient {
348348
Ok(())
349349
}
350350

351+
/// Upload errors to the backend.
352+
pub(crate) async fn upload_errors(
353+
&self,
354+
meta: &RolloutMetadata,
355+
url_path: String,
356+
status_code: i32,
357+
logs: Vec<String>,
358+
) -> Result<()> {
359+
trace!("Uploading error logs to backend");
360+
let workspace_id = *meta.workspace_id();
361+
let application_id = *meta.application_id();
362+
let rollout_id = *meta.rollout_id();
363+
364+
let request = CreateErrorRequest::new(logs, status_code, url_path);
365+
366+
self.client
367+
.errors_api()
368+
.log_error(workspace_id, application_id, rollout_id, request)
369+
.await
370+
.into_diagnostic()?;
371+
372+
Ok(())
373+
}
374+
351375
/// Return information about the workspace given its name.
352376
pub(crate) async fn create_workspace<T: AsRef<str>>(
353377
&self,
@@ -583,6 +607,7 @@ pub enum MonitorConfig {
583607
dimensions: Vec<CloudWatchDimensions>,
584608
},
585609
CloudflareWorkersObservability {
610+
api_token: String,
586611
account_id: String,
587612
worker_name: String,
588613
},

src/adapters/cloudflare/metrics.rs

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
use std::collections::HashMap;
2+
13
use serde::Deserialize;
24

35
#[derive(Deserialize, Debug)]
@@ -17,3 +19,101 @@ pub struct Aggregate {
1719
#[serde(default)]
1820
pub count: u32,
1921
}
22+
23+
#[derive(Deserialize, Debug)]
24+
pub struct ErrorLogsResponse {
25+
#[serde(default)]
26+
pub invocations: HashMap<String, Vec<InvocationData>>,
27+
}
28+
29+
#[derive(Deserialize, Debug)]
30+
pub struct InvocationData {
31+
#[serde(rename = "$workers")]
32+
pub workers: WorkersData,
33+
#[serde(rename = "$metadata")]
34+
pub metadata: MetadataData,
35+
pub source: SourceData,
36+
}
37+
38+
#[derive(Deserialize, Debug)]
39+
pub struct WorkersData {
40+
pub event: EventData,
41+
}
42+
43+
#[derive(Deserialize, Debug)]
44+
pub struct EventData {
45+
pub request: RequestData,
46+
#[serde(default)]
47+
pub response: Option<ResponseData>,
48+
}
49+
50+
#[derive(Deserialize, Debug)]
51+
pub struct RequestData {
52+
pub url: String,
53+
pub method: String,
54+
pub path: String,
55+
}
56+
57+
#[derive(Deserialize, Debug, Clone)]
58+
pub struct ResponseData {
59+
pub status: i32,
60+
}
61+
62+
#[derive(Deserialize, Debug)]
63+
pub struct MetadataData {
64+
#[serde(rename = "type")]
65+
pub event_type: String,
66+
}
67+
68+
#[derive(Deserialize, Debug)]
69+
pub struct SourceData {
70+
pub message: String,
71+
}
72+
73+
/// The log lines of one failed worker invocation, together with the request
/// they belong to.
#[derive(Clone, Debug)]
pub struct CloudflareErrorLog {
    /// Method of the request.
    pub method: String,
    /// Path of the request.
    pub path: String,
    /// Status code of the response.
    pub status_code: i32,
    /// Log messages gathered for the invocation.
    pub logs: Vec<String>,
}
80+
81+
impl Into<Vec<CloudflareErrorLog>> for ErrorLogsResponse {
82+
fn into(self) -> Vec<CloudflareErrorLog> {
83+
let mut error_logs = Vec::new();
84+
85+
for (_request_id, invocations) in self.invocations {
86+
// The cf-worker-event entry contains the request/response details
87+
let event_entry = invocations
88+
.iter()
89+
.find(|inv| inv.metadata.event_type == "cf-worker-event");
90+
91+
if let Some(event) = event_entry {
92+
let request = &event.workers.event.request;
93+
let method = request.method.clone();
94+
let path = request.path.clone();
95+
96+
// Extract log line from each invocation event and combine
97+
if let Some(response) = &event.workers.event.response {
98+
let mut logs: Vec<String> = invocations
99+
.iter()
100+
.map(|inv| inv.source.message.clone())
101+
.collect();
102+
103+
logs.reverse(); // Reverse to maintain chronological order
104+
105+
let error_log = CloudflareErrorLog {
106+
method,
107+
path,
108+
status_code: response.status,
109+
logs,
110+
};
111+
112+
error_logs.push(error_log);
113+
}
114+
}
115+
}
116+
117+
error_logs
118+
}
119+
}

src/adapters/cloudflare/mod.rs

Lines changed: 85 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use uploads::{UploadVersionRequest, UploadVersionResponse};
1212
use url::Url;
1313

1414
use deployments::{CreateDeploymentRequest, DeploymentResponse};
15-
use metrics::MetricsResponse;
15+
use metrics::{CloudflareErrorLog, ErrorLogsResponse, MetricsResponse};
1616
use responses::CloudflareResponse;
1717
use routes::{CloudflareRoute, CreateCloudflareRouteRequest, UpdateCloudflareRouteRequest};
1818

@@ -33,6 +33,8 @@ pub struct CloudflareClient {
3333
account_id: String,
3434
/// The name of the Cloudflare worker
3535
worker_name: String,
36+
/// The API token used for authentication
37+
api_token: String,
3638
}
3739

3840
impl CloudflareClient {
@@ -51,6 +53,7 @@ impl CloudflareClient {
5153

5254
Self {
5355
client,
56+
api_token: token.to_string(),
5457
account_id,
5558
worker_name,
5659
}
@@ -569,6 +572,87 @@ impl CloudflareClient {
569572
Ok(())
570573
}
571574

575+
/// Collects the logs produced by Cloudflare worker invocations that result in errors
576+
/// Only collects 5xx errrors for now.
577+
/// Returns grouped error logs maintaining the original invocation structure.
578+
pub async fn collect_errors(
579+
&self,
580+
cf_worker_name: String,
581+
from_time: DateTime<chrono::Utc>,
582+
to_time: DateTime<chrono::Utc>,
583+
) -> Result<Vec<CloudflareErrorLog>> {
584+
let account_id = &self.account_id;
585+
let path = format!("accounts/{account_id}/workers/observability/telemetry/query");
586+
let url = Self::url_with_path(&path);
587+
588+
// Convert DateTime to Unix Millisecond timestamps
589+
let from_timestamp = from_time.timestamp_millis() as u64;
590+
let to_timestamp = to_time.timestamp_millis() as u64;
591+
592+
let query_body = serde_json::json!({
593+
"view": "invocations",
594+
"queryId": "workers-logs-invocations",
595+
"parameters": {
596+
"datasets": [],
597+
"filters": [
598+
{
599+
"key": "$metadata.service",
600+
"operation": "eq",
601+
"type": "string",
602+
"value": cf_worker_name
603+
},
604+
{
605+
"key": "$workers.event.response.status",
606+
"operation": "gte",
607+
"type": "number",
608+
"value": 500
609+
},
610+
{
611+
"key": "$workers.event.response.status",
612+
"operation": "lte",
613+
"type": "number",
614+
"value": 599
615+
}
616+
],
617+
"calculations": [],
618+
"groupBys": [],
619+
"havings": []
620+
},
621+
"timeframe": {
622+
"to": to_timestamp,
623+
"from": from_timestamp
624+
}
625+
});
626+
627+
let response = self
628+
.client
629+
.post(url)
630+
.json(&query_body)
631+
.send()
632+
.await
633+
.into_diagnostic()?;
634+
635+
// If there's an error, just return empty results
636+
if !response.status().is_success() {
637+
error!(
638+
"Failed to query Cloudflare error logs for worker: {}, error: {:?}",
639+
cf_worker_name,
640+
response.json::<serde_json::Value>().await
641+
);
642+
return Ok(Vec::new());
643+
}
644+
645+
let error_logs_response = response
646+
.json::<CloudflareResponse<ErrorLogsResponse>>()
647+
.await
648+
.into_diagnostic()?;
649+
650+
// Convert the enourmous CF response into our error log groups
651+
let error_log_groups = error_logs_response.result.into();
652+
653+
Ok(error_log_groups)
654+
}
655+
572656
fn base_url() -> &'static Url {
573657
URL.get_or_init(init_url)
574658
}

src/adapters/monitors/cloudflare.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ impl Monitor for CloudflareMonitor {
4747

4848
fn get_config(&self) -> MonitorConfig {
4949
MonitorConfig::CloudflareWorkersObservability {
50+
api_token: self.client.api_token().clone(),
5051
account_id: self.client.account_id().clone(),
5152
worker_name: self.client.worker_name().clone(),
5253
}

src/config/run/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ pub struct RunSubcommand {
1010
workspace: Option<String>,
1111
#[arg(short, long, env = "MULTI_APPLICATION")]
1212
application: Option<String>,
13-
#[arg(long, short = 'o', default_value = Some(MULTITOOL_ORIGIN))]
13+
#[arg(long, short = 'o', default_value = Some(MULTITOOL_ORIGIN), env = "ORIGIN")]
1414
origin: Option<String>,
1515

1616
///Cloudflare config

src/subsystems/controller/mod.rs

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,13 @@ use async_trait::async_trait;
22
use bon::bon;
33
use miette::{Report, Result};
44
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle};
5-
use tracing::{debug, trace};
5+
use tracing::{debug, error, trace};
66

77
use crate::adapters::{BackendClient, BoxedIngress, BoxedMonitor, BoxedPlatform, RolloutMetadata};
88
use crate::subsystems::PLATFORM_SUBSYSTEM_NAME;
99
use crate::{IngressSubsystem, PlatformSubsystem};
1010

11+
use crate::subsystems::{ERROR_LOGS_SUBSYSTEM_NAME, ErrorLogsController};
1112
use monitor::{MONITOR_CONTROLLER_SUBSYSTEM_NAME, MonitorController};
1213

1314
use super::{INGRESS_SUBSYSTEM_NAME, RELAY_SUBSYSTEM_NAME, RelaySubsystem};
@@ -62,22 +63,33 @@ impl IntoSubsystem<Report> for ControllerSubsystem {
6263
let platform_subsystem = PlatformSubsystem::new(self.platform);
6364
let platform_handle = platform_subsystem.handle();
6465

66+
// Extract monitor config before moving the monitor so we can use it
67+
// in the ErrorLogsController.
68+
let monitor_config = self.monitor.get_config();
69+
6570
let mut monitor_controller = MonitorController::builder().monitor(self.monitor).build();
6671
let observation_stream = monitor_controller.stream()?;
6772

6873
let baseline_sender = monitor_controller.get_baseline_sender();
6974
let canary_sender = monitor_controller.get_canary_sender();
7075

7176
let relay_subsystem = RelaySubsystem::builder()
72-
.backend(self.backend)
77+
.backend(self.backend.clone())
7378
.observations(observation_stream)
7479
.platform(platform_handle)
7580
.ingress(ingress_handle)
76-
.meta(self.meta)
81+
.meta(self.meta.clone())
7782
.baseline_sender(baseline_sender)
7883
.canary_sender(canary_sender)
7984
.build();
8085

86+
let error_logs_controller = ErrorLogsController::builder()
87+
.metadata(self.meta)
88+
.monitor(monitor_config)
89+
.backend(self.backend)
90+
.build()
91+
.map_err(|e| miette::miette!("Failed to create ErrorLogsController: {}", e))?;
92+
8193
// • Start the ingress subsystem.
8294
subsys.start(SubsystemBuilder::new(
8395
INGRESS_SUBSYSTEM_NAME,
@@ -102,6 +114,11 @@ impl IntoSubsystem<Report> for ControllerSubsystem {
102114
relay_subsystem.into_subsystem(),
103115
));
104116

117+
subsys.start(SubsystemBuilder::new(
118+
ERROR_LOGS_SUBSYSTEM_NAME,
119+
error_logs_controller.into_subsystem(),
120+
));
121+
105122
subsys.wait_for_children().await;
106123
Ok(())
107124
}

0 commit comments

Comments
 (0)