Added Start, Stop, Kill, Send command
This commit is contained in:
173
src/instance.rs
173
src/instance.rs
@@ -1,11 +1,16 @@
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
use std::{path::PathBuf, process::Stdio, sync::Arc};
|
||||
|
||||
use tokio::sync::broadcast;
|
||||
use tokio::{
|
||||
io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter},
|
||||
process::{self, Child},
|
||||
sync::{RwLock, broadcast, mpsc},
|
||||
};
|
||||
use tokio_stream::wrappers::BroadcastStream;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
use crate::{
|
||||
config::{MinecraftType, MinecraftVersion, StreamLine, StreamType},
|
||||
error::{HandleError, SubscribeError},
|
||||
config::{MinecraftType, MinecraftVersion, StreamLine, StreamSource},
|
||||
error::{HandleError, ServerError, SubscribeError},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -23,14 +28,20 @@ pub enum InstanceStatus {
|
||||
Stopping,
|
||||
Stopped,
|
||||
Crashed,
|
||||
Killing,
|
||||
Killed,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct InstanceHandle {
|
||||
pub data: InstanceData,
|
||||
pub status: InstanceStatus,
|
||||
stdout_tx: Option<broadcast::Sender<StreamLine>>,
|
||||
pub status: Arc<RwLock<InstanceStatus>>,
|
||||
stdout_tx: broadcast::Sender<StreamLine>,
|
||||
stderr_tx: Option<broadcast::Sender<StreamLine>>,
|
||||
stdin_tx: mpsc::Sender<String>,
|
||||
stdin_rx: Option<mpsc::Receiver<String>>,
|
||||
child: Option<Arc<RwLock<Child>>>,
|
||||
shutdown: CancellationToken,
|
||||
}
|
||||
|
||||
impl InstanceHandle {
|
||||
@@ -63,27 +74,159 @@ impl InstanceHandle {
|
||||
};
|
||||
|
||||
let status = InstanceStatus::Stopped;
|
||||
|
||||
let (stdin_tx, stdin_rx) = mpsc::channel(1024);
|
||||
Ok(Self {
|
||||
data,
|
||||
status,
|
||||
stdout_tx: None,
|
||||
status: Arc::new(RwLock::new(status)),
|
||||
stdout_tx: broadcast::Sender::new(2048),
|
||||
stderr_tx: None,
|
||||
stdin_tx,
|
||||
stdin_rx: Some(stdin_rx),
|
||||
child: None,
|
||||
shutdown: CancellationToken::new(),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn send_command<S: Into<String>>(&self, cmd: S) -> Result<(), ServerError> {
|
||||
let mut command = cmd.into();
|
||||
if !command.ends_with('\n') {
|
||||
command.push('\n');
|
||||
}
|
||||
|
||||
self.stdin_tx
|
||||
.send(command)
|
||||
.await
|
||||
.map_err(|_| ServerError::StdinWriteFailed)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<(), ServerError> {
|
||||
if self.child.is_some() {
|
||||
return Err(ServerError::AlreadyRunning);
|
||||
}
|
||||
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Starting;
|
||||
|
||||
let jar_path: PathBuf = self.data.jar_path.clone();
|
||||
let root_dir: PathBuf = self.data.root_dir.clone();
|
||||
|
||||
let mut command = process::Command::new("java");
|
||||
command
|
||||
.arg("-jar")
|
||||
.arg(&jar_path)
|
||||
.arg("nogui")
|
||||
.current_dir(&root_dir)
|
||||
.stdout(Stdio::piped())
|
||||
.stdin(Stdio::piped());
|
||||
|
||||
command.process_group(0);
|
||||
|
||||
let mut child = command.spawn().map_err(|_| ServerError::CommandFailed)?;
|
||||
|
||||
let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?;
|
||||
let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?;
|
||||
|
||||
let child = Arc::new(RwLock::new(child));
|
||||
self.child = Some(child);
|
||||
|
||||
let stdout_tx = self.stdout_tx.clone();
|
||||
let shutdown = self.shutdown.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut stdout_reader = BufReader::new(stdout).lines();
|
||||
loop {
|
||||
match stdout_reader.next_line().await {
|
||||
Ok(Some(line)) => {
|
||||
let _ = stdout_tx.send(StreamLine::stdout(line));
|
||||
}
|
||||
Ok(None) => {
|
||||
break;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut writer = BufWriter::new(stdin);
|
||||
|
||||
loop {
|
||||
tokio::select! {
|
||||
_ = shutdown.cancelled() => {
|
||||
break;
|
||||
}
|
||||
maybe_cmd = stdin_rx.recv() => {
|
||||
match maybe_cmd {
|
||||
Some(cmd) => {
|
||||
_ = writer.write_all(cmd.as_bytes()).await;
|
||||
_ = writer.flush().await;
|
||||
},
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Running;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn kill(&mut self) -> Result<(), ServerError> {
|
||||
if let Some(child_arc) = self.child.clone() {
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Killing;
|
||||
let mut child = child_arc.write().await;
|
||||
|
||||
child.kill().await.map_err(|_| ServerError::CommandFailed)?;
|
||||
|
||||
self.shutdown.cancel();
|
||||
self.child = None;
|
||||
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Killed;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ServerError::NotRunning)
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn stop(&mut self) -> Result<(), ServerError> {
|
||||
if let Some(child_arc) = self.child.clone() {
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Stopping;
|
||||
|
||||
_ = self.send_command("stop").await;
|
||||
|
||||
let mut child = child_arc.write().await;
|
||||
child.wait().await.map_err(|_| ServerError::CommandFailed)?;
|
||||
self.shutdown.cancel();
|
||||
self.child = None;
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Stopped;
|
||||
Ok(())
|
||||
} else {
|
||||
Err(ServerError::NotRunning)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn subscribe(
|
||||
&self,
|
||||
stream: StreamType,
|
||||
stream: StreamSource,
|
||||
) -> Result<BroadcastStream<StreamLine>, SubscribeError> {
|
||||
match stream {
|
||||
StreamType::Stdout => {
|
||||
let rx = match &self.stdout_tx {
|
||||
Some(value) => value.subscribe(),
|
||||
None => return Err(SubscribeError::NoStdout),
|
||||
};
|
||||
StreamSource::Stdout => {
|
||||
let rx = self.stdout_tx.subscribe();
|
||||
Ok(BroadcastStream::new(rx))
|
||||
}
|
||||
StreamType::Stderr => {
|
||||
StreamSource::Stderr => {
|
||||
let rx = match &self.stderr_tx {
|
||||
Some(value) => value.subscribe(),
|
||||
None => return Err(SubscribeError::NoStdout),
|
||||
|
||||
Reference in New Issue
Block a user