Merge pull request #1 from H3ct0r55/codex/task1
Refactor instance start handling and add stderr streaming
This commit is contained in:
@@ -1,5 +1,5 @@
|
|||||||
use std::{
|
use std::{
|
||||||
fmt::{self, Display, write},
|
fmt::{self, Display},
|
||||||
str::FromStr,
|
str::FromStr,
|
||||||
};
|
};
|
||||||
|
|
||||||
@@ -151,6 +151,13 @@ impl StreamLine {
|
|||||||
source: StreamSource::Stdout,
|
source: StreamSource::Stdout,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn stderr<S: Into<String>>(line: S) -> Self {
|
||||||
|
Self {
|
||||||
|
line: line.into(),
|
||||||
|
source: StreamSource::Stderr,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Display for StreamLine {
|
impl Display for StreamLine {
|
||||||
|
|||||||
@@ -61,6 +61,9 @@ pub enum HandleError {
|
|||||||
pub enum SubscribeError {
|
pub enum SubscribeError {
|
||||||
#[error("No stdout found")]
|
#[error("No stdout found")]
|
||||||
NoStdout,
|
NoStdout,
|
||||||
|
|
||||||
|
#[error("No stderr found")]
|
||||||
|
NoStderr,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Error)]
|
#[derive(Debug, Clone, Error)]
|
||||||
@@ -83,6 +86,9 @@ pub enum ServerError {
|
|||||||
#[error("Failed to access child stdin pipe")]
|
#[error("Failed to access child stdin pipe")]
|
||||||
NoStdinPipe,
|
NoStdinPipe,
|
||||||
|
|
||||||
|
#[error("Failed to access child stderr pipe")]
|
||||||
|
NoStderrPipe,
|
||||||
|
|
||||||
#[error("Failed to write to stdin")]
|
#[error("Failed to write to stdin")]
|
||||||
StdinWriteFailed,
|
StdinWriteFailed,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -103,36 +103,63 @@ impl InstanceHandle {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn start(&mut self) -> Result<(), ServerError> {
|
pub async fn start(&mut self) -> Result<(), ServerError> {
|
||||||
|
self.validate_start_parameters().await?;
|
||||||
|
|
||||||
|
self.transition_status(InstanceStatus::Starting).await;
|
||||||
|
|
||||||
|
let command = self.build_start_command();
|
||||||
|
let child = self.spawn_child_process(command)?;
|
||||||
|
|
||||||
|
self.setup_stream_pumps(child)?;
|
||||||
|
|
||||||
|
self.transition_status(InstanceStatus::Running).await;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn validate_start_parameters(&self) -> Result<(), ServerError> {
|
||||||
if self.child.is_some() {
|
if self.child.is_some() {
|
||||||
return Err(ServerError::AlreadyRunning);
|
return Err(ServerError::AlreadyRunning);
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut status = self.status.write().await;
|
Ok(())
|
||||||
*status = InstanceStatus::Starting;
|
}
|
||||||
|
|
||||||
let jar_path: PathBuf = self.data.jar_path.clone();
|
async fn transition_status(&self, status: InstanceStatus) {
|
||||||
let root_dir: PathBuf = self.data.root_dir.clone();
|
let mut guard = self.status.write().await;
|
||||||
|
*guard = status;
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_start_command(&self) -> process::Command {
|
||||||
let mut command = process::Command::new("java");
|
let mut command = process::Command::new("java");
|
||||||
command
|
command
|
||||||
.arg("-jar")
|
.arg("-jar")
|
||||||
.arg(&jar_path)
|
.arg(&self.data.jar_path)
|
||||||
.arg("nogui")
|
.arg("nogui")
|
||||||
.current_dir(&root_dir)
|
.current_dir(&self.data.root_dir)
|
||||||
.stdout(Stdio::piped())
|
.stdout(Stdio::piped())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
.stdin(Stdio::piped());
|
.stdin(Stdio::piped());
|
||||||
|
|
||||||
command.process_group(0);
|
command.process_group(0);
|
||||||
|
command
|
||||||
|
}
|
||||||
|
|
||||||
let mut child = command.spawn().map_err(|_| ServerError::CommandFailed)?;
|
fn spawn_child_process(&self, mut command: process::Command) -> Result<Child, ServerError> {
|
||||||
|
command.spawn().map_err(|_| ServerError::CommandFailed)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn setup_stream_pumps(&mut self, mut child: Child) -> Result<(), ServerError> {
|
||||||
let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?;
|
let stdout = child.stdout.take().ok_or(ServerError::NoStdoutPipe)?;
|
||||||
|
let stderr = child.stderr.take().ok_or(ServerError::NoStderrPipe)?;
|
||||||
let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?;
|
let stdin = child.stdin.take().ok_or(ServerError::NoStdinPipe)?;
|
||||||
|
|
||||||
let child = Arc::new(RwLock::new(child));
|
let child = Arc::new(RwLock::new(child));
|
||||||
self.child = Some(child);
|
self.child = Some(child);
|
||||||
|
|
||||||
let stdout_tx = self.stdout_tx.clone();
|
let stdout_tx = self.stdout_tx.clone();
|
||||||
|
let stderr_tx = broadcast::Sender::new(2048);
|
||||||
|
self.stderr_tx = Some(stderr_tx.clone());
|
||||||
let shutdown = self.shutdown.clone();
|
let shutdown = self.shutdown.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -150,6 +177,21 @@ impl InstanceHandle {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut stderr_reader = BufReader::new(stderr).lines();
|
||||||
|
loop {
|
||||||
|
match stderr_reader.next_line().await {
|
||||||
|
Ok(Some(line)) => {
|
||||||
|
let _ = stderr_tx.send(StreamLine::stderr(line));
|
||||||
|
}
|
||||||
|
Ok(None) => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
_ => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?;
|
let mut stdin_rx = self.stdin_rx.take().ok_or(ServerError::NoStdinPipe)?;
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -173,9 +215,6 @@ impl InstanceHandle {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let mut status = self.status.write().await;
|
|
||||||
*status = InstanceStatus::Running;
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -229,7 +268,7 @@ impl InstanceHandle {
|
|||||||
StreamSource::Stderr => {
|
StreamSource::Stderr => {
|
||||||
let rx = match &self.stderr_tx {
|
let rx = match &self.stderr_tx {
|
||||||
Some(value) => value.subscribe(),
|
Some(value) => value.subscribe(),
|
||||||
None => return Err(SubscribeError::NoStdout),
|
None => return Err(SubscribeError::NoStderr),
|
||||||
};
|
};
|
||||||
Ok(BroadcastStream::new(rx))
|
Ok(BroadcastStream::new(rx))
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user