use futures_util::TryStreamExt; use sqlx::{postgres::PgPool, Row}; use std::cmp::max; use std::error::Error; use std::fmt::Display; use std::path::{Path, PathBuf}; use time::ext::NumericalStdDuration; use time::OffsetDateTime; use tokio::fs; use tokio::sync::mpsc::Receiver; use tokio::time::timeout; pub(crate) async fn delete_old_files( mut receiver: Receiver<()>, db: PgPool, files_dir: PathBuf, ) -> Result<(), DeletionError> { loop { wait_for_file_expiry(&mut receiver, &db).await?; let now = OffsetDateTime::now_utc(); let mut rows = sqlx::query("SELECT file_id FROM files WHERE files.valid_till < $1") .bind(now) .fetch(&db); while let Some(row) = rows.try_next().await? { let file_id: String = row.try_get("file_id").expect("we selected this column"); delete_content(&file_id, &files_dir).await? } sqlx::query("DELETE FROM files WHERE valid_till < $1") .bind(now) .execute(&db) .await?; } } pub(crate) async fn delete_by_id( db: &PgPool, file_id: &str, files_dir: &Path, ) -> Result<(), sqlx::Error> { delete_content(file_id, files_dir).await?; sqlx::query("DELETE FROM files WHERE file_id = $1") .bind(file_id) .execute(db) .await?; Ok(()) } async fn delete_content(file_id: &str, files_dir: &Path) -> Result<(), std::io::Error> { let path = files_dir.join(file_id); if fs::try_exists(&path).await? { fs::remove_file(&path).await?; log::info!("deleted file {}", file_id); } else { log::warn!("expiring file {} was missing from the filesystem", file_id); } Ok(()) } async fn wait_for_file_expiry( receiver: &mut Receiver<()>, db: &PgPool, ) -> Result<(), DeletionError> { let valid_till: (Option,) = sqlx::query_as("SELECT MIN(valid_till) as min from files") .fetch_one(db) .await?; let next_timeout = match valid_till.0 { Some(valid_till) => (max( 0, valid_till.unix_timestamp() - OffsetDateTime::now_utc().unix_timestamp(), ) as u64) .std_seconds(), None => 1_u64.std_days(), }; let _ = timeout(next_timeout, receiver.recv()).await; Ok(()) } #[derive(Debug)] pub enum DeletionError { Db(sqlx::Error), Fs(std::io::Error), } impl From for DeletionError { fn from(value: sqlx::Error) -> Self { DeletionError::Db(value) } } impl From for DeletionError { fn from(value: std::io::Error) -> Self { DeletionError::Fs(value) } } impl Display for DeletionError { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { match self { DeletionError::Db(_) => write!(f, "Failed to fetch expired files from database"), DeletionError::Fs(_) => write!(f, "Failed to delete file from filesystem"), } } } impl Error for DeletionError { fn source(&self) -> Option<&(dyn Error + 'static)> { match self { DeletionError::Db(err) => Some(err), DeletionError::Fs(err) => Some(err), } } }