Switched from StreamLine to encapsulated InstanceEvent for all public streams

This commit is contained in:
2025-12-05 18:29:16 +01:00
parent 2169e95423
commit 9c4c23f881
6 changed files with 485 additions and 20 deletions

View File

@@ -1,7 +1,15 @@
use std::fmt::{self, Display};
#[cfg(feature = "events")]
use chrono::{DateTime, Utc};
use chrono::{Local, NaiveTime, TimeZone};
use regex::Regex;
#[cfg(feature = "events")]
use uuid::Uuid;
#[cfg(feature = "mc-vanilla")]
use crate::error::ParserError;
use crate::instance::InstanceStatus;
/// Identifies which process stream produced a line of output.
#[derive(Debug, Clone, PartialEq, Eq)]
@@ -19,12 +27,6 @@ pub struct StreamLine {
source: StreamSource,
}
#[cfg(feature = "events")]
pub struct InstanceEvent {}
#[cfg(feature = "events")]
pub enum Events {}
#[cfg(feature = "mc-vanilla")]
pub struct LogMeta {
time: String,
@@ -41,6 +43,28 @@ pub enum LogLevel {
Other,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum EventPayload {
#[cfg(feature = "events")]
StateChange {
old: InstanceStatus,
new: InstanceStatus,
},
StdLine {
line: StreamLine,
},
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct InstanceEvent {
pub id: Uuid,
pub timestamp: DateTime<Utc>,
pub payload: EventPayload,
}
#[cfg(feature = "mc-vanilla")]
impl LogMeta {
pub fn new<S: Into<String>>(line: S) -> Result<Option<Self>, ParserError> {
@@ -145,6 +169,52 @@ impl StreamLine {
pub fn msg(&self) -> String {
self.line.clone()
}
pub fn extract_timestamp(&self) -> Option<DateTime<Utc>> {
let input = self.line.as_str();
let re = Regex::new(r"\[(.*?)\]").unwrap();
let time_s = re.captures(input).map(|v| v[1].to_string());
if time_s.is_none() {
return None;
}
let time = NaiveTime::parse_from_str(&time_s.unwrap(), "%H:%M:%S").ok()?;
let today = Local::now().date_naive();
let naive_dt = today.and_time(time);
let local_dt = Local.from_local_datetime(&naive_dt).unwrap();
let utc_dt = local_dt.with_timezone(&Utc);
Some(utc_dt)
}
}
impl InstanceEvent {
pub fn stdout<S: Into<String>>(line: S) -> Self {
let line = line.into();
let s_line = StreamLine::stdout(line);
let timestamp = s_line.extract_timestamp().unwrap_or(Utc::now());
let payload = EventPayload::StdLine { line: s_line };
Self {
id: Uuid::new_v4(),
timestamp,
payload,
}
}
pub fn stderr<S: Into<String>>(line: S) -> Self {
let line = line.into();
let s_line = StreamLine::stderr(line);
let timestamp = s_line.extract_timestamp().unwrap_or(Utc::now());
let payload = EventPayload::StdLine { line: s_line };
Self {
id: Uuid::new_v4(),
timestamp,
payload,
}
}
}
impl Display for StreamLine {
@@ -152,3 +222,25 @@ impl Display for StreamLine {
write!(f, "{}", self.line)
}
}
impl Display for InstanceEvent {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let head = format!(
"UUID: {}\nTimestamp:{}\nPayload:\n",
self.id.to_string(),
self.timestamp.to_string()
);
match self.payload.clone() {
EventPayload::StdLine { line } => {
let full = format!("{}{}", head, line);
write!(f, "{}", full)
}
#[cfg(feature = "events")]
EventPayload::StateChange { old, new } => {
let full = format!("{}State changed: {:?} -> {:?}", head, old, new);
write!(f, "{}", full)
}
}
}
}

View File

@@ -1,5 +1,6 @@
use std::{path::PathBuf, process::Stdio, sync::Arc};
use chrono::Utc;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
process::{self, Child},
@@ -7,9 +8,12 @@ use tokio::{
};
use tokio_stream::wrappers::BroadcastStream;
use tokio_util::sync::CancellationToken;
use uuid::Uuid;
#[cfg(feature = "events")]
use crate::config::stream::InstanceEvent;
use crate::{
config::{MinecraftType, MinecraftVersion, StreamLine, StreamSource},
config::{MinecraftType, MinecraftVersion, StreamLine, StreamSource, stream::EventPayload},
error::{HandleError, ServerError, SubscribeError},
};
@@ -38,10 +42,14 @@ pub enum InstanceStatus {
pub struct InstanceHandle {
pub data: InstanceData,
pub status: Arc<RwLock<InstanceStatus>>,
stdout_tx: broadcast::Sender<StreamLine>,
stderr_tx: Option<broadcast::Sender<StreamLine>>,
stdout_tx: broadcast::Sender<InstanceEvent>,
stderr_tx: Option<broadcast::Sender<InstanceEvent>>,
#[cfg(feature = "events")]
events_tx: broadcast::Sender<StreamLine>,
events_tx: broadcast::Sender<InstanceEvent>,
#[cfg(feature = "events")]
internal_tx: mpsc::Sender<InstanceEvent>,
#[cfg(feature = "events")]
internal_rx: Option<mpsc::Receiver<InstanceEvent>>,
stdin_tx: mpsc::Sender<String>,
stdin_rx: Option<mpsc::Receiver<String>>,
child: Option<Arc<RwLock<Child>>>,
@@ -80,6 +88,7 @@ impl InstanceHandle {
let status = InstanceStatus::Stopped;
let (stdin_tx, stdin_rx) = mpsc::channel(1024);
let (internal_tx, internal_rx) = mpsc::channel(1024);
Ok(Self {
data,
status: Arc::new(RwLock::new(status)),
@@ -87,6 +96,10 @@ impl InstanceHandle {
stderr_tx: None,
#[cfg(feature = "events")]
events_tx: broadcast::Sender::new(2048),
#[cfg(feature = "events")]
internal_tx,
#[cfg(feature = "events")]
internal_rx: Some(internal_rx),
stdin_tx,
stdin_rx: Some(stdin_rx),
child: None,
@@ -134,9 +147,25 @@ impl InstanceHandle {
}
async fn transition_status(&self, status: InstanceStatus) {
let r_guard = self.status.read().await;
let old = r_guard.clone();
drop(r_guard);
let new = status.clone();
let mut guard = self.status.write().await;
*guard = status;
drop(guard);
let event = InstanceEvent {
id: Uuid::new_v4(),
timestamp: Utc::now(),
payload: EventPayload::StateChange { old, new },
};
self.internal_tx.send(event);
}
fn build_start_command(&self) -> process::Command {
@@ -179,7 +208,7 @@ impl InstanceHandle {
loop {
match stdout_reader.next_line().await {
Ok(Some(line)) => {
let _ = stdout_tx.send(StreamLine::stdout(line));
let _ = stdout_tx.send(InstanceEvent::stdout(line));
}
_ => {
let status_guard = stdout_status.read().await;
@@ -203,7 +232,7 @@ impl InstanceHandle {
loop {
match stderr_reader.next_line().await {
Ok(Some(line)) => {
let _ = stderr_tx.send(StreamLine::stderr(line));
let _ = stderr_tx.send(InstanceEvent::stderr(line));
}
_ => {
let status_guard = stderr_status.read().await;
@@ -267,12 +296,8 @@ impl InstanceHandle {
}
line = rx.next() => {
if let Some(Ok(val)) = line {
let msg = val.msg();
let meta = LogMeta::new(msg);
if let Ok(val) = meta
&& val.is_some() {
println!("{}", val.unwrap());
}
println!("{}", val);
}
}
}
@@ -319,7 +344,7 @@ impl InstanceHandle {
pub fn subscribe(
&self,
stream: StreamSource,
) -> Result<BroadcastStream<StreamLine>, SubscribeError> {
) -> Result<BroadcastStream<InstanceEvent>, SubscribeError> {
match stream {
StreamSource::Stdout => {
let rx = self.stdout_tx.subscribe();

View File

@@ -1,3 +1,4 @@
pub mod config;
pub mod error;
pub mod instance;
pub mod utils;

26
src/utils.rs Normal file
View File

@@ -0,0 +1,26 @@
use chrono::{DateTime, Datelike, Local, NaiveTime, TimeZone, Timelike, Utc};
use regex::Regex;
pub fn extract_timestamp(input: &str) -> Option<DateTime<Utc>> {
let re = Regex::new(r"\[(.*?)\]").unwrap();
let time_s = re.captures(input).map(|v| v[1].to_string());
if time_s.is_none() {
return None;
}
let time = NaiveTime::parse_from_str(&time_s.unwrap(), "%H:%M:%S").ok()?;
let today = Local::now().date_naive();
let local_dt = Local
.with_ymd_and_hms(
today.year(),
today.month(),
today.day(),
time.hour(),
time.minute(),
time.second(),
)
.single()?;
Some(local_dt.with_timezone(&Utc))
}