Skip to content

Commit 933f825

Browse files
Implement Observation
This commit uploads observations in parallel. It appears our API only supports uploading a single observation at a time, but we collect a possible vector observations, one for each group, or potentially more depending on how many were created during the obseravtion period. This commit sends a request to the backend to upload a new observation for each observation, but it does it in parallel for performance.
1 parent e8313da commit 933f825

File tree

5 files changed

+90
-27
lines changed

5 files changed

+90
-27
lines changed

Cargo.lock

Lines changed: 15 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

src/adapters/backend/mod.rs

Lines changed: 49 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,20 @@ use std::sync::Arc;
33

44
use super::{
55
BoxedIngress, BoxedMonitor, BoxedPlatform, IngressBuilder, MonitorBuilder, PlatformBuilder,
6+
StatusCode,
67
};
78
use crate::Cli;
89
use crate::fs::UserCreds;
10+
use crate::{artifacts::LambdaZip, fs::Session, metrics::ResponseStatusCode};
11+
use miette::miette;
912
use miette::{IntoDiagnostic, Result, bail};
1013
use multitool_sdk::apis::{Api, ApiClient, configuration::Configuration};
11-
use multitool_sdk::models::{ApplicationDetails, LoginRequest, LoginSuccess, WorkspaceSummary};
12-
use uuid::Uuid;
13-
14-
use crate::{
15-
artifacts::LambdaZip, fs::Session, metrics::ResponseStatusCode, stats::CategoricalObservation,
14+
use multitool_sdk::models::{
15+
ApplicationDetails, ApplicationGroup, CreateResponseCodeMetricsRequest,
16+
CreateResponseCodeMetricsSuccess, LoginRequest, LoginSuccess, WorkspaceSummary,
1617
};
18+
use tokio::task::JoinSet;
19+
use uuid::Uuid;
1720

1821
pub(crate) use deploy_meta::*;
1922

@@ -159,8 +162,47 @@ impl BackendClient {
159162
}
160163

161164
/// Upload a batch of observations to the backend.
162-
pub async fn upload_observations(&self, data: Vec<()>) -> Result<()> {
163-
todo!();
165+
pub async fn upload_observations(
166+
&self,
167+
meta: &DeploymentMetadata,
168+
data: Vec<StatusCode>,
169+
) -> Result<()> {
170+
let mut req_waiter = JoinSet::new();
171+
172+
for item in data {
173+
let group = match item.group() {
174+
crate::stats::Group::Control => ApplicationGroup::Baseline,
175+
crate::stats::Group::Experimental => ApplicationGroup::Canary,
176+
};
177+
let req_body = CreateResponseCodeMetricsRequest {
178+
app_class: group,
179+
status_2xx_count: item.get_count(&ResponseStatusCode::_2XX) as i32,
180+
status_4xx_count: item.get_count(&ResponseStatusCode::_4XX) as i32,
181+
status_5xx_count: item.get_count(&ResponseStatusCode::_5XX) as i32,
182+
};
183+
let workspace_id = meta.workspace_id().to_string();
184+
let application_id = meta.application_id().to_string();
185+
let deployment_id = *meta.deployment_id();
186+
let cloned_client = self.clone();
187+
req_waiter.spawn_local(async move {
188+
cloned_client
189+
.client
190+
.response_code_metrics_api()
191+
.create_response_code_metrics(
192+
&workspace_id,
193+
&application_id,
194+
deployment_id,
195+
req_body,
196+
)
197+
.await
198+
});
199+
}
200+
let results = req_waiter.join_all().await;
201+
let result: std::result::Result<Vec<CreateResponseCodeMetricsSuccess>, _> =
202+
results.into_iter().collect();
203+
result
204+
.map(|_| ())
205+
.map_err(|err| miette!("Error uploading observation: {err}"))
164206
}
165207

166208
/// Return information about the workspace given its name.

src/stats/observation.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,14 @@ impl<const N: usize, Cat: Categorical<N>> CategoricalObservation<N, Cat> {
4040
pub fn increment_by(&mut self, category: &Cat, count: u32) {
4141
self.histogram.increment_by(category, count);
4242
}
43+
44+
pub fn group(&self) -> Group {
45+
self.group
46+
}
47+
48+
pub fn get_count(&self, cat: &Cat) -> u32 {
49+
self.histogram.get_count(cat)
50+
}
4351
}
4452

4553
impl<const N: usize, Cat: Categorical<N> + fmt::Debug> fmt::Debug

src/subsystems/relay/lock_mgmt.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,10 @@
1-
use crate::adapters::{BackendClient, DeploymentMetadata, StateId};
1+
use async_trait::async_trait;
2+
3+
use crate::{
4+
Shutdownable,
5+
adapters::{BackendClient, DeploymentMetadata, StateId},
6+
subsystems::ShutdownResult,
7+
};
28

39
pub(super) struct LockManagementSubsystem {
410
/// We use this client to refresh locks.
@@ -9,3 +15,11 @@ pub(super) struct LockManagementSubsystem {
915
/// This is the state that this manager is locking.
1016
state_id: StateId,
1117
}
18+
19+
#[async_trait]
20+
impl Shutdownable for LockManagementSubsystem {
21+
async fn shutdown(&mut self) -> ShutdownResult {
22+
// Release any of the locks we've taken.
23+
todo!()
24+
}
25+
}

src/subsystems/relay/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use tokio::{select, sync::mpsc::Receiver, time::Duration};
77
use tokio_graceful_shutdown::{IntoSubsystem, SubsystemBuilder, SubsystemHandle};
88

99
use crate::{
10-
adapters::{BackendClient, BoxedIngress, BoxedPlatform, DeploymentMetadata},
10+
adapters::{BackendClient, BoxedIngress, BoxedPlatform, DeploymentMetadata, StatusCode},
1111
stats::Observation,
1212
};
1313

@@ -80,7 +80,7 @@ impl<T: Observation + Send + 'static> RelaySubsystem<T> {
8080
}
8181

8282
#[async_trait]
83-
impl<T: Observation + Send + Sync> IntoSubsystem<Report> for RelaySubsystem<T> {
83+
impl IntoSubsystem<Report> for RelaySubsystem<StatusCode> {
8484
async fn run(mut self, subsys: SubsystemHandle) -> Result<()> {
8585
// Kick off a task to poll the backend for new states.
8686
let mut poller = self.new_poller();
@@ -111,8 +111,7 @@ impl<T: Observation + Send + Sync> IntoSubsystem<Report> for RelaySubsystem<T> {
111111
// When a new observation arrives, we send it to the backend.
112112
elem = observations.recv() => {
113113
if let Some(batch) = elem {
114-
// self.backend.upload_observations(batch).await?;
115-
self.backend.upload_observations(vec![]).await?;
114+
self.backend.upload_observations(&self.meta, batch).await?;
116115
} else {
117116
// The stream has been closed, so we should shutdown.
118117
subsys.request_shutdown();

0 commit comments

Comments
 (0)