Events feature skeleton
This commit is contained in:
@@ -11,7 +11,7 @@ license = "MIT"
|
|||||||
publish = false
|
publish = false
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["core"]
|
default = ["core", "events"]
|
||||||
# Core runtime requirements for the currently implemented functionality.
|
# Core runtime requirements for the currently implemented functionality.
|
||||||
core = ["dep:thiserror", "dep:tokio", "dep:tokio-stream", "dep:tokio-util"]
|
core = ["dep:thiserror", "dep:tokio", "dep:tokio-stream", "dep:tokio-util"]
|
||||||
# Placeholder for upcoming event-driven functionality.
|
# Placeholder for upcoming event-driven functionality.
|
||||||
|
|||||||
@@ -5,6 +5,8 @@ use std::fmt::{self, Display};
|
|||||||
pub enum StreamSource {
|
pub enum StreamSource {
|
||||||
Stdout,
|
Stdout,
|
||||||
Stderr,
|
Stderr,
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
Event,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Captures a single line of process output along with its origin stream.
|
/// Captures a single line of process output along with its origin stream.
|
||||||
@@ -14,6 +16,12 @@ pub struct StreamLine {
|
|||||||
source: StreamSource,
|
source: StreamSource,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
pub struct InstanceEvent {}
|
||||||
|
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
pub enum Events {}
|
||||||
|
|
||||||
impl StreamLine {
|
impl StreamLine {
|
||||||
pub fn new<S: Into<String>>(line: S, source: StreamSource) -> Self {
|
pub fn new<S: Into<String>>(line: S, source: StreamSource) -> Self {
|
||||||
Self {
|
Self {
|
||||||
|
|||||||
@@ -92,5 +92,3 @@ pub enum ServerError {
|
|||||||
#[error("Failed to write to stdin")]
|
#[error("Failed to write to stdin")]
|
||||||
StdinWriteFailed,
|
StdinWriteFailed,
|
||||||
}
|
}
|
||||||
|
|
||||||
type Result<T> = std::result::Result<T, Error>;
|
|
||||||
|
|||||||
@@ -13,6 +13,8 @@ use crate::{
|
|||||||
error::{HandleError, ServerError, SubscribeError},
|
error::{HandleError, ServerError, SubscribeError},
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use tokio_stream::StreamExt;
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone)]
|
||||||
pub struct InstanceData {
|
pub struct InstanceData {
|
||||||
pub root_dir: PathBuf,
|
pub root_dir: PathBuf,
|
||||||
@@ -38,6 +40,8 @@ pub struct InstanceHandle {
|
|||||||
pub status: Arc<RwLock<InstanceStatus>>,
|
pub status: Arc<RwLock<InstanceStatus>>,
|
||||||
stdout_tx: broadcast::Sender<StreamLine>,
|
stdout_tx: broadcast::Sender<StreamLine>,
|
||||||
stderr_tx: Option<broadcast::Sender<StreamLine>>,
|
stderr_tx: Option<broadcast::Sender<StreamLine>>,
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
events_tx: broadcast::Sender<StreamLine>,
|
||||||
stdin_tx: mpsc::Sender<String>,
|
stdin_tx: mpsc::Sender<String>,
|
||||||
stdin_rx: Option<mpsc::Receiver<String>>,
|
stdin_rx: Option<mpsc::Receiver<String>>,
|
||||||
child: Option<Arc<RwLock<Child>>>,
|
child: Option<Arc<RwLock<Child>>>,
|
||||||
@@ -81,6 +85,8 @@ impl InstanceHandle {
|
|||||||
status: Arc::new(RwLock::new(status)),
|
status: Arc::new(RwLock::new(status)),
|
||||||
stdout_tx: broadcast::Sender::new(2048),
|
stdout_tx: broadcast::Sender::new(2048),
|
||||||
stderr_tx: None,
|
stderr_tx: None,
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
events_tx: broadcast::Sender::new(2048),
|
||||||
stdin_tx,
|
stdin_tx,
|
||||||
stdin_rx: Some(stdin_rx),
|
stdin_rx: Some(stdin_rx),
|
||||||
child: None,
|
child: None,
|
||||||
@@ -114,6 +120,8 @@ impl InstanceHandle {
|
|||||||
|
|
||||||
self.transition_status(InstanceStatus::Running).await;
|
self.transition_status(InstanceStatus::Running).await;
|
||||||
|
|
||||||
|
self.setup_parser()?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -128,6 +136,7 @@ impl InstanceHandle {
|
|||||||
async fn transition_status(&self, status: InstanceStatus) {
|
async fn transition_status(&self, status: InstanceStatus) {
|
||||||
let mut guard = self.status.write().await;
|
let mut guard = self.status.write().await;
|
||||||
*guard = status;
|
*guard = status;
|
||||||
|
drop(guard);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn build_start_command(&self) -> process::Command {
|
fn build_start_command(&self) -> process::Command {
|
||||||
@@ -162,6 +171,9 @@ impl InstanceHandle {
|
|||||||
self.stderr_tx = Some(stderr_tx.clone());
|
self.stderr_tx = Some(stderr_tx.clone());
|
||||||
let shutdown = self.shutdown.clone();
|
let shutdown = self.shutdown.clone();
|
||||||
|
|
||||||
|
let stdout_status = self.status.clone();
|
||||||
|
let stderr_status = self.status.clone();
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let mut stdout_reader = BufReader::new(stdout).lines();
|
let mut stdout_reader = BufReader::new(stdout).lines();
|
||||||
loop {
|
loop {
|
||||||
@@ -169,10 +181,19 @@ impl InstanceHandle {
|
|||||||
Ok(Some(line)) => {
|
Ok(Some(line)) => {
|
||||||
let _ = stdout_tx.send(StreamLine::stdout(line));
|
let _ = stdout_tx.send(StreamLine::stdout(line));
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
_ => {
|
||||||
|
let status_guard = stdout_status.read().await;
|
||||||
|
if *status_guard != InstanceStatus::Killing
|
||||||
|
|| *status_guard != InstanceStatus::Stopping
|
||||||
|
{
|
||||||
|
drop(status_guard);
|
||||||
|
let mut status = stdout_status.write().await;
|
||||||
|
*status = InstanceStatus::Crashed;
|
||||||
|
drop(status);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => break,
|
drop(status_guard);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -184,10 +205,19 @@ impl InstanceHandle {
|
|||||||
Ok(Some(line)) => {
|
Ok(Some(line)) => {
|
||||||
let _ = stderr_tx.send(StreamLine::stderr(line));
|
let _ = stderr_tx.send(StreamLine::stderr(line));
|
||||||
}
|
}
|
||||||
Ok(None) => {
|
_ => {
|
||||||
|
let status_guard = stderr_status.read().await;
|
||||||
|
if *status_guard != InstanceStatus::Killing
|
||||||
|
|| *status_guard != InstanceStatus::Stopping
|
||||||
|
{
|
||||||
|
drop(status_guard);
|
||||||
|
let mut status = stderr_status.write().await;
|
||||||
|
*status = InstanceStatus::Crashed;
|
||||||
|
drop(status);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
_ => break,
|
drop(status_guard);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@@ -218,10 +248,38 @@ impl InstanceHandle {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
fn setup_parser(&mut self) -> Result<(), ServerError> {
|
||||||
|
let stdout_stream = self
|
||||||
|
.subscribe(StreamSource::Stdout)
|
||||||
|
.map_err(|_| ServerError::NoStdoutPipe)?;
|
||||||
|
let shutdown = self.shutdown.clone();
|
||||||
|
let event_tx = self.events_tx.clone();
|
||||||
|
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut rx = stdout_stream;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
tokio::select! {
|
||||||
|
_ = shutdown.cancelled() => {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
line = rx.next() => {
|
||||||
|
match line {
|
||||||
|
Some(Ok(_)) => {
|
||||||
|
},
|
||||||
|
_ => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn kill(&mut self) -> Result<(), ServerError> {
|
pub async fn kill(&mut self) -> Result<(), ServerError> {
|
||||||
if let Some(child_arc) = self.child.clone() {
|
if let Some(child_arc) = self.child.clone() {
|
||||||
let mut status = self.status.write().await;
|
self.transition_status(InstanceStatus::Killing).await;
|
||||||
*status = InstanceStatus::Killing;
|
|
||||||
let mut child = child_arc.write().await;
|
let mut child = child_arc.write().await;
|
||||||
|
|
||||||
child.kill().await.map_err(|_| ServerError::CommandFailed)?;
|
child.kill().await.map_err(|_| ServerError::CommandFailed)?;
|
||||||
@@ -229,8 +287,7 @@ impl InstanceHandle {
|
|||||||
self.shutdown.cancel();
|
self.shutdown.cancel();
|
||||||
self.child = None;
|
self.child = None;
|
||||||
|
|
||||||
let mut status = self.status.write().await;
|
self.transition_status(InstanceStatus::Killed).await;
|
||||||
*status = InstanceStatus::Killed;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(ServerError::NotRunning)
|
Err(ServerError::NotRunning)
|
||||||
@@ -239,17 +296,15 @@ impl InstanceHandle {
|
|||||||
|
|
||||||
pub async fn stop(&mut self) -> Result<(), ServerError> {
|
pub async fn stop(&mut self) -> Result<(), ServerError> {
|
||||||
if let Some(child_arc) = self.child.clone() {
|
if let Some(child_arc) = self.child.clone() {
|
||||||
let mut status = self.status.write().await;
|
self.transition_status(InstanceStatus::Stopping).await;
|
||||||
*status = InstanceStatus::Stopping;
|
|
||||||
|
|
||||||
_ = self.send_command("stop").await;
|
_ = self.send_command("stop").await;
|
||||||
|
|
||||||
let mut child = child_arc.write().await;
|
let mut child = child_arc.write().await;
|
||||||
child.wait().await.map_err(|_| ServerError::CommandFailed)?;
|
child.wait().await.map_err(|_| ServerError::CommandFailed)?;
|
||||||
self.shutdown.cancel();
|
self.shutdown.cancel();
|
||||||
self.child = None;
|
self.child = None;
|
||||||
let mut status = self.status.write().await;
|
|
||||||
*status = InstanceStatus::Stopped;
|
self.transition_status(InstanceStatus::Stopped).await;
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(ServerError::NotRunning)
|
Err(ServerError::NotRunning)
|
||||||
@@ -272,6 +327,11 @@ impl InstanceHandle {
|
|||||||
};
|
};
|
||||||
Ok(BroadcastStream::new(rx))
|
Ok(BroadcastStream::new(rx))
|
||||||
}
|
}
|
||||||
|
#[cfg(feature = "events")]
|
||||||
|
StreamSource::Event => {
|
||||||
|
let rx = self.events_tx.subscribe();
|
||||||
|
Ok(BroadcastStream::new(rx))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user