From 9f0c253d30eb209ff844879180df44329373063f Mon Sep 17 00:00:00 2001 From: Hector van der Aa Date: Fri, 5 Dec 2025 19:51:14 +0100 Subject: [PATCH] Clean up output and stopping order --- src/config/stream.rs | 22 ++++++++----- src/instance.rs | 78 ++++++++++++++++++++++++++++++++++++-------- 2 files changed, 78 insertions(+), 22 deletions(-) diff --git a/src/config/stream.rs b/src/config/stream.rs index bd27850..5ebd259 100644 --- a/src/config/stream.rs +++ b/src/config/stream.rs @@ -146,22 +146,28 @@ impl Display for LogLevel { impl StreamLine { pub fn new>(line: S, source: StreamSource) -> Self { - Self { - line: line.into(), - source, - } + let line = line.into(); + let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap(); + let line = re.replace(&line, "").to_string(); + Self { line, source } } pub fn stdout>(line: S) -> Self { + let line = line.into(); + let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap(); + let line = re.replace(&line, "").to_string(); Self { - line: line.into(), + line, source: StreamSource::Stdout, } } pub fn stderr>(line: S) -> Self { + let line = line.into(); + let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap(); + let line = re.replace(&line, "").to_string(); Self { - line: line.into(), + line, source: StreamSource::Stderr, } } @@ -233,13 +239,13 @@ impl Display for InstanceEvent { match self.payload.clone() { EventPayload::StdLine { line } => { let full = format!("{}{}", head, line); - write!(f, "{}", full) + write!(f, "{}\n", full) } #[cfg(feature = "events")] EventPayload::StateChange { old, new } => { let full = format!("{}State changed: {:?} -> {:?}", head, old, new); - write!(f, "{}", full) + write!(f, "{}\n", full) } } } diff --git a/src/instance.rs b/src/instance.rs index 00d4a8c..71e56ea 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,10 +1,11 @@ -use std::{path::PathBuf, process::Stdio, sync::Arc}; +use std::{path::PathBuf, process::Stdio, sync::Arc, time::Duration}; use chrono::Utc; use tokio::{ io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, process::{self, Child}, sync::{RwLock, broadcast, mpsc}, + time::sleep, }; use tokio_stream::wrappers::BroadcastStream; use tokio_util::sync::CancellationToken; @@ -165,7 +166,7 @@ impl InstanceHandle { payload: EventPayload::StateChange { old, new }, }; - self.internal_tx.send(event); + _ = self.internal_tx.send(event).await; } fn build_start_command(&self) -> process::Command { @@ -202,6 +203,8 @@ impl InstanceHandle { let stdout_status = self.status.clone(); let stderr_status = self.status.clone(); + let internal_tx1 = self.internal_tx.clone(); + let internal_tx2 = self.internal_tx.clone(); tokio::spawn(async move { let mut stdout_reader = BufReader::new(stdout).lines(); @@ -212,12 +215,24 @@ impl InstanceHandle { } _ => { let status_guard = stdout_status.read().await; - if *status_guard != InstanceStatus::Killing - || *status_guard != InstanceStatus::Stopping - { + let state = status_guard.clone(); + if state == InstanceStatus::Running && state == InstanceStatus::Starting { + let old = status_guard.clone(); drop(status_guard); let mut status = stdout_status.write().await; *status = InstanceStatus::Crashed; + let event = InstanceEvent { + id: Uuid::new_v4(), + + timestamp: Utc::now(), + + payload: EventPayload::StateChange { + old, + new: status.clone(), + }, + }; + + _ = internal_tx1.send(event).await; drop(status); break; } @@ -236,12 +251,24 @@ impl InstanceHandle { } _ => { let status_guard = stderr_status.read().await; - if *status_guard != InstanceStatus::Killing - || *status_guard != InstanceStatus::Stopping - { + let state = status_guard.clone(); + if state == InstanceStatus::Running && state == InstanceStatus::Starting { + let old = status_guard.clone(); drop(status_guard); let mut status = stderr_status.write().await; *status = InstanceStatus::Crashed; + let event = InstanceEvent { + id: Uuid::new_v4(), + + timestamp: Utc::now(), + + payload: EventPayload::StateChange { + old, + new: status.clone(), + }, + }; + + _ = internal_tx2.send(event).await; drop(status); break; } @@ -279,9 +306,31 @@ impl InstanceHandle { let stdout_stream = self .subscribe(StreamSource::Stdout) .map_err(|_| ServerError::NoStdoutPipe)?; - let shutdown = self.shutdown.clone(); + let shutdown1 = self.shutdown.clone(); + let shutdown2 = self.shutdown.clone(); let _event_tx = self.events_tx.clone(); + if let Some(mut internal_rx) = self.internal_rx.take() { + tokio::spawn(async move { + loop { + tokio::select! { + _ = shutdown1.cancelled() => { + break; + } + + maybe_event = internal_rx.recv() => { + match maybe_event { + Some(event) => { + println!("event: {}", event); + } + _ => (), + } + } + } + } + }); + } + #[cfg(feature = "mc-vanilla")] if self.data.mc_type == MinecraftType::Vanilla { tokio::spawn(async move { @@ -289,7 +338,7 @@ impl InstanceHandle { loop { tokio::select! { - _ = shutdown.cancelled() => { + _ = shutdown2.cancelled() => { break; } line = rx.next() => { @@ -312,10 +361,10 @@ impl InstanceHandle { child.kill().await.map_err(|_| ServerError::CommandFailed)?; + self.transition_status(InstanceStatus::Killed).await; + sleep(Duration::from_secs(1)); self.shutdown.cancel(); self.child = None; - - self.transition_status(InstanceStatus::Killed).await; Ok(()) } else { Err(ServerError::NotRunning) @@ -329,10 +378,11 @@ impl InstanceHandle { _ = self.send_command("stop").await; let mut child = child_arc.write().await; child.wait().await.map_err(|_| ServerError::CommandFailed)?; - self.shutdown.cancel(); - self.child = None; self.transition_status(InstanceStatus::Stopped).await; + sleep(Duration::from_secs(1)); + self.shutdown.cancel(); + self.child = None; Ok(()) } else { Err(ServerError::NotRunning)