Refactor start logic and add stderr streaming
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
use std::{
|
||||
fmt::{self, Display, write},
|
||||
fmt::{self, Display},
|
||||
str::FromStr,
|
||||
};
|
||||
|
||||
@@ -151,6 +151,13 @@ impl StreamLine {
|
||||
source: StreamSource::Stdout,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn stderr<S: Into<String>>(line: S) -> Self {
|
||||
Self {
|
||||
line: line.into(),
|
||||
source: StreamSource::Stderr,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Display for StreamLine {
|
||||
|
||||
@@ -61,6 +61,9 @@ pub enum HandleError {
|
||||
pub enum SubscribeError {
|
||||
#[error("No stdout found")]
|
||||
NoStdout,
|
||||
|
||||
#[error("No stderr found")]
|
||||
NoStderr,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Error)]
|
||||
@@ -83,6 +86,9 @@ pub enum ServerError {
|
||||
#[error("Failed to access child stdin pipe")]
|
||||
NoStdinPipe,
|
||||
|
||||
#[error("Failed to access child stderr pipe")]
|
||||
NoStderrPipe,
|
||||
|
||||
#[error("Failed to write to stdin")]
|
||||
StdinWriteFailed,
|
||||
}
|
||||
|
||||
@@ -103,36 +103,63 @@ impl InstanceHandle {
|
||||
}
|
||||
|
||||
pub async fn start(&mut self) -> Result<(), ServerError> {
|
||||
self.validate_start_parameters().await?;
|
||||
|
||||
self.transition_status(InstanceStatus::Starting).await;
|
||||
|
||||
let command = self.build_start_command();
|
||||
let child = self.spawn_child_process(command)?;
|
||||
|
||||
self.setup_stream_pumps(child)?;
|
||||
|
||||
self.transition_status(InstanceStatus::Running).await;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn validate_start_parameters(&self) -> Result<(), ServerError> {
|
||||
if self.child.is_some() {
|
||||
return Err(ServerError::AlreadyRunning);
|
||||
}
|
||||
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Starting;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
let jar_path: PathBuf = self.data.jar_path.clone();
|
||||
let root_dir: PathBuf = self.data.root_dir.clone();
|
||||
async fn transition_status(&self, status: InstanceStatus) {
|
||||
let mut guard = self.status.write().await;
|
||||
*guard = status;
|
||||
}
|
||||
|
||||
fn build_start_command(&self) -> process::Command {
|
||||
let mut command = process::Command::new("java");
|
||||
command
|
||||
.arg("-jar")
|
||||
.arg(&jar_path)
|
||||
.arg(&self.data.jar_path)
|
||||
.arg("nogui")
|
||||
.current_dir(&root_dir)
|
||||
.current_dir(&self.data.root_dir)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.stdin(Stdio::piped());
|
||||
|
||||
command.process_group(0);
|
||||
command
|
||||
}
|
||||
|
||||
let mut child = command.spawn().map_err(|_| ServerError::CommandFailed)?;
|
||||
fn spawn_child_process(&self, mut command: process::Command) -> Result<Child, ServerError> {
|
||||
command.spawn().map_err(|_| ServerError::CommandFailed)
|
||||
}
|
||||
|
||||
fn setup_stream_pumps(&mut self, mut child: Child) -> Result<(), ServerError> {
|
||||
let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?;
|
||||
let stderr = child.stderr.take().ok_or(ServerError::NoStderrPipe)?;
|
||||
let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?;
|
||||
|
||||
let child = Arc::new(RwLock::new(child));
|
||||
self.child = Some(child);
|
||||
|
||||
let stdout_tx = self.stdout_tx.clone();
|
||||
let stderr_tx = broadcast::Sender::new(2048);
|
||||
self.stderr_tx = Some(stderr_tx.clone());
|
||||
let shutdown = self.shutdown.clone();
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -150,6 +177,21 @@ impl InstanceHandle {
|
||||
}
|
||||
});
|
||||
|
||||
tokio::spawn(async move {
|
||||
let mut stderr_reader = BufReader::new(stderr).lines();
|
||||
loop {
|
||||
match stderr_reader.next_line().await {
|
||||
Ok(Some(line)) => {
|
||||
let _ = stderr_tx.send(StreamLine::stderr(line));
|
||||
}
|
||||
Ok(None) => {
|
||||
break;
|
||||
}
|
||||
_ => break,
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?;
|
||||
|
||||
tokio::spawn(async move {
|
||||
@@ -173,9 +215,6 @@ impl InstanceHandle {
|
||||
}
|
||||
});
|
||||
|
||||
let mut status = self.status.write().await;
|
||||
*status = InstanceStatus::Running;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -229,7 +268,7 @@ impl InstanceHandle {
|
||||
StreamSource::Stderr => {
|
||||
let rx = match &self.stderr_tx {
|
||||
Some(value) => value.subscribe(),
|
||||
None => return Err(SubscribeError::NoStdout),
|
||||
None => return Err(SubscribeError::NoStderr),
|
||||
};
|
||||
Ok(BroadcastStream::new(rx))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user