pub mod http;
use crate::queue::Queue;
use crate::runtime::Counters;
use http::{HttpSender, SendError};
use std::path::PathBuf;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tracing::warn;
/// Sends queued telemetry segments over HTTP and records the outcome in shared counters.
pub struct SenderRuntime {
/// Shared queue, behind an async mutex, from which segments are claimed.
queue: Arc<Mutex<Queue>>,
/// Sender used to post a claimed segment.
http: HttpSender,
/// Shared counters updated as segments are posted.
counters: Arc<Counters>,
/// File that receives a JSON snapshot of the counters after a successful post.
health_path: PathBuf,
}
impl SenderRuntime {
/// Builds a runtime from its collaborators.
///
/// The arguments are stored unchanged; nothing is sent and no file is touched here.
pub fn new(
    queue: Arc<Mutex<Queue>>,
    http: HttpSender,
    counters: Arc<Counters>,
    health_path: PathBuf,
) -> Self {
    Self { queue, http, counters, health_path }
}
/// Claims the oldest queued segment and tries to send it.
///
/// Returns `Ok(None)` when the queue has no segment to claim, and
/// `Ok(Some(path))` once the segment at `path` has been sent. On success the
/// posted-segment, bytes-sent and last-post counters are updated and the
/// health snapshot is written.
///
/// # Errors
///
/// - `SendError::Other` when claiming the segment fails, or when completing
///   the claim of a rejected segment fails.
/// - `SendError::BadRequest` when the sender reports a bad request; the claim
///   is completed before returning rather than restored.
/// - Any other sender error is returned unchanged after an attempt to restore
///   the claim (a failed restore is only logged).
pub async fn flush_one(&self) -> Result<Option<PathBuf>, SendError> {
    // Claim under the queue lock, and release the guard before the send is awaited.
    let (seg, dropped) = {
        let q = self.queue.lock().await;
        (
            q.claim_oldest_segment().map_err(|_| SendError::Other)?,
            q.dropped,
        )
    };
    let seg = match seg {
        Some(s) => s,
        None => return Ok(None),
    };
    self.counters
        .events_dropped_disk
        .store(dropped, Ordering::Relaxed);
    // Size is sampled before sending; an unreadable file is counted as 0 bytes.
    let bytes = std::fs::metadata(&seg).map(|m| m.len()).unwrap_or(0);
    match self.http.send_segment(&seg, dropped).await {
        Ok(()) => {}
        Err(SendError::BadRequest) => {
            // The request itself was rejected, so the claim is completed instead of
            // being restored for another attempt.
            let q = self.queue.lock().await;
            q.complete_claim(&seg).map_err(|_| SendError::Other)?;
            return Err(SendError::BadRequest);
        }
        Err(e) => {
            // Hand the segment back to the queue; the original send error is what
            // the caller needs to see, so a restore failure is only logged.
            let q = self.queue.lock().await;
            if let Err(restore_err) = q.restore_claim(&seg) {
                warn!(?restore_err, segment = %seg.display(), "telemetry segment restore failed");
            }
            return Err(e);
        }
    }
    {
        // The segment was already sent, so a failure here does not fail the flush.
        let q = self.queue.lock().await;
        if let Err(e) = q.complete_claim(&seg) {
            warn!(?e, "delete segment failed");
        }
    }
    self.counters
        .segments_posted
        .fetch_add(1, Ordering::Relaxed);
    self.counters
        .bytes_sent
        .fetch_add(bytes, Ordering::Relaxed);
    // `as_millis` yields a `u128`; clamp before narrowing so the conversion can
    // never wrap. A clock set before the Unix epoch is recorded as 0.
    let now_ms = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|d| d.as_millis().min(i64::MAX as u128) as i64)
        .unwrap_or(0);
    self.counters
        .last_post_unix_ms
        .store(now_ms, Ordering::Relaxed);
    tracing::info!(segment = %seg.display(), bytes, "telemetry segment posted");
    self.persist_health();
    Ok(Some(seg))
}
/// Returns how long to wait before retrying after `attempt` consecutive failures.
///
/// Attempts 0 through 3 wait 2 s, 8 s, 30 s and 120 s respectively; every
/// later attempt waits 300 s.
pub fn backoff(attempt: u32) -> Duration {
    // Delay in seconds, indexed by attempt number.
    const SCHEDULE_SECS: [u64; 4] = [2, 8, 30, 120];
    // Delay used once the schedule is exhausted.
    const CAP_SECS: u64 = 300;

    let secs = SCHEDULE_SECS
        .get(attempt as usize)
        .copied()
        .unwrap_or(CAP_SECS);
    Duration::from_secs(secs)
}
/// Calls `flush_one` in a loop until it reports that no segment is left.
///
/// `attempt` selects the delay from `backoff`. It is reset to 0 after a posted
/// segment, after a segment dropped on a 400 response, and after the
/// one-hour unauthorized hold; it grows after rate limiting and after any
/// other error. The only exit is an empty queue, so this does not return
/// while `flush_one` keeps failing.
pub async fn drain_with_backoff(&self) {
    // How long to pause sending after an unauthorized response.
    const UNAUTHORIZED_HOLD: Duration = Duration::from_secs(3600);

    let mut attempt = 0u32;
    loop {
        match self.flush_one().await {
            Ok(None) => break,
            Ok(Some(_)) => {
                attempt = 0;
            }
            Err(SendError::Unauthorized) => {
                warn!("telemetry unauthorized — holding for 1h");
                sleep(UNAUTHORIZED_HOLD).await;
                attempt = 0;
            }
            Err(SendError::BadRequest) => {
                warn!("telemetry 400 — dropped claimed segment");
                attempt = 0;
            }
            Err(SendError::RateLimited(Some(d))) => {
                // Wait exactly as long as the error says to.
                sleep(d).await;
                // Saturate like the generic error arm so a very long run of
                // rate-limit responses cannot overflow the counter.
                attempt = attempt.saturating_add(1);
            }
            Err(e) => {
                // Includes `RateLimited(None)`: with no delay supplied, fall back
                // to the local schedule.
                warn!(?e, "telemetry send error; backing off");
                sleep(Self::backoff(attempt)).await;
                attempt = attempt.saturating_add(1);
            }
        }
    }
}
/// Writes a JSON snapshot of the counters to `health_path`, best effort.
///
/// Nothing is written if the snapshot cannot be serialized. Failures to
/// create the parent directory or to write the file are ignored.
fn persist_health(&self) {
    let snapshot = self.counters.snapshot();
    let body = match serde_json::to_string(&snapshot) {
        Ok(text) => text,
        Err(_) => return,
    };
    if let Some(dir) = self.health_path.parent() {
        let _ = std::fs::create_dir_all(dir);
    }
    let _ = std::fs::write(&self.health_path, body);
}
}