use crate::config::Config;
use crate::immich_api::{Asset, FaceData, ImmichClient};
use crate::pipeline::{Pipeline, PipelineContext, PipelineResult};
use crate::web::AtomicSkipStats;
use super::TimeIntervalTracker;
use bytes::Bytes;
use image::ImageFormat;
use std::io::Cursor;
use std::path::PathBuf;
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
#[derive(Debug, Clone)]
pub struct OutputDirs {
pub images: PathBuf,
pub video: PathBuf,
pub debug: Option<DebugDirs>,
}
#[derive(Debug, Clone)]
pub struct DebugDirs {
pub base: PathBuf,
}
#[derive(Debug)]
#[allow(dead_code)]
pub enum AssetProcessResult {
Success { asset_id: String },
Skipped { asset_id: String, reason: String },
Error { asset_id: String, error: String },
Cancelled { asset_id: String },
}
#[allow(clippy::too_many_arguments)]
pub async fn process_single_asset(
client: &ImmichClient,
config: &Config,
asset: &Asset,
face_data: &FaceData,
output_dirs: &OutputDirs,
cancel_token: &CancellationToken,
skip_stats: &Arc<AtomicSkipStats>,
pipeline: &Pipeline,
time_interval: Option<&Arc<TimeIntervalTracker>>,
) -> AssetProcessResult {
let asset_id = &asset.id;
let timestamp = asset
.file_created_at
.as_ref()
.or(asset.local_date_time.as_ref())
.cloned()
.unwrap_or_else(|| asset_id.clone());
let mut ctx = PipelineContext::new(asset_id.clone(), timestamp.clone(), face_data.clone());
if let Some(tracker) = time_interval {
if tracker.is_full(×tamp) {
tracing::debug!(
"Asset {} skipped early: time slot already full (timestamp: {})",
asset_id,
timestamp
);
skip_stats.increment("time_interval");
return AssetProcessResult::Skipped {
asset_id: asset_id.clone(),
reason: "Time interval too short".to_string(),
};
}
}
if cancel_token.is_cancelled() {
return AssetProcessResult::Cancelled {
asset_id: asset_id.clone(),
};
}
let download_result = if config.processing.use_preview {
client.download_asset_preview(asset_id).await
} else {
client.download_asset(asset_id).await
};
let image_bytes: Bytes = match download_result {
Ok(bytes) => bytes,
Err(e) => {
skip_stats.increment("download_failed");
return AssetProcessResult::Error {
asset_id: asset_id.clone(),
error: e.to_string(),
};
}
};
ctx = ctx.with_bytes(image_bytes);
let debug_dir = output_dirs.debug.as_ref().map(|d| d.base.clone());
let result = pipeline
.execute(ctx, config, cancel_token, skip_stats, debug_dir.as_ref())
.await;
match result {
PipelineResult::Success {
image,
asset_id,
timestamp,
..
} => {
if let Some(tracker) = time_interval {
if !tracker.try_claim(×tamp) {
tracing::debug!(
"Asset {} skipped: time interval too short (timestamp: {})",
asset_id,
timestamp
);
skip_stats.increment("time_interval");
return AssetProcessResult::Skipped {
asset_id,
reason: "Time interval too short".to_string(),
};
}
}
let safe_timestamp: String = timestamp
.chars()
.map(|c| {
if c.is_alphanumeric() || c == '-' || c == '_' {
c
} else {
'_'
}
})
.collect();
let filename = format!("{}_{}.jpg", safe_timestamp, asset_id);
let output_path = output_dirs.images.join(&filename);
let encoded = tokio::task::spawn_blocking(move || {
let mut buffer = Cursor::new(Vec::new());
image
.write_to(&mut buffer, ImageFormat::Jpeg)
.map(|_| buffer.into_inner())
})
.await;
let jpeg_bytes = match encoded {
Ok(Ok(bytes)) => bytes,
Ok(Err(e)) => {
return AssetProcessResult::Error {
asset_id,
error: format!("Failed to encode image: {}", e),
};
}
Err(e) => {
return AssetProcessResult::Error {
asset_id,
error: format!("Image encoding task panicked: {}", e),
};
}
};
if let Err(e) = tokio::fs::write(&output_path, jpeg_bytes).await {
return AssetProcessResult::Error {
asset_id,
error: format!("Failed to save image: {}", e),
};
}
skip_stats.increment_kept();
AssetProcessResult::Success { asset_id }
}
PipelineResult::Skipped {
asset_id, reason, ..
} => AssetProcessResult::Skipped { asset_id, reason },
PipelineResult::Error { asset_id, error } => AssetProcessResult::Error { asset_id, error },
PipelineResult::Cancelled { asset_id } => AssetProcessResult::Cancelled { asset_id },
}
}