Files
MineGuard/src/instance.rs
2025-12-05 00:11:13 +01:00

343 lines
11 KiB
Rust

use std::{path::PathBuf, process::Stdio, sync::Arc};
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
process::{self, Child},
sync::{RwLock, broadcast, mpsc},
};
use tokio_stream::wrappers::BroadcastStream;
use tokio_util::sync::CancellationToken;
use crate::{
config::{MinecraftType, MinecraftVersion, StreamLine, StreamSource},
error::{HandleError, ServerError, SubscribeError},
};
use tokio_stream::StreamExt;
/// Static description of a server instance: where it lives on disk and what it runs.
///
/// Built and validated by `InstanceHandle::new_with_params`.
#[derive(Debug, Clone)]
pub struct InstanceData {
/// Directory of the instance; used as the working directory of the spawned process.
pub root_dir: PathBuf,
/// Path of the server JAR. The constructor only accepts a relative path that
/// resolves to an existing file when joined onto `root_dir`.
pub jar_path: PathBuf,
/// Minecraft version, parsed from the string given to the constructor.
pub mc_version: MinecraftVersion,
/// Server flavour; the log parser is only started for `MinecraftType::Vanilla`.
pub mc_type: MinecraftType,
}
/// Lifecycle state of a managed server process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InstanceStatus {
/// Set by `start` before the child process is spawned.
Starting,
/// Set by `start` once the child is spawned and its stdio tasks are running.
Running,
/// Set by `stop` before the `stop` command is sent to the server.
Stopping,
/// Initial state of a new handle, and the state after `stop` has waited for the child.
Stopped,
/// Written by the stdout/stderr reader tasks after an output stream ends or fails.
Crashed,
/// Set by `kill` before the child is killed.
Killing,
/// Set by `kill` after the child has been killed.
Killed,
}
/// Handle to a single server instance: owns the child process and the channels
/// that carry its stdin, stdout and stderr.
#[derive(Debug)]
pub struct InstanceHandle {
/// Validated description of the instance (directories, JAR, version, type).
pub data: InstanceData,
/// Shared lifecycle status; also written by the background reader tasks.
pub status: Arc<RwLock<InstanceStatus>>,
/// Broadcast sender for lines read from the child's stdout (created with capacity 2048).
stdout_tx: broadcast::Sender<StreamLine>,
/// Broadcast sender for stderr lines; `None` until the stream pumps are set up by `start`.
stderr_tx: Option<broadcast::Sender<StreamLine>>,
/// Broadcast sender reserved for parsed events; currently only cloned by the parser (see its TODO).
#[cfg(feature = "events")]
events_tx: broadcast::Sender<StreamLine>,
/// Queue of commands waiting to be written to the child's stdin (see `send_command`).
stdin_tx: mpsc::Sender<String>,
/// Receiving end of the command queue; taken and moved into the stdin writer task on start.
stdin_rx: Option<mpsc::Receiver<String>>,
/// The spawned child; `Some` from a successful start until `stop` or `kill` clears it.
child: Option<Arc<RwLock<Child>>>,
/// Cancelled by `stop` and `kill` to end the background tasks that watch it.
shutdown: CancellationToken,
}
impl InstanceHandle {
/// Validates the given parameters and builds a handle in the `Stopped` state.
///
/// Checks run in this order: the version string must parse, `root_dir` must be an
/// existing directory, and `jar_path` must be a relative path that points at an
/// existing file once joined onto `root_dir`.
///
/// # Errors
/// * `HandleError::InvalidVersion` - `mc_version` does not parse.
/// * `HandleError::InvalidDirectory` - `root_dir` is missing or not a directory.
/// * `HandleError::InvalidPathJAR` - `jar_path` is absolute or is not a file under `root_dir`.
pub fn new_with_params(
    root_dir: &str,
    jar_path: &str,
    mc_version: &str,
    mc_type: MinecraftType,
) -> Result<Self, HandleError> {
    let version = match mc_version.parse::<MinecraftVersion>() {
        Ok(version) => version,
        Err(_) => return Err(HandleError::InvalidVersion(mc_version.to_string())),
    };

    let instance_root = PathBuf::from(root_dir);
    if !(instance_root.exists() && instance_root.is_dir()) {
        return Err(HandleError::InvalidDirectory(root_dir.to_string()));
    }

    let jar = PathBuf::from(jar_path);
    let resolved_jar = instance_root.join(&jar);
    let jar_is_usable = jar.is_relative() && resolved_jar.is_file();
    if !jar_is_usable {
        return Err(HandleError::InvalidPathJAR(jar_path.to_string()));
    }

    // Commands may be queued before the server is started; they are kept in this channel.
    let (stdin_tx, stdin_rx) = mpsc::channel(1024);

    let handle = Self {
        data: InstanceData {
            root_dir: instance_root,
            jar_path: jar,
            mc_version: version,
            mc_type,
        },
        status: Arc::new(RwLock::new(InstanceStatus::Stopped)),
        stdout_tx: broadcast::Sender::new(2048),
        stderr_tx: None,
        #[cfg(feature = "events")]
        events_tx: broadcast::Sender::new(2048),
        stdin_tx,
        stdin_rx: Some(stdin_rx),
        child: None,
        shutdown: CancellationToken::new(),
    };
    Ok(handle)
}
/// Queues a console command for the server's stdin.
///
/// A trailing newline is appended when `cmd` does not already end with one.
///
/// # Errors
/// Returns `ServerError::StdinWriteFailed` when the command queue is closed.
pub async fn send_command<S: Into<String>>(&self, cmd: S) -> Result<(), ServerError> {
    let line = {
        let text: String = cmd.into();
        if text.ends_with('\n') {
            text
        } else {
            text + "\n"
        }
    };
    match self.stdin_tx.send(line).await {
        Ok(()) => Ok(()),
        Err(_) => Err(ServerError::StdinWriteFailed),
    }
}
/// Spawns the server process and starts the background stdio tasks.
///
/// On success the status moves `Starting` -> `Running`. If any step fails, the
/// status that was in place before the call is restored, so a failed start does
/// not leave the handle stuck in `Starting`.
///
/// # Errors
/// * `ServerError::AlreadyRunning` - a child process is already attached.
/// * `ServerError::CommandFailed` - the process could not be spawned.
/// * Any error returned while wiring the child's stdio or starting the parser.
pub async fn start(&mut self) -> Result<(), ServerError> {
    self.validate_start_parameters().await?;

    // Remember where we came from so a failed start can be rolled back.
    let previous = self.status.read().await.clone();
    self.transition_status(InstanceStatus::Starting).await;

    let command = self.build_start_command();
    let child = match self.spawn_child_process(command) {
        Ok(child) => child,
        Err(err) => {
            self.transition_status(previous).await;
            return Err(err);
        }
    };

    if let Err(err) = self.setup_stream_pumps(child) {
        // The child may already have been attached to the handle; do not leave a
        // process running behind a start() call that reports failure.
        if let Some(orphan) = self.child.take() {
            let _ = orphan.write().await.start_kill();
        }
        self.transition_status(previous).await;
        return Err(err);
    }

    {
        // Only promote Starting -> Running: a reader task may already have recorded
        // a crash if the process died immediately, and that must not be overwritten.
        let mut status = self.status.write().await;
        if *status == InstanceStatus::Starting {
            *status = InstanceStatus::Running;
        }
    }

    self.setup_parser()?;
    Ok(())
}
/// Checks that the handle may be started: fails with
/// `ServerError::AlreadyRunning` while a child process is still attached.
async fn validate_start_parameters(&self) -> Result<(), ServerError> {
    match self.child {
        Some(_) => Err(ServerError::AlreadyRunning),
        None => Ok(()),
    }
}
/// Overwrites the shared status with `status`, holding the write lock only for
/// the assignment itself.
async fn transition_status(&self, status: InstanceStatus) {
    *self.status.write().await = status;
}
/// Builds the `java -jar <jar_path> nogui` command for this instance.
///
/// The command runs in the instance's root directory with stdin, stdout and
/// stderr all piped so the handle can drive and observe the server.
fn build_start_command(&self) -> process::Command {
    let mut command = process::Command::new("java");
    command
        .arg("-jar")
        .arg(&self.data.jar_path)
        .arg("nogui")
        .current_dir(&self.data.root_dir)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .stdin(Stdio::piped());
    // Put the server in its own process group. `process_group` only exists on
    // Unix targets, so the call is gated to keep other platforms building.
    #[cfg(unix)]
    command.process_group(0);
    command
}
/// Spawns `command`, reporting any spawn failure as `ServerError::CommandFailed`
/// (the underlying I/O error is discarded).
fn spawn_child_process(&self, mut command: process::Command) -> Result<Child, ServerError> {
    match command.spawn() {
        Ok(child) => Ok(child),
        Err(_) => Err(ServerError::CommandFailed),
    }
}
fn setup_stream_pumps(&mut self, mut child: Child) -> Result<(), ServerError> {
let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?;
let stderr = child.stderr.take().ok_or(ServerError::NoStderrPipe)?;
let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?;
let child = Arc::new(RwLock::new(child));
self.child = Some(child);
let stdout_tx = self.stdout_tx.clone();
let stderr_tx = broadcast::Sender::new(2048);
self.stderr_tx = Some(stderr_tx.clone());
let shutdown = self.shutdown.clone();
let stdout_status = self.status.clone();
let stderr_status = self.status.clone();
tokio::spawn(async move {
let mut stdout_reader = BufReader::new(stdout).lines();
loop {
match stdout_reader.next_line().await {
Ok(Some(line)) => {
let _ = stdout_tx.send(StreamLine::stdout(line));
}
_ => {
let status_guard = stdout_status.read().await;
if *status_guard != InstanceStatus::Killing
|| *status_guard != InstanceStatus::Stopping
{
drop(status_guard);
let mut status = stdout_status.write().await;
*status = InstanceStatus::Crashed;
drop(status);
break;
}
drop(status_guard);
}
}
}
});
tokio::spawn(async move {
let mut stderr_reader = BufReader::new(stderr).lines();
loop {
match stderr_reader.next_line().await {
Ok(Some(line)) => {
let _ = stderr_tx.send(StreamLine::stderr(line));
}
_ => {
let status_guard = stderr_status.read().await;
if *status_guard != InstanceStatus::Killing
|| *status_guard != InstanceStatus::Stopping
{
drop(status_guard);
let mut status = stderr_status.write().await;
*status = InstanceStatus::Crashed;
drop(status);
break;
}
drop(status_guard);
}
}
}
});
let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?;
tokio::spawn(async move {
let mut writer = BufWriter::new(stdin);
loop {
tokio::select! {
_ = shutdown.cancelled() => {
break;
}
maybe_cmd = stdin_rx.recv() => {
if let Some(cmd) = maybe_cmd {
_ = writer.write_all(cmd.as_bytes()).await;
_ = writer.flush().await;
}
}
}
}
});
Ok(())
}
#[cfg(all(feature = "events", any(feature = "mc-vanilla")))]
fn setup_parser(&mut self) -> Result<(), ServerError> {
let stdout_stream = self
.subscribe(StreamSource::Stdout)
.map_err(|_| ServerError::NoStdoutPipe)?;
let shutdown = self.shutdown.clone();
// TODO: Stream events!!!!
let _event_tx = self.events_tx.clone();
#[cfg(feature = "mc-vanilla")]
if self.data.mc_type == MinecraftType::Vanilla {
use crate::config::LogMeta;
tokio::spawn(async move {
let mut rx = stdout_stream;
loop {
tokio::select! {
_ = shutdown.cancelled() => {
break;
}
line = rx.next() => {
if let Some(Ok(val)) = line {
let msg = val.msg();
let meta = LogMeta::new(msg);
if let Ok(val) = meta
&& val.is_some() {
println!("{}", val.unwrap());
}
}
}
}
}
});
}
Ok(())
}
/// Forcibly terminates the server process.
///
/// Status moves `Killing` -> `Killed`; the shutdown token is cancelled and the
/// child is detached from the handle once the kill has succeeded.
///
/// # Errors
/// * `ServerError::NotRunning` - no child process is attached.
/// * `ServerError::CommandFailed` - killing the child failed (the child stays attached).
pub async fn kill(&mut self) -> Result<(), ServerError> {
    let Some(child_arc) = self.child.clone() else {
        return Err(ServerError::NotRunning);
    };

    self.transition_status(InstanceStatus::Killing).await;

    let mut child = child_arc.write().await;
    if child.kill().await.is_err() {
        return Err(ServerError::CommandFailed);
    }

    self.shutdown.cancel();
    self.child = None;
    self.transition_status(InstanceStatus::Killed).await;
    Ok(())
}
/// Asks the server to shut down and waits for the process to exit.
///
/// Status moves `Stopping` -> `Stopped`. The `stop` console command is sent on a
/// best-effort basis (a failure to queue it is ignored), then the call waits for
/// the child to exit before cancelling the shutdown token and detaching the child.
///
/// # Errors
/// * `ServerError::NotRunning` - no child process is attached.
/// * `ServerError::CommandFailed` - waiting on the child failed.
pub async fn stop(&mut self) -> Result<(), ServerError> {
    let Some(child_arc) = self.child.clone() else {
        return Err(ServerError::NotRunning);
    };

    self.transition_status(InstanceStatus::Stopping).await;
    let _ = self.send_command("stop").await;

    let mut child = child_arc.write().await;
    if child.wait().await.is_err() {
        return Err(ServerError::CommandFailed);
    }

    self.shutdown.cancel();
    self.child = None;
    self.transition_status(InstanceStatus::Stopped).await;
    Ok(())
}
/// Returns a stream of lines from the requested source.
///
/// Each call creates a new broadcast subscription, so the stream only yields
/// lines sent after the call.
///
/// # Errors
/// `SubscribeError::NoStderr` when stderr is requested before the stderr
/// channel exists.
pub fn subscribe(
    &self,
    stream: StreamSource,
) -> Result<BroadcastStream<StreamLine>, SubscribeError> {
    let receiver = match stream {
        StreamSource::Stdout => self.stdout_tx.subscribe(),
        StreamSource::Stderr => self
            .stderr_tx
            .as_ref()
            .ok_or(SubscribeError::NoStderr)?
            .subscribe(),
        #[cfg(feature = "events")]
        StreamSource::Event => self.events_tx.subscribe(),
    };
    Ok(BroadcastStream::new(receiver))
}
}