diff --git a/Cargo.lock b/Cargo.lock index 7e84197..00a32d3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -14,12 +14,55 @@ version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "futures-sink" version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-core", + "futures-macro", + "futures-task", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "hashbrown" +version = "0.15.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" + [[package]] name = "libc" version = "0.2.178" @@ -33,6 +76,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", ] [[package]] @@ -52,6 +96,12 @@ version = "0.2.16" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b3cff922bd51709b605d9ead9aa71031d81447142d828eb4a6eba76fe619f9b" +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "proc-macro2" version = "1.0.103" @@ -79,6 +129,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a2ae44ef20feb57a68b23d846850f861394c2e02dc425a50098ae8c90267589" + [[package]] name = "socket2" version = "0.6.1" @@ -167,8 +223,12 @@ checksum = "2efa149fe76073d6e8fd97ef4f4eca7b67f599660115591483572e406e165594" dependencies = [ "bytes", "futures-core", + "futures-io", "futures-sink", + "futures-util", + "hashbrown", "pin-project-lite", + "slab", "tokio", ] diff --git a/Cargo.toml b/Cargo.toml index 04eda54..81602be 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,5 +12,6 @@ publish = false [dependencies] thiserror = "2.0.17" -tokio = { version = "1.48.0", features = ["process", "rt-multi-thread", "macros"] } +tokio = { version = "1.48.0", features = ["process", "rt-multi-thread", "macros", "io-std", "io-util"] } tokio-stream = { version = "0.1.17", features = ["full", "io-util", "signal", "tokio-util"] } +tokio-util = { version = "0.7.17", features = ["full"] } diff --git a/src/config.rs b/src/config.rs index 3133636..28823b6 100644 --- a/src/config.rs +++ b/src/config.rs @@ -1,5 +1,5 @@ use std::{ - fmt::{self, Display}, + fmt::{self, Display, write}, str::FromStr, }; @@ -31,15 +31,15 @@ pub enum MinecraftVersion { } #[derive(Debug, Clone, PartialEq, Eq)] -pub enum StreamType { +pub enum StreamSource { Stdout, Stderr, } #[derive(Debug, Clone, PartialEq, Eq)] pub struct StreamLine { - source: StreamType, - val: String, + line: String, + source: StreamSource, } impl Display for Version { @@ -136,3 +136,25 @@ impl FromStr for MinecraftVersion { Err(VersionError::UnknownVersionFormat(s.to_string())) } } + +impl StreamLine { + pub fn new>(line: S, source: StreamSource) -> Self { + Self { + line: line.into(), + source, + } + } + + pub fn stdout>(line: S) -> Self { + Self { + line: line.into(), + source: StreamSource::Stdout, + } + } +} + +impl Display for StreamLine { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", self.line) + } +} diff --git a/src/error.rs b/src/error.rs index 42a3ee1..2fbee55 100644 --- a/src/error.rs +++ b/src/error.rs @@ -63,4 +63,28 @@ pub enum SubscribeError { NoStdout, } +#[derive(Debug, Clone, Error)] +pub enum ServerError { + #[error("Server is already running")] + AlreadyRunning, + + #[error("Server is not running")] + NotRunning, + + #[error("Server crashed early")] + EarlyCrash, + + #[error("Failed to run java command")] + CommandFailed, + + #[error("Failed to access child stdout pipe")] + NoStdoutPipe, + + #[error("Failed to access child stdin pipe")] + NoStdinPipe, + + #[error("Failed to write to stdin")] + StdinWriteFailed, +} + type Result = std::result::Result; diff --git a/src/instance.rs b/src/instance.rs index d6416be..360858a 100644 --- a/src/instance.rs +++ b/src/instance.rs @@ -1,11 +1,16 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{path::PathBuf, process::Stdio, sync::Arc}; -use tokio::sync::broadcast; +use tokio::{ + io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter}, + process::{self, Child}, + sync::{RwLock, broadcast, mpsc}, +}; use tokio_stream::wrappers::BroadcastStream; +use tokio_util::sync::CancellationToken; use crate::{ - config::{MinecraftType, MinecraftVersion, StreamLine, StreamType}, - error::{HandleError, SubscribeError}, + config::{MinecraftType, MinecraftVersion, StreamLine, StreamSource}, + error::{HandleError, ServerError, SubscribeError}, }; #[derive(Debug, Clone)] @@ -23,14 +28,20 @@ pub enum InstanceStatus { Stopping, Stopped, Crashed, + Killing, + Killed, } #[derive(Debug)] pub struct InstanceHandle { pub data: InstanceData, - pub status: InstanceStatus, - stdout_tx: Option>, + pub status: Arc>, + stdout_tx: broadcast::Sender, stderr_tx: Option>, + stdin_tx: mpsc::Sender, + stdin_rx: Option>, + child: Option>>, + shutdown: CancellationToken, } impl InstanceHandle { @@ -63,27 +74,159 @@ impl InstanceHandle { }; let status = InstanceStatus::Stopped; + + let (stdin_tx, stdin_rx) = mpsc::channel(1024); Ok(Self { data, - status, - stdout_tx: None, + status: Arc::new(RwLock::new(status)), + stdout_tx: broadcast::Sender::new(2048), stderr_tx: None, + stdin_tx, + stdin_rx: Some(stdin_rx), + child: None, + shutdown: CancellationToken::new(), }) } + pub async fn send_command>(&self, cmd: S) -> Result<(), ServerError> { + let mut command = cmd.into(); + if !command.ends_with('\n') { + command.push('\n'); + } + + self.stdin_tx + .send(command) + .await + .map_err(|_| ServerError::StdinWriteFailed)?; + + Ok(()) + } + + pub async fn start(&mut self) -> Result<(), ServerError> { + if self.child.is_some() { + return Err(ServerError::AlreadyRunning); + } + + let mut status = self.status.write().await; + *status = InstanceStatus::Starting; + + let jar_path: PathBuf = self.data.jar_path.clone(); + let root_dir: PathBuf = self.data.root_dir.clone(); + + let mut command = process::Command::new("java"); + command + .arg("-jar") + .arg(&jar_path) + .arg("nogui") + .current_dir(&root_dir) + .stdout(Stdio::piped()) + .stdin(Stdio::piped()); + + command.process_group(0); + + let mut child = command.spawn().map_err(|_| ServerError::CommandFailed)?; + + let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?; + let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?; + + let child = Arc::new(RwLock::new(child)); + self.child = Some(child); + + let stdout_tx = self.stdout_tx.clone(); + let shutdown = self.shutdown.clone(); + + tokio::spawn(async move { + let mut stdout_reader = BufReader::new(stdout).lines(); + loop { + match stdout_reader.next_line().await { + Ok(Some(line)) => { + let _ = stdout_tx.send(StreamLine::stdout(line)); + } + Ok(None) => { + break; + } + _ => break, + } + } + }); + + let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?; + + tokio::spawn(async move { + let mut writer = BufWriter::new(stdin); + + loop { + tokio::select! { + _ = shutdown.cancelled() => { + break; + } + maybe_cmd = stdin_rx.recv() => { + match maybe_cmd { + Some(cmd) => { + _ = writer.write_all(cmd.as_bytes()).await; + _ = writer.flush().await; + }, + _ => (), + } + } + } + } + }); + + let mut status = self.status.write().await; + *status = InstanceStatus::Running; + + Ok(()) + } + + pub async fn kill(&mut self) -> Result<(), ServerError> { + if let Some(child_arc) = self.child.clone() { + let mut status = self.status.write().await; + *status = InstanceStatus::Killing; + let mut child = child_arc.write().await; + + child.kill().await.map_err(|_| ServerError::CommandFailed)?; + + self.shutdown.cancel(); + self.child = None; + + let mut status = self.status.write().await; + *status = InstanceStatus::Killed; + Ok(()) + } else { + Err(ServerError::NotRunning) + } + } + + pub async fn stop(&mut self) -> Result<(), ServerError> { + if let Some(child_arc) = self.child.clone() { + let mut status = self.status.write().await; + *status = InstanceStatus::Stopping; + + _ = self.send_command("stop").await; + + let mut child = child_arc.write().await; + child.wait().await.map_err(|_| ServerError::CommandFailed)?; + self.shutdown.cancel(); + self.child = None; + let mut status = self.status.write().await; + *status = InstanceStatus::Stopped; + Ok(()) + } else { + Err(ServerError::NotRunning) + } + } + pub fn subscribe( &self, - stream: StreamType, + stream: StreamSource, ) -> Result, SubscribeError> { match stream { - StreamType::Stdout => { - let rx = match &self.stdout_tx { - Some(value) => value.subscribe(), - None => return Err(SubscribeError::NoStdout), - }; + StreamSource::Stdout => { + let rx = self.stdout_tx.subscribe(); Ok(BroadcastStream::new(rx)) } - StreamType::Stderr => { + StreamSource::Stderr => { let rx = match &self.stderr_tx { Some(value) => value.subscribe(), None => return Err(SubscribeError::NoStdout),