From 7782e71990360a12f07f55ba537e0716d3e93df0 Mon Sep 17 00:00:00 2001 From: Hector van der Aa Date: Mon, 8 Dec 2025 10:29:37 +0100 Subject: [PATCH] Full startup await with Done ()! parser --- src/config/stream/event.rs | 17 +++++++- src/config/stream/line.rs | 8 +--- src/config/stream/log.rs | 1 + src/config/stream/mod.rs | 1 + src/instance/handle.rs | 88 +++++++++++++++++++++++++++++--------- src/lib.rs | 1 + src/parser/mod.rs | 26 +++++++++++ 7 files changed, 114 insertions(+), 28 deletions(-) create mode 100644 src/parser/mod.rs diff --git a/src/config/stream/event.rs b/src/config/stream/event.rs index a1eb4f7..777865c 100644 --- a/src/config/stream/event.rs +++ b/src/config/stream/event.rs @@ -1,6 +1,6 @@ use std::fmt::{self, Display}; -use uuid::Uuid; +use uuid::{Uuid, timestamp}; use crate::instance::InstanceStatus; @@ -28,6 +28,11 @@ pub struct InstanceEvent { pub payload: EventPayload, } +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum InternalEvent { + ServerStarted, +} + impl InstanceEvent { pub fn stdout>(line: S) -> Self { let line = line.into(); @@ -54,6 +59,16 @@ impl InstanceEvent { payload, } } + + pub fn new(payload: EventPayload) -> Self { + let timestamp = chrono::Utc::now(); + + Self { + id: Uuid::new_v4(), + timestamp, + payload, + } + } } impl Display for InstanceEvent { diff --git a/src/config/stream/line.rs b/src/config/stream/line.rs index bf715c1..d44aba2 100644 --- a/src/config/stream/line.rs +++ b/src/config/stream/line.rs @@ -15,8 +15,8 @@ pub enum StreamSource { #[derive(Debug, Clone, PartialEq, Eq)] pub struct StreamLine { - line: String, - source: StreamSource, + pub line: String, + pub source: StreamSource, } impl StreamLine { @@ -29,8 +29,6 @@ impl StreamLine { pub fn stdout>(line: S) -> Self { let line = line.into(); - let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap(); - let line = re.replace(&line, "").to_string(); Self { line, source: StreamSource::Stdout, @@ -39,8 +37,6 @@ impl StreamLine { pub fn stderr>(line: S) -> Self { let line = line.into(); - let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap(); - let line = re.replace(&line, "").to_string(); Self { line, source: StreamSource::Stderr, diff --git a/src/config/stream/log.rs b/src/config/stream/log.rs index 39c464f..9449d49 100644 --- a/src/config/stream/log.rs +++ b/src/config/stream/log.rs @@ -11,6 +11,7 @@ pub struct LogMeta { } #[cfg(feature = "mc-vanilla")] +#[derive(Debug, Clone, Eq, PartialEq)] pub enum LogLevel { Info, Warn, diff --git a/src/config/stream/mod.rs b/src/config/stream/mod.rs index 4adb8d2..8685fc1 100644 --- a/src/config/stream/mod.rs +++ b/src/config/stream/mod.rs @@ -3,6 +3,7 @@ mod line; #[cfg(feature = "mc-vanilla")] mod log; +pub use event::InternalEvent; pub use event::{EventPayload, InstanceEvent}; pub use line::{StreamLine, StreamSource}; #[cfg(feature = "mc-vanilla")] diff --git a/src/instance/handle.rs b/src/instance/handle.rs index 2107fe2..78be3c1 100644 --- a/src/instance/handle.rs +++ b/src/instance/handle.rs @@ -15,7 +15,10 @@ use uuid::Uuid; #[cfg(feature = "events")] use crate::config::stream::InstanceEvent; use crate::{ - config::{MinecraftType, MinecraftVersion, StreamSource, stream::EventPayload}, + config::{ + MinecraftType, MinecraftVersion, StreamSource, + stream::{EventPayload, InternalEvent}, + }, error::{HandleError, ServerError, SubscribeError}, }; @@ -30,13 +33,14 @@ pub struct InstanceHandle { #[cfg(feature = "events")] events_tx: broadcast::Sender, #[cfg(feature = "events")] - internal_tx: mpsc::Sender, + internal_events_tx: mpsc::Sender, #[cfg(feature = "events")] - internal_rx: Option>, + internal_events_rx: Option>, stdin_tx: mpsc::Sender, stdin_rx: Option>, child: Option>>, shutdown: CancellationToken, + internal_bus_tx: broadcast::Sender, } impl InstanceHandle { @@ -82,13 +86,14 @@ impl InstanceHandle { #[cfg(feature = "events")] events_tx: broadcast::Sender::new(2048), #[cfg(feature = "events")] - internal_tx, + internal_events_tx: internal_tx, #[cfg(feature = "events")] - internal_rx: Some(internal_rx), + internal_events_rx: Some(internal_rx), stdin_tx, stdin_rx: Some(stdin_rx), child: None, shutdown: CancellationToken::new(), + internal_bus_tx: broadcast::Sender::new(2048), }) } @@ -108,6 +113,7 @@ impl InstanceHandle { pub async fn start(&mut self) -> Result<(), ServerError> { self.validate_start_parameters().await?; + self.setup_loopback()?; self.transition_status(InstanceStatus::Starting).await; @@ -116,10 +122,23 @@ impl InstanceHandle { self.setup_stream_pumps(child)?; - self.transition_status(InstanceStatus::Running).await; - self.setup_parser()?; + let mut rx = self.internal_bus_tx.subscribe(); + + loop { + match rx.recv().await { + Ok(event) => { + if event == InternalEvent::ServerStarted { + self.transition_status(InstanceStatus::Running).await; + break; + } + continue; + } + _ => continue, + } + } + Ok(()) } @@ -150,7 +169,7 @@ impl InstanceHandle { payload: EventPayload::StateChange { old, new }, }; - _ = self.internal_tx.send(event).await; + _ = self.internal_events_tx.send(event).await; } fn build_start_command(&self) -> process::Command { @@ -187,8 +206,8 @@ impl InstanceHandle { let stdout_status = self.status.clone(); let stderr_status = self.status.clone(); - let internal_tx1 = self.internal_tx.clone(); - let internal_tx2 = self.internal_tx.clone(); + let internal_tx1 = self.internal_events_tx.clone(); + let internal_tx2 = self.internal_events_tx.clone(); tokio::spawn(async move { let mut stdout_reader = BufReader::new(stdout).lines(); @@ -286,17 +305,14 @@ impl InstanceHandle { } #[cfg(all(feature = "events", any(feature = "mc-vanilla")))] - fn setup_parser(&mut self) -> Result<(), ServerError> { - let stdout_stream = self - .subscribe(StreamSource::Stdout) - .map_err(|_| ServerError::NoStdoutPipe)?; + fn setup_loopback(&mut self) -> Result<(), ServerError> { let shutdown1 = self.shutdown.clone(); - let shutdown2 = self.shutdown.clone(); - let event_tx = self.events_tx.clone(); - if let Some(mut internal_rx) = self.internal_rx.take() { + let event_tx1 = self.events_tx.clone(); + //internal mpsc to broadcast loopback + if let Some(mut internal_rx) = self.internal_events_rx.take() { tokio::spawn(async move { - let tx = event_tx; + let tx = event_tx1; loop { tokio::select! { _ = shutdown1.cancelled() => { @@ -312,20 +328,50 @@ impl InstanceHandle { } }); } + Ok(()) + } + + #[cfg(all(feature = "events", any(feature = "mc-vanilla")))] + fn setup_parser(&mut self) -> Result<(), ServerError> { + use crate::config::LogMeta; + + let stdout_stream = self + .subscribe(StreamSource::Stdout) + .map_err(|_| ServerError::NoStdoutPipe)?; + let shutdown2 = self.shutdown.clone(); + let bus_tx = self.internal_bus_tx.clone(); #[cfg(feature = "mc-vanilla")] if self.data.mc_type == MinecraftType::Vanilla { tokio::spawn(async move { let mut rx = stdout_stream; + let tx = bus_tx; loop { tokio::select! { _ = shutdown2.cancelled() => { break; } - line = rx.next() => { - if let Some(Ok(val)) = line { - // TODO: Call parser + next_line = rx.next() => { + if let Some(Ok(val)) = next_line { + let event_line = match val.payload { + EventPayload::StdLine{line} => { + line + }, + _ => continue, + }; + + let meta = match LogMeta::new(event_line.line) { + Ok(Some(log_meta)) => { + log_meta + }, + _ => continue, + }; + + match meta.parse_event() { + Ok(Some(event)) => _ = tx.send(event), + _ => continue, + } } } } diff --git a/src/lib.rs b/src/lib.rs index 92ef381..35bbafe 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -2,5 +2,6 @@ pub mod config; pub mod error; pub mod instance; pub mod manifests; +pub mod parser; pub mod server; pub mod utils; diff --git a/src/parser/mod.rs b/src/parser/mod.rs new file mode 100644 index 0000000..b1f64c6 --- /dev/null +++ b/src/parser/mod.rs @@ -0,0 +1,26 @@ +use regex::Regex; + +use crate::{ + config::{ + LogMeta, + stream::{EventPayload, InstanceEvent, InternalEvent, LogLevel}, + }, + error::ParserError, +}; + +impl LogMeta { + pub fn parse_event(&self) -> Result, ParserError> { + if self.thread == "Server thread" && self.level == LogLevel::Info { + return self.parse_server_thread_info_lv2(); + } + Ok(None) + } + + fn parse_server_thread_info_lv2(&self) -> Result, ParserError> { + let re = Regex::new(r"Done \([0-9.]+s\)!").unwrap(); + if re.is_match(&self.msg) { + return Ok(Some(InternalEvent::ServerStarted)); + } + Ok(None) + } +}