From 9d3d054b2abd0838ea886e14dc1d0f9140ab5afc Mon Sep 17 00:00:00 2001 From: Hector van der Aa <103751865+H3ct0r55@users.noreply.github.com> Date: Wed, 3 Dec 2025 21:44:11 +0100 Subject: [PATCH] Refactor start logic and add stderr streaming --- src/config.rs | 9 +++++++- src/error.rs | 6 +++++ src/instance.rs | 61 ++++++++++++++++++++++++++++++++++++++++--------- 3 files changed, 64 insertions(+), 12 deletions(-) diff --git a/src/config.rs b/src/config.rs index 28823b6..842bbcc 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,5 @@ use std::{ - fmt::{self, Display, write}, + fmt::{self, Display}, str::FromStr, }; @@ -151,6 +151,13 @@ impl StreamLine { source: StreamSource::Stdout, } } + + pub fn stderr>(line: S) -> Self { + Self { + line: line.into(), + source: StreamSource::Stderr, + } + } } impl Display for StreamLine { diff --git a/src/error.rs b/src/error.rs index 2fbee55..ba2e74f 100644 --- a/src/error.rs +++ b/src/error.rs @@ -61,6 +61,9 @@ pub enum HandleError { pub enum SubscribeError { #[error("No stdout found")] NoStdout, + + #[error("No stderr found")] + NoStderr, } #[derive(Debug, Clone, Error)] @@ -83,6 +86,9 @@ pub enum ServerError { #[error("Failed to access child stdin pipe")] NoStdinPipe, + #[error("Failed to access child stderr pipe")] + NoStderrPipe, + #[error("Failed to write to stdin")] StdinWriteFailed, } diff --git a/src/instance.rs b/src/instance.rs index 360858a..b1a4867 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -103,36 +103,63 @@ impl InstanceHandle { } pub async fn start(&mut self) -> Result<(), ServerError> { + self.validate_start_parameters().await?; + + self.transition_status(InstanceStatus::Starting).await; + + let command = self.build_start_command(); + let child = self.spawn_child_process(command)?; + + self.setup_stream_pumps(child)?; + + self.transition_status(InstanceStatus::Running).await; + + Ok(()) + } + + async fn validate_start_parameters(&self) -> Result<(), ServerError> { if self.child.is_some() { return Err(ServerError::AlreadyRunning); } - let mut status = self.status.write().await; - *status = InstanceStatus::Starting; + Ok(()) + } - let jar_path: PathBuf = self.data.jar_path.clone(); - let root_dir: PathBuf = self.data.root_dir.clone(); + async fn transition_status(&self, status: InstanceStatus) { + let mut guard = self.status.write().await; + *guard = status; + } + fn build_start_command(&self) -> process::Command { let mut command = process::Command::new("java"); command .arg("-jar") - .arg(&jar_path) + .arg(&self.data.jar_path) .arg("nogui") - .current_dir(&root_dir) + .current_dir(&self.data.root_dir) .stdout(Stdio::piped()) + .stderr(Stdio::piped()) .stdin(Stdio::piped()); command.process_group(0); + command + } - let mut child = command.spawn().map_err(|_| ServerError::CommandFailed)?; + fn spawn_child_process(&self, mut command: process::Command) -> Result { + command.spawn().map_err(|_| ServerError::CommandFailed) + } + fn setup_stream_pumps(&mut self, mut child: Child) -> Result<(), ServerError> { let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?; + let stderr = child.stderr.take().ok_or(ServerError::NoStderrPipe)?; let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?; let child = Arc::new(RwLock::new(child)); self.child = Some(child); let stdout_tx = self.stdout_tx.clone(); + let stderr_tx = broadcast::Sender::new(2048); + self.stderr_tx = Some(stderr_tx.clone()); let shutdown = self.shutdown.clone(); tokio::spawn(async move { @@ -150,6 +177,21 @@ impl InstanceHandle { } }); + tokio::spawn(async move { + let mut stderr_reader = BufReader::new(stderr).lines(); + loop { + match stderr_reader.next_line().await { + Ok(Some(line)) => { + let _ = stderr_tx.send(StreamLine::stderr(line)); + } + Ok(None) => { + break; + } + _ => break, + } + } + }); + let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?; tokio::spawn(async move { @@ -173,9 +215,6 @@ impl InstanceHandle { } }); - let mut status = self.status.write().await; - *status = InstanceStatus::Running; - Ok(()) } @@ -229,7 +268,7 @@ impl InstanceHandle { StreamSource::Stderr => { let rx = match &self.stderr_tx { Some(value) => value.subscribe(), - None => return Err(SubscribeError::NoStdout), + None => return Err(SubscribeError::NoStderr), }; Ok(BroadcastStream::new(rx)) }