From 98d5e59413971e001677fc6f4fa5c82e6481a582 Mon Sep 17 00:00:00 2001 From: Hector van der Aa Date: Wed, 3 Dec 2025 23:05:22 +0100 Subject: [PATCH] Events feature skeleton --- Cargo.toml | 2 +- src/config/stream.rs | 8 ++++ src/error.rs | 2 - src/instance.rs | 90 ++++++++++++++++++++++++++++++++++++-------- 4 files changed, 84 insertions(+), 18 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index c6e8313..76aa4e2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,7 +11,7 @@ license = "MIT" publish = false [features] -default = ["core"] +default = ["core", "events"] # Core runtime requirements for the currently implemented functionality. core = ["dep:thiserror", "dep:tokio", "dep:tokio-stream", "dep:tokio-util"] # Placeholder for upcoming event-driven functionality. diff --git a/src/config/stream.rs b/src/config/stream.rs index 3b1b6f2..704d624 100644 --- a/src/config/stream.rs +++ b/src/config/stream.rs @@ -5,6 +5,8 @@ use std::fmt::{self, Display}; pub enum StreamSource { Stdout, Stderr, + #[cfg(feature = "events")] + Event, } /// Captures a single line of process output along with its origin stream. @@ -14,6 +16,12 @@ pub struct StreamLine { source: StreamSource, } +#[cfg(feature = "events")] +pub struct InstanceEvent {} + +#[cfg(feature = "events")] +pub enum Events {} + impl StreamLine { pub fn new>(line: S, source: StreamSource) -> Self { Self { diff --git a/src/error.rs b/src/error.rs index ba2e74f..a911508 100644 --- a/src/error.rs +++ b/src/error.rs @@ -92,5 +92,3 @@ pub enum ServerError { #[error("Failed to write to stdin")] StdinWriteFailed, } - -type Result = std::result::Result; diff --git a/src/instance.rs b/src/instance.rs index b1a4867..1b9a0ce 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -13,6 +13,8 @@ use crate::{ error::{HandleError, ServerError, SubscribeError}, }; +use tokio_stream::StreamExt; + #[derive(Debug, Clone)] pub struct InstanceData { pub root_dir: PathBuf, @@ -38,6 +40,8 @@ pub struct InstanceHandle { pub status: Arc>, stdout_tx: broadcast::Sender, stderr_tx: Option>, + #[cfg(feature = "events")] + events_tx: broadcast::Sender, stdin_tx: mpsc::Sender, stdin_rx: Option>, child: Option>>, @@ -81,6 +85,8 @@ impl InstanceHandle { status: Arc::new(RwLock::new(status)), stdout_tx: broadcast::Sender::new(2048), stderr_tx: None, + #[cfg(feature = "events")] + events_tx: broadcast::Sender::new(2048), stdin_tx, stdin_rx: Some(stdin_rx), child: None, @@ -114,6 +120,8 @@ impl InstanceHandle { self.transition_status(InstanceStatus::Running).await; + self.setup_parser()?; + Ok(()) } @@ -128,6 +136,7 @@ impl InstanceHandle { async fn transition_status(&self, status: InstanceStatus) { let mut guard = self.status.write().await; *guard = status; + drop(guard); } fn build_start_command(&self) -> process::Command { @@ -162,6 +171,9 @@ impl InstanceHandle { self.stderr_tx = Some(stderr_tx.clone()); let shutdown = self.shutdown.clone(); + let stdout_status = self.status.clone(); + let stderr_status = self.status.clone(); + tokio::spawn(async move { let mut stdout_reader = BufReader::new(stdout).lines(); loop { @@ -169,10 +181,19 @@ impl InstanceHandle { Ok(Some(line)) => { let _ = stdout_tx.send(StreamLine::stdout(line)); } - Ok(None) => { - break; + _ => { + let status_guard = stdout_status.read().await; + if *status_guard != InstanceStatus::Killing + || *status_guard != InstanceStatus::Stopping + { + drop(status_guard); + let mut status = stdout_status.write().await; + *status = InstanceStatus::Crashed; + drop(status); + break; + } + drop(status_guard); } - _ => break, } } }); @@ -184,10 +205,19 @@ impl InstanceHandle { Ok(Some(line)) => { let _ = stderr_tx.send(StreamLine::stderr(line)); } - Ok(None) => { - break; + _ => { + let status_guard = stderr_status.read().await; + if *status_guard != InstanceStatus::Killing + || *status_guard != InstanceStatus::Stopping + { + drop(status_guard); + let mut status = stderr_status.write().await; + *status = InstanceStatus::Crashed; + drop(status); + break; + } + drop(status_guard); } - _ => break, } } }); @@ -218,10 +248,38 @@ impl InstanceHandle { Ok(()) } + #[cfg(feature = "events")] + fn setup_parser(&mut self) -> Result<(), ServerError> { + let stdout_stream = self + .subscribe(StreamSource::Stdout) + .map_err(|_| ServerError::NoStdoutPipe)?; + let shutdown = self.shutdown.clone(); + let event_tx = self.events_tx.clone(); + + tokio::spawn(async move { + let mut rx = stdout_stream; + + loop { + tokio::select! { + _ = shutdown.cancelled() => { + break; + } + line = rx.next() => { + match line { + Some(Ok(_)) => { + }, + _ => (), + } + } + } + } + }); + Ok(()) + } + pub async fn kill(&mut self) -> Result<(), ServerError> { if let Some(child_arc) = self.child.clone() { - let mut status = self.status.write().await; - *status = InstanceStatus::Killing; + self.transition_status(InstanceStatus::Killing).await; let mut child = child_arc.write().await; child.kill().await.map_err(|_| ServerError::CommandFailed)?; @@ -229,8 +287,7 @@ impl InstanceHandle { self.shutdown.cancel(); self.child = None; - let mut status = self.status.write().await; - *status = InstanceStatus::Killed; + self.transition_status(InstanceStatus::Killed).await; Ok(()) } else { Err(ServerError::NotRunning) @@ -239,17 +296,15 @@ impl InstanceHandle { pub async fn stop(&mut self) -> Result<(), ServerError> { if let Some(child_arc) = self.child.clone() { - let mut status = self.status.write().await; - *status = InstanceStatus::Stopping; + self.transition_status(InstanceStatus::Stopping).await; _ = self.send_command("stop").await; - let mut child = child_arc.write().await; child.wait().await.map_err(|_| ServerError::CommandFailed)?; self.shutdown.cancel(); self.child = None; - let mut status = self.status.write().await; - *status = InstanceStatus::Stopped; + + self.transition_status(InstanceStatus::Stopped).await; Ok(()) } else { Err(ServerError::NotRunning) @@ -272,6 +327,11 @@ impl InstanceHandle { }; Ok(BroadcastStream::new(rx)) } + #[cfg(feature = "events")] + StreamSource::Event => { + let rx = self.events_tx.subscribe(); + Ok(BroadcastStream::new(rx)) + } } } }