Full startup await with Done ()! parser

This commit is contained in:
2025-12-08 10:29:37 +01:00
parent 7e7bfbc576
commit 7782e71990
7 changed files with 114 additions and 28 deletions

View File

@@ -1,6 +1,6 @@
use std::fmt::{self, Display}; use std::fmt::{self, Display};
use uuid::Uuid; use uuid::{Uuid, timestamp};
use crate::instance::InstanceStatus; use crate::instance::InstanceStatus;
@@ -28,6 +28,11 @@ pub struct InstanceEvent {
pub payload: EventPayload, pub payload: EventPayload,
} }
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InternalEvent {
ServerStarted,
}
impl InstanceEvent { impl InstanceEvent {
pub fn stdout<S: Into<String>>(line: S) -> Self { pub fn stdout<S: Into<String>>(line: S) -> Self {
let line = line.into(); let line = line.into();
@@ -54,6 +59,16 @@ impl InstanceEvent {
payload, payload,
} }
} }
pub fn new(payload: EventPayload) -> Self {
let timestamp = chrono::Utc::now();
Self {
id: Uuid::new_v4(),
timestamp,
payload,
}
}
} }
impl Display for InstanceEvent { impl Display for InstanceEvent {

View File

@@ -15,8 +15,8 @@ pub enum StreamSource {
#[derive(Debug, Clone, PartialEq, Eq)] #[derive(Debug, Clone, PartialEq, Eq)]
pub struct StreamLine { pub struct StreamLine {
line: String, pub line: String,
source: StreamSource, pub source: StreamSource,
} }
impl StreamLine { impl StreamLine {
@@ -29,8 +29,6 @@ impl StreamLine {
pub fn stdout<S: Into<String>>(line: S) -> Self { pub fn stdout<S: Into<String>>(line: S) -> Self {
let line = line.into(); let line = line.into();
let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap();
let line = re.replace(&line, "").to_string();
Self { Self {
line, line,
source: StreamSource::Stdout, source: StreamSource::Stdout,
@@ -39,8 +37,6 @@ impl StreamLine {
pub fn stderr<S: Into<String>>(line: S) -> Self { pub fn stderr<S: Into<String>>(line: S) -> Self {
let line = line.into(); let line = line.into();
let re = Regex::new(r#"^\[[^\]]*\]\s*\[[^\]]*\]:\s*"#).unwrap();
let line = re.replace(&line, "").to_string();
Self { Self {
line, line,
source: StreamSource::Stderr, source: StreamSource::Stderr,

View File

@@ -11,6 +11,7 @@ pub struct LogMeta {
} }
#[cfg(feature = "mc-vanilla")] #[cfg(feature = "mc-vanilla")]
#[derive(Debug, Clone, Eq, PartialEq)]
pub enum LogLevel { pub enum LogLevel {
Info, Info,
Warn, Warn,

View File

@@ -3,6 +3,7 @@ mod line;
#[cfg(feature = "mc-vanilla")] #[cfg(feature = "mc-vanilla")]
mod log; mod log;
pub use event::InternalEvent;
pub use event::{EventPayload, InstanceEvent}; pub use event::{EventPayload, InstanceEvent};
pub use line::{StreamLine, StreamSource}; pub use line::{StreamLine, StreamSource};
#[cfg(feature = "mc-vanilla")] #[cfg(feature = "mc-vanilla")]

View File

@@ -15,7 +15,10 @@ use uuid::Uuid;
#[cfg(feature = "events")] #[cfg(feature = "events")]
use crate::config::stream::InstanceEvent; use crate::config::stream::InstanceEvent;
use crate::{ use crate::{
config::{MinecraftType, MinecraftVersion, StreamSource, stream::EventPayload}, config::{
MinecraftType, MinecraftVersion, StreamSource,
stream::{EventPayload, InternalEvent},
},
error::{HandleError, ServerError, SubscribeError}, error::{HandleError, ServerError, SubscribeError},
}; };
@@ -30,13 +33,14 @@ pub struct InstanceHandle {
#[cfg(feature = "events")] #[cfg(feature = "events")]
events_tx: broadcast::Sender<InstanceEvent>, events_tx: broadcast::Sender<InstanceEvent>,
#[cfg(feature = "events")] #[cfg(feature = "events")]
internal_tx: mpsc::Sender<InstanceEvent>, internal_events_tx: mpsc::Sender<InstanceEvent>,
#[cfg(feature = "events")] #[cfg(feature = "events")]
internal_rx: Option<mpsc::Receiver<InstanceEvent>>, internal_events_rx: Option<mpsc::Receiver<InstanceEvent>>,
stdin_tx: mpsc::Sender<String>, stdin_tx: mpsc::Sender<String>,
stdin_rx: Option<mpsc::Receiver<String>>, stdin_rx: Option<mpsc::Receiver<String>>,
child: Option<Arc<RwLock<Child>>>, child: Option<Arc<RwLock<Child>>>,
shutdown: CancellationToken, shutdown: CancellationToken,
internal_bus_tx: broadcast::Sender<InternalEvent>,
} }
impl InstanceHandle { impl InstanceHandle {
@@ -82,13 +86,14 @@ impl InstanceHandle {
#[cfg(feature = "events")] #[cfg(feature = "events")]
events_tx: broadcast::Sender::new(2048), events_tx: broadcast::Sender::new(2048),
#[cfg(feature = "events")] #[cfg(feature = "events")]
internal_tx, internal_events_tx: internal_tx,
#[cfg(feature = "events")] #[cfg(feature = "events")]
internal_rx: Some(internal_rx), internal_events_rx: Some(internal_rx),
stdin_tx, stdin_tx,
stdin_rx: Some(stdin_rx), stdin_rx: Some(stdin_rx),
child: None, child: None,
shutdown: CancellationToken::new(), shutdown: CancellationToken::new(),
internal_bus_tx: broadcast::Sender::new(2048),
}) })
} }
@@ -108,6 +113,7 @@ impl InstanceHandle {
pub async fn start(&mut self) -> Result<(), ServerError> { pub async fn start(&mut self) -> Result<(), ServerError> {
self.validate_start_parameters().await?; self.validate_start_parameters().await?;
self.setup_loopback()?;
self.transition_status(InstanceStatus::Starting).await; self.transition_status(InstanceStatus::Starting).await;
@@ -116,10 +122,23 @@ impl InstanceHandle {
self.setup_stream_pumps(child)?; self.setup_stream_pumps(child)?;
self.transition_status(InstanceStatus::Running).await;
self.setup_parser()?; self.setup_parser()?;
let mut rx = self.internal_bus_tx.subscribe();
loop {
match rx.recv().await {
Ok(event) => {
if event == InternalEvent::ServerStarted {
self.transition_status(InstanceStatus::Running).await;
break;
}
continue;
}
_ => continue,
}
}
Ok(()) Ok(())
} }
@@ -150,7 +169,7 @@ impl InstanceHandle {
payload: EventPayload::StateChange { old, new }, payload: EventPayload::StateChange { old, new },
}; };
_ = self.internal_tx.send(event).await; _ = self.internal_events_tx.send(event).await;
} }
fn build_start_command(&self) -> process::Command { fn build_start_command(&self) -> process::Command {
@@ -187,8 +206,8 @@ impl InstanceHandle {
let stdout_status = self.status.clone(); let stdout_status = self.status.clone();
let stderr_status = self.status.clone(); let stderr_status = self.status.clone();
let internal_tx1 = self.internal_tx.clone(); let internal_tx1 = self.internal_events_tx.clone();
let internal_tx2 = self.internal_tx.clone(); let internal_tx2 = self.internal_events_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines(); let mut stdout_reader = BufReader::new(stdout).lines();
@@ -286,17 +305,14 @@ impl InstanceHandle {
} }
#[cfg(all(feature = "events", any(feature = "mc-vanilla")))] #[cfg(all(feature = "events", any(feature = "mc-vanilla")))]
fn setup_parser(&mut self) -> Result<(), ServerError> { fn setup_loopback(&mut self) -> Result<(), ServerError> {
let stdout_stream = self
.subscribe(StreamSource::Stdout)
.map_err(|_| ServerError::NoStdoutPipe)?;
let shutdown1 = self.shutdown.clone(); let shutdown1 = self.shutdown.clone();
let shutdown2 = self.shutdown.clone();
let event_tx = self.events_tx.clone();
if let Some(mut internal_rx) = self.internal_rx.take() { let event_tx1 = self.events_tx.clone();
//internal mpsc to broadcast loopback
if let Some(mut internal_rx) = self.internal_events_rx.take() {
tokio::spawn(async move { tokio::spawn(async move {
let tx = event_tx; let tx = event_tx1;
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown1.cancelled() => { _ = shutdown1.cancelled() => {
@@ -312,20 +328,50 @@ impl InstanceHandle {
} }
}); });
} }
Ok(())
}
#[cfg(all(feature = "events", any(feature = "mc-vanilla")))]
fn setup_parser(&mut self) -> Result<(), ServerError> {
use crate::config::LogMeta;
let stdout_stream = self
.subscribe(StreamSource::Stdout)
.map_err(|_| ServerError::NoStdoutPipe)?;
let shutdown2 = self.shutdown.clone();
let bus_tx = self.internal_bus_tx.clone();
#[cfg(feature = "mc-vanilla")] #[cfg(feature = "mc-vanilla")]
if self.data.mc_type == MinecraftType::Vanilla { if self.data.mc_type == MinecraftType::Vanilla {
tokio::spawn(async move { tokio::spawn(async move {
let mut rx = stdout_stream; let mut rx = stdout_stream;
let tx = bus_tx;
loop { loop {
tokio::select! { tokio::select! {
_ = shutdown2.cancelled() => { _ = shutdown2.cancelled() => {
break; break;
} }
line = rx.next() => { next_line = rx.next() => {
if let Some(Ok(val)) = line { if let Some(Ok(val)) = next_line {
// TODO: Call parser let event_line = match val.payload {
EventPayload::StdLine{line} => {
line
},
_ => continue,
};
let meta = match LogMeta::new(event_line.line) {
Ok(Some(log_meta)) => {
log_meta
},
_ => continue,
};
match meta.parse_event() {
Ok(Some(event)) => _ = tx.send(event),
_ => continue,
}
} }
} }
} }

View File

@@ -2,5 +2,6 @@ pub mod config;
pub mod error; pub mod error;
pub mod instance; pub mod instance;
pub mod manifests; pub mod manifests;
pub mod parser;
pub mod server; pub mod server;
pub mod utils; pub mod utils;

26
src/parser/mod.rs Normal file
View File

@@ -0,0 +1,26 @@
use regex::Regex;
use crate::{
config::{
LogMeta,
stream::{EventPayload, InstanceEvent, InternalEvent, LogLevel},
},
error::ParserError,
};
impl LogMeta {
pub fn parse_event(&self) -> Result<Option<InternalEvent>, ParserError> {
if self.thread == "Server thread" && self.level == LogLevel::Info {
return self.parse_server_thread_info_lv2();
}
Ok(None)
}
fn parse_server_thread_info_lv2(&self) -> Result<Option<InternalEvent>, ParserError> {
let re = Regex::new(r"Done \([0-9.]+s\)!").unwrap();
if re.is_match(&self.msg) {
return Ok(Some(InternalEvent::ServerStarted));
}
Ok(None)
}
}