Clean up output and stopping order

This commit is contained in:
2025-12-05 19:51:14 +01:00
parent 454776fcf2
commit 9f0c253d30
2 changed files with 78 additions and 22 deletions

View File

@@ -146,22 +146,28 @@ impl Display for LogLevel {
impl StreamLine { impl StreamLine {
pub fn new<S: Into<String>>(line: S, source: StreamSource) -> Self { pub fn new<S: Into<String>>(line: S, source: StreamSource) -> Self {
Self { let line = line.into();
line: line.into(), let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap();
source, let line = re.replace(&line, "").to_string();
} Self { line, source }
} }
pub fn stdout<S: Into<String>>(line: S) -> Self { pub fn stdout<S: Into<String>>(line: S) -> Self {
let line = line.into();
let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap();
let line = re.replace(&line, "").to_string();
Self { Self {
line: line.into(), line,
source: StreamSource::Stdout, source: StreamSource::Stdout,
} }
} }
pub fn stderr<S: Into<String>>(line: S) -> Self { pub fn stderr<S: Into<String>>(line: S) -> Self {
let line = line.into();
let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap();
let line = re.replace(&line, "").to_string();
Self { Self {
line: line.into(), line,
source: StreamSource::Stderr, source: StreamSource::Stderr,
} }
} }
@@ -233,13 +239,13 @@ impl Display for InstanceEvent {
match self.payload.clone() { match self.payload.clone() {
EventPayload::StdLine { line } => { EventPayload::StdLine { line } => {
let full = format!("{}{}", head, line); let full = format!("{}{}", head, line);
write!(f, "{}", full) write!(f, "{}\n", full)
} }
#[cfg(feature = "events")] #[cfg(feature = "events")]
EventPayload::StateChange { old, new } => { EventPayload::StateChange { old, new } => {
let full = format!("{}State changed: {:?} -> {:?}", head, old, new); let full = format!("{}State changed: {:?} -> {:?}", head, old, new);
write!(f, "{}", full) write!(f, "{}\n", full)
} }
} }
} }

View File

@@ -1,10 +1,11 @@
use std::{path::PathBuf, process::Stdio, sync::Arc}; use std::{path::PathBuf, process::Stdio, sync::Arc, time::Duration};
use chrono::Utc; use chrono::Utc;
use tokio::{ use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
process::{self, Child}, process::{self, Child},
sync::{RwLock, broadcast, mpsc}, sync::{RwLock, broadcast, mpsc},
time::sleep,
}; };
use tokio_stream::wrappers::BroadcastStream; use tokio_stream::wrappers::BroadcastStream;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
@@ -165,7 +166,7 @@ impl InstanceHandle {
payload: EventPayload::StateChange { old, new }, payload: EventPayload::StateChange { old, new },
}; };
self.internal_tx.send(event); _ = self.internal_tx.send(event).await;
} }
fn build_start_command(&self) -> process::Command { fn build_start_command(&self) -> process::Command {
@@ -202,6 +203,8 @@ impl InstanceHandle {
let stdout_status = self.status.clone(); let stdout_status = self.status.clone();
let stderr_status = self.status.clone(); let stderr_status = self.status.clone();
let internal_tx1 = self.internal_tx.clone();
let internal_tx2 = self.internal_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines(); let mut stdout_reader = BufReader::new(stdout).lines();
@@ -212,12 +215,24 @@ impl InstanceHandle {
} }
_ => { _ => {
let status_guard = stdout_status.read().await; let status_guard = stdout_status.read().await;
if *status_guard != InstanceStatus::Killing let state = status_guard.clone();
|| *status_guard != InstanceStatus::Stopping if state == InstanceStatus::Running && state == InstanceStatus::Starting {
{ let old = status_guard.clone();
drop(status_guard); drop(status_guard);
let mut status = stdout_status.write().await; let mut status = stdout_status.write().await;
*status = InstanceStatus::Crashed; *status = InstanceStatus::Crashed;
let event = InstanceEvent {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: EventPayload::StateChange {
old,
new: status.clone(),
},
};
_ = internal_tx1.send(event).await;
drop(status); drop(status);
break; break;
} }
@@ -236,12 +251,24 @@ impl InstanceHandle {
} }
_ => { _ => {
let status_guard = stderr_status.read().await; let status_guard = stderr_status.read().await;
if *status_guard != InstanceStatus::Killing let state = status_guard.clone();
|| *status_guard != InstanceStatus::Stopping if state == InstanceStatus::Running && state == InstanceStatus::Starting {
{ let old = status_guard.clone();
drop(status_guard); drop(status_guard);
let mut status = stderr_status.write().await; let mut status = stderr_status.write().await;
*status = InstanceStatus::Crashed; *status = InstanceStatus::Crashed;
let event = InstanceEvent {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: EventPayload::StateChange {
old,
new: status.clone(),
},
};
_ = internal_tx2.send(event).await;
drop(status); drop(status);
break; break;
} }
@@ -279,9 +306,31 @@ impl InstanceHandle {
let stdout_stream = self let stdout_stream = self
.subscribe(StreamSource::Stdout) .subscribe(StreamSource::Stdout)
.map_err(|_| ServerError::NoStdoutPipe)?; .map_err(|_| ServerError::NoStdoutPipe)?;
let shutdown = self.shutdown.clone(); let shutdown1 = self.shutdown.clone();
let shutdown2 = self.shutdown.clone();
let _event_tx = self.events_tx.clone(); let _event_tx = self.events_tx.clone();
if let Some(mut internal_rx) = self.internal_rx.take() {
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown1.cancelled() => {
break;
}
maybe_event = internal_rx.recv() => {
match maybe_event {
Some(event) => {
println!("event: {}", event);
}
_ => (),
}
}
}
}
});
}
#[cfg(feature = "mc-vanilla")] #[cfg(feature = "mc-vanilla")]
if self.data.mc_type == MinecraftType::Vanilla { if self.data.mc_type == MinecraftType::Vanilla {
tokio::spawn(async move { tokio::spawn(async move {
@@ -289,7 +338,7 @@ impl InstanceHandle {
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown.cancelled() => { _ = shutdown2.cancelled() => {
break; break;
} }
line = rx.next() => { line = rx.next() => {
@@ -312,10 +361,10 @@ impl InstanceHandle {
child.kill().await.map_err(|_| ServerError::CommandFailed)?; child.kill().await.map_err(|_| ServerError::CommandFailed)?;
self.transition_status(InstanceStatus::Killed).await;
sleep(Duration::from_secs(1));
self.shutdown.cancel(); self.shutdown.cancel();
self.child = None; self.child = None;
self.transition_status(InstanceStatus::Killed).await;
Ok(()) Ok(())
} else { } else {
Err(ServerError::NotRunning) Err(ServerError::NotRunning)
@@ -329,10 +378,11 @@ impl InstanceHandle {
_ = self.send_command("stop").await; _ = self.send_command("stop").await;
let mut child = child_arc.write().await; let mut child = child_arc.write().await;
child.wait().await.map_err(|_| ServerError::CommandFailed)?; child.wait().await.map_err(|_| ServerError::CommandFailed)?;
self.shutdown.cancel();
self.child = None;
self.transition_status(InstanceStatus::Stopped).await; self.transition_status(InstanceStatus::Stopped).await;
sleep(Duration::from_secs(1));
self.shutdown.cancel();
self.child = None;
Ok(()) Ok(())
} else { } else {
Err(ServerError::NotRunning) Err(ServerError::NotRunning)