-- | -- Module : Process.Supervisor -- Description : Primitives for supervising a single child process. -- -- This module defines the types and operations used by 'withProcess' to run -- and supervise one child process. It provides: -- -- - configuration via 'ProcessOptions', -- - the 'Process' handle and supervision result types, -- - and functions for waiting, polling, signalling, and requesting -- stdin closure. -- -- The supervision behaviour itself is documented on 'withProcess'. -- -- == Limitations -- -- - The exposed PID is advisory and may be recycled. -- - No pidfd support (pending upstream @process@ APIs). -- - No supervision trees, restart strategies, or backpressure. module Process.Supervisor ( -- * Core types Process , ProcessOptions(..) , GracePeriod(..) , SupervisionResult(..) , Termination(..) , ProcessError(..) -- * Running and supervising a process , defaultOptions , withProcess -- * Interacting with a running process , waitProcess , pollResult , getAdvisoryPid , sendSignal , requestStdinClose ) where import Control.Concurrent (MVar, newEmptyMVar, tryPutMVar, tryReadMVar, readMVar) import Control.Concurrent.Async (async, cancel, waitCatch) import Control.Exception ( bracket, catch, try, SomeException, throwIO, Exception , fromException, mask, asyncExceptionFromException, SomeAsyncException ) import Control.Monad (void, when) import Data.ByteString qualified as BS import Data.Int (Int64) import Data.Maybe (isJust) import Foreign.C.Error (Errno(Errno), ePIPE) import GHC.Generics (Generic) import GHC.IO.Exception (IOErrorType(..), IOException(..)) import System.Exit (ExitCode(..)) import System.IO (Handle, hClose, hFlush, hIsEOF, hPutStrLn, hWaitForInput, stderr) import System.IO.Error (ioeGetErrorType, isEOFError) import System.Posix.Signals (Signal, signalProcess, sigKILL) import System.Posix.Types (CPid) import System.Process qualified as P import System.Timeout (timeout) -- | Handle for a supervised child process. 
--
-- Created by 'withProcess' and valid for the duration of the user action.
-- Provides access to the final result and basic interaction primitives.
data Process = Process
  { pFinalResult   :: !(MVar SupervisionResult)
    -- ^ Final result, written exactly once by the coordinator.
  , pStopStdin     :: !(MVar ())
    -- ^ Request flag for closing stdin (set by 'requestStdinClose' and by
    -- teardown).
  , pProcessHandle :: !P.ProcessHandle
    -- ^ Underlying process handle.
  }

-- | Configuration for running a supervised process.
--
-- These options control process creation and worker behaviour. Apart from
-- 'optGracePeriod', which sets how long each escalation step waits, they do
-- not affect the escalation or classification logic used by 'withProcess'.
--
-- Default values can be obtained via 'defaultOptions'.
data ProcessOptions = ProcessOptions
  { optFlushStdin  :: !Bool
    -- ^ Flush after each stdin write. /Default: 'True'./
  , optGracePeriod :: !GracePeriod
    -- ^ Grace period used for both the natural-exit wait and the wait after
    -- TERM. /Default: 5 seconds./
  , optBufferSize  :: !Int
    -- ^ Chunk size for stdout\/stderr reads. /Default: 4096 bytes./
  , optCmd         :: !FilePath
    -- ^ Executable to run.
  , optArgs        :: ![String]
    -- ^ Arguments to pass.
  , optEnv         :: !(Maybe [(String, String)])
    -- ^ Optional environment override. /Default: 'Nothing'./
  , optCwd         :: !(Maybe FilePath)
    -- ^ Optional working directory. /Default: 'Nothing'./
  }

-- | Construct default 'ProcessOptions' for the given command and arguments.
defaultOptions :: FilePath -> [String] -> ProcessOptions
defaultOptions cmd args =
  ProcessOptions
    { optFlushStdin  = True
    , optGracePeriod = GracePeriod 5_000_000
    , optBufferSize  = 4096
    , optCmd         = cmd
    , optArgs        = args
    , optEnv         = Nothing
    , optCwd         = Nothing
    }

-- | Microseconds to wait for natural or TERM-graceful exit.
newtype GracePeriod = GracePeriod
  { getGracePeriodMicros :: Int64
    -- ^ Duration in microseconds.
  }
  deriving (Show, Eq, Ord)

-- | Final outcome of a supervised process.
--
-- Note: exceptions raised in the stdin\/stdout\/stderr workers, including
-- exceptions thrown by the user-supplied producer and consumer actions, count
-- as worker failures. They are never recorded in 'srSupervisorException', and
-- they have no influence on how the process is classified.
data SupervisionResult = SupervisionResult
  { srExitCode            :: !(Maybe ExitCode)
    -- ^ Best exit code observed, if any.
  , srTermination         :: !Termination
    -- ^ Classification derived from escalation attempts and the exit code.
  , srSupervisorException :: !(Maybe SomeException)
    -- ^ Synchronous exception raised by the coordinator, if any.
  }
  deriving (Show, Generic)

-- | Field-wise equality. 'SomeException' has no 'Eq' instance, so recorded
-- supervisor exceptions are compared through their 'show' rendering.
instance Eq SupervisionResult where
  x == y =
    (srExitCode x, srTermination x, show <$> srSupervisorException x)
      == (srExitCode y, srTermination y, show <$> srSupervisorException y)

-- | How the supervised process ended.
--
-- Decided only from the escalation attempts made and the exit code observed.
data Termination
  = Clean
    -- ^ Natural exit; no TERM or KILL attempted.
  | Escalated
    -- ^ TERM or KILL was attempted.
  | Indeterminate
    -- ^ No exit code observed and no conclusive escalation.
  deriving (Eq, Show)

-- | Failures that can occur while starting or supervising a process.
data ProcessError
  = PipeCreationFailed String
    -- ^ 'createProcess' did not return all three requested pipes.
  | WorkerDied String String SomeException
    -- ^ A worker thread (stdin\/stdout\/stderr) failed: worker name, failing
    -- operation, and the underlying exception.
  | ProcessStartFailed SomeException
    -- ^ 'createProcess' itself threw.
  | InvalidOptions String
    -- ^ Option validation rejected the configuration.
  deriving (Show, Generic)

instance Exception ProcessError

-- | Run and supervise a single child process.
--
-- Starts the configured command, spawns workers for stdin\/stdout\/stderr,
-- and runs a coordinator that observes exit status and applies a fixed
-- escalation procedure:
--
-- natural exit → TERM → wait → KILL → fallback
--
-- The coordinator writes exactly one 'SupervisionResult' containing the best
-- exit code observed (if any), a termination classification, and a synchronous
-- exception if the supervisor itself fails.
--
-- The user action receives a 'Process' handle and may interact with the
-- running process. Teardown then depends on how the action ends:
--
-- - If the action returns normally, the workers are cancelled and teardown
--   waits for the coordinator to finish its escalation procedure. This may
--   take roughly twice the grace period plus one second.
-- - If the action throws, the coordinator is cancelled as well (it publishes
--   an 'Indeterminate' result unless it already published one), and the
--   exception is rethrown once teardown is complete.
--
-- In both cases, if the child has still not exited once the coordinator is
-- done, teardown asks it to terminate via 'P.cleanupProcess'. The child is
-- therefore not left running unattended when the coordinator was cancelled.
--
-- Stdin\/stdout\/stderr workers forward data and treat benign I\/O errors as
-- non-fatal. Requesting stdin closure via 'requestStdinClose' causes the
-- stdin worker to close its handle once pending writes complete.
--
-- Throws 'InvalidOptions' for rejected options. Throws 'ProcessStartFailed'
-- if 'createProcess' fails. Throws 'PipeCreationFailed' if a pipe is missing;
-- in that case the child is terminated first.
withProcess
  :: ProcessOptions
  -> IO (Maybe BS.ByteString)  -- ^ Producer for stdin chunks; 'Nothing' closes stdin.
  -> (BS.ByteString -> IO ())  -- ^ Consumer for stdout chunks.
  -> (BS.ByteString -> IO ())  -- ^ Consumer for stderr chunks.
  -> (Process -> IO a)         -- ^ User action.
  -> IO a
withProcess opts produceStdin consumeStdout consumeStderr action = do
  validateOptions opts

  let cp =
        (P.proc (optCmd opts) (optArgs opts))
          { P.env     = optEnv opts
          , P.cwd     = optCwd opts
          , P.std_in  = P.CreatePipe
          , P.std_out = P.CreatePipe
          , P.std_err = P.CreatePipe
          }

  -- Coordination variables.
  finalResult <- newEmptyMVar -- authoritative final result (coordinator writes exactly once)
  stopStdin   <- newEmptyMVar -- stop request for the stdin worker
  stdinClosed <- newEmptyMVar -- worker sets this when it closes stdin

  -- Protect createProcess and the registration of teardown: no async
  -- exception can arrive between creating the child and installing 'release'.
  mask $ \restore -> do
    (mhin, mhout, mherr, ph) <-
      P.createProcess cp `catchSync` (throwIO . ProcessStartFailed)

    case (mhin, mhout, mherr) of
      (Just hin, Just hout, Just herr) -> do
        -- The coordinator is forked while still masked on purpose. It
        -- installs its own handler before it can be interrupted, so even an
        -- immediate 'cancel' cannot stop it from publishing a final result.
        coordA <- async (coordinator opts ph finalResult)

        -- 'async' inherits the current (masked) state. Restore the caller's
        -- masking state for the workers so that 'cancel' reaches them
        -- promptly, as Note [Async Exceptions] describes.
        stdinA  <- async (restore (writerWorker "stdin" hin produceStdin (optFlushStdin opts) stopStdin stdinClosed))
        stdoutA <- async (restore (readerWorker "stdout" hout consumeStdout (optBufferSize opts)))
        stderrA <- async (restore (readerWorker "stderr" herr consumeStderr (optBufferSize opts)))

        let -- Best-effort close; failures are reported on stderr, not rethrown.
            closeQuietly :: String -> Handle -> IO ()
            closeQuietly label h = do
              r <- trySync (hClose h)
              case r of
                Left e -> hPutStrLn stderr $ "Process.release: hClose " ++ label ++ " failed: " ++ show e
                Right _ -> pure ()

            -- Safety net for a child that is still running after the
            -- coordinator finished. This happens when the coordinator was
            -- cancelled because the user action threw, or when it gave up
            -- after KILL.
            terminateIfRunning :: IO ()
            terminateIfRunning = do
              status <- trySync (P.getProcessExitCode ph)
              case status of
                Right Nothing -> do
                  r <- trySync (P.cleanupProcess (Nothing, Nothing, Nothing, ph))
                  case r of
                    Left e -> hPutStrLn stderr $ "Process.release: cleanupProcess failed: " ++ show e
                    Right _ -> pure ()
                _ -> pure ()

            -- Cancel workers in a safe order, wait for the coordinator, then
            -- clean up handles and, if necessary, the child itself.
            release :: IO ()
            release = mask $ \_ -> do
              -- Request the stdin worker to stop, then cancel it to interrupt
              -- a blocked producer or write.
              void $ tryPutMVar stopStdin ()
              cancel stdinA
              void (waitCatch stdinA)

              -- Close stdin only if the worker didn't already close it.
              mClosed <- tryReadMVar stdinClosed
              case mClosed of
                Just _ -> pure ()
                Nothing -> closeQuietly "hin" hin

              cancel stdoutA
              cancel stderrA
              void (waitCatch stdoutA)
              void (waitCatch stderrA)

              -- Wait for the coordinator; after a normal return this lets it
              -- run the escalation procedure to completion.
              void (waitCatch coordA)

              closeQuietly "hout" hout
              closeQuietly "herr" herr

              terminateIfRunning

        -- Run the user action under bracket so 'release' always runs.
        bracket (pure ()) (const release) $ \_ ->
          restore (action (Process finalResult stopStdin ph)) `catch` \(e :: SomeException) -> do
            -- Abort quickly: cancelling the coordinator means teardown does
            -- not wait for the escalation procedure. 'release' also cancels
            -- the workers and terminates a still-running child.
            cancel coordA
            cancel stdinA
            cancel stdoutA
            cancel stderrA
            throwIO e

      _ -> do
        -- Unexpected with CreatePipe, but do not leak the child or any pipe
        -- that was created.
        void $ trySync (P.cleanupProcess (mhin, mhout, mherr, ph))
        throwIO $ PipeCreationFailed "createProcess returned missing pipe(s)"

-------------------------------------------------------------------------------
-- Note [Async Exceptions]
--
-- Async exceptions interact with the supervisor as follows:
--
-- - User action: runs with the caller's masking state restored (normally
--   unmasked). Async exceptions are delivered immediately and trigger
--   teardown.
--
-- - Workers (stdin/stdout/stderr): forked with the caller's masking state
--   restored (normally unmasked). Async exceptions are delivered
--   immediately; workers terminate without affecting classification.
--
-- - Coordinator: forked while 'withProcess' holds its 'mask', and it then
--   masks itself. It therefore starts masked, and even its 'restore' regions
--   (waitForProcess, timeout, terminateProcess) are only interruptibly
--   masked. Async exceptions can reach it there at interruptible (blocking)
--   operations. Async exceptions delivered to the coordinator during
--   teardown are caught by the outer handler and do not count as supervisor
--   failures.
--
-- - publishFinal: executed in a masked region and uses the non-blocking
--   'tryPutMVar', so it cannot be interrupted. The first write wins,
--   ensuring the final result is published exactly once.
--
-- - catchSync: never handles async exceptions; they always propagate.
--
-- - Teardown: runs under 'mask'. This is *interruptible* masking: a further
--   async exception can still be delivered while teardown is blocked (e.g. in
--   'cancel' or 'waitCatch'). In that case the remaining cleanup steps are
--   skipped and the exception propagates.
--
-- In summary: async exceptions interrupt the user action and workers
-- immediately, reach the coordinator only inside its 'restore' regions, never
-- interrupt final-result publication, and can interrupt teardown only while
-- it is blocked.
-------------------------------------------------------------------------------

-- Coordinator:
--   Owns all exit observation and classification.
--   Applies the fixed escalation sequence (natural exit → TERM → wait → KILL → fallback).
--   Publishes the final result exactly once.
--
--   Workers never call waitForProcess; this avoids multi-threaded exit races.
--
-- Note: The coordinator may publish 'Indeterminate' if no exit is observed
-- within the escalation sequence, even if the process eventually exits after
-- teardown. This is inherent to the design: the supervisor reports the best
-- information available at the time of finalisation.
coordinator
  :: ProcessOptions
  -> P.ProcessHandle
  -> MVar SupervisionResult -- finalResult (coordinator writes authoritative final result here)
  -> IO ()
coordinator opts ph finalResult =
  mask $ \restore ->
    -- Wait up to the given number of microseconds for the child to exit.
    -- This runs under 'restore', i.e. inside the coordinator's interruptible window.
    let awaitExit micros = restore (timeout micros (P.waitForProcess ph))

        -- Publish the outcome for a given escalation history (no exception).
        settle termSent killSent mExit =
          publish mExit (classify termSent killSent mExit) Nothing

        escalate = do
          -- Stage 1: give the child the whole grace period to exit by itself.
          natural <- awaitExit grace
          case natural of
            Just code -> settle False False (Just code)
            Nothing -> do
              -- Stage 2: attempt graceful termination, then wait another grace period.
              termOutcome <- restore (trySync (P.terminateProcess ph))
              let termSent = either (const False) (const True) termOutcome
              afterTerm <- awaitExit grace
              case afterTerm of
                Just code -> settle termSent False (Just code)
                Nothing -> do
                  -- Stage 3: SIGKILL if it still looks alive, then wait one second.
                  killSent <- restore (safeKillUsingHandle ph)
                  afterKill <- awaitExit 1_000_000
                  case afterKill of
                    Just code -> settle termSent killSent (Just code)
                    -- Stage 4: no exit observed; take a last non-blocking look.
                    Nothing -> restore (P.getProcessExitCode ph) >>= settle termSent killSent
     in escalate `catch` onFailure
  where
    grace :: Int
    grace = fromIntegral (getGracePeriodMicros (optGracePeriod opts))

    -- Escalated as soon as a signal was attempted and an exit is known, or a
    -- KILL was attempted; Clean only for an exit with no signal attempted.
    classify :: Bool -> Bool -> Maybe ExitCode -> Termination
    classify termSent killSent mExit = case mExit of
      Just _
        | termSent || killSent -> Escalated
        | otherwise -> Clean
      Nothing
        | killSent -> Escalated
        | otherwise -> Indeterminate

    -- Async exceptions (cancellation during teardown) are not supervisor
    -- failures; anything else is recorded in the result.
    onFailure :: SomeException -> IO ()
    onFailure e
      | isAsync e = publish Nothing Indeterminate Nothing
      | otherwise = publish Nothing Indeterminate (Just e)

    -- 'tryPutMVar' never blocks, so publication cannot be interrupted; the
    -- first write wins.
    publish mExit term mExc =
      void $ tryPutMVar finalResult (SupervisionResult mExit term mExc)
-------------------------------------------------------------------------------
-- Workers
-------------------------------------------------------------------------------

-- writerWorker:
-- - Continuously reads chunks from the producer and writes them to the Handle.
-- - Honors the stop signal by closing the Handle once pending writes complete.
--   The signal is checked before each call to the producer, so a producer
--   that blocks delays the stop until teardown cancels the worker.
-- - Benign read errors from the producer are treated as EOF.
-- - A benign write error (EPIPE: the child closed its end of stdin) is
--   non-fatal but final. No further chunk can ever be delivered, so the
--   worker closes the Handle and stops instead of draining the producer into
--   a broken pipe.
--
-- EOF is signalled by the producer returning 'Nothing'.
--
-- Exception semantics:
--   The 'produce' action may throw synchronous exceptions to signal failure.
--   Non-benign producer or write failures are rethrown as 'WorkerDied'.
--   Async exceptions thrown into the worker (e.g. via 'cancel') are treated
--   as cancellation and do not propagate; the worker exits quietly.
writerWorker
  :: String                   -- worker name, used in 'WorkerDied'
  -> Handle                   -- handle to write to (the child's stdin)
  -> IO (Maybe BS.ByteString) -- producer; 'Nothing' means EOF
  -> Bool                     -- flush after every chunk?
  -> MVar ()                  -- stop request
  -> MVar ()                  -- filled once the Handle has been closed
  -> IO ()
writerWorker name h produce shouldFlush stopSignal handleClosed =
  loop `catch` ignoreAsync
  where
    loop :: IO ()
    loop = do
      stopRequested <- tryReadMVar stopSignal
      case stopRequested of
        Just _ ->
          -- Stop requested before reading input.
          closeHandle
        Nothing -> do
          mChunk <- produce `catchSync` \e ->
            if isBenignReadError e
              then pure Nothing
              else throwIO (WorkerDied name "read" e)
          case mChunk of
            Nothing ->
              -- EOF from producer.
              closeHandle
            Just chunk -> do
              -- A stop requested during the read does not discard this chunk.
              delivered <- writeChunk chunk
              if delivered then loop else closeHandle

    -- Write (and optionally flush) one chunk. 'False' means the pipe is
    -- broken (benign EPIPE) and nothing more can be written.
    writeChunk :: BS.ByteString -> IO Bool
    writeChunk chunk =
      (BS.hPut h chunk >> when shouldFlush (hFlush h) >> pure True)
        `catchSync` \e ->
          if isBenignWriteError e
            then pure False
            else throwIO (WorkerDied name "write" e)

    -- Best-effort close, then tell teardown that stdin is already closed.
    closeHandle :: IO ()
    closeHandle = do
      void $ trySync (hClose h)
      void $ tryPutMVar handleClosed ()

    ignoreAsync :: SomeException -> IO ()
    ignoreAsync e = case fromException e of
      Just (_ :: SomeAsyncException) -> pure ()
      Nothing -> throwIO e

-- readerWorker:
-- - Continuously reads chunks from the Handle and writes them to the consumer.
-- - Uses a fixed 10ms wait to avoid busy looping on empty reads.
-- - Treats benign I/O errors as non-fatal.
--
-- Benign read errors (see 'isBenignReadError') are treated as EOF and the
-- worker stops quietly, just as the stdin worker treats benign producer
-- errors. This also applies to the EOF probe and the wait before a retry.
-- A failing Handle therefore cannot escape as a raw 'IOException' or cause an
-- endless retry loop; non-benign failures are rethrown as 'WorkerDied'.
--
-- 'BS.hGetSome' is documented to return an empty chunk only at EOF. The
-- explicit EOF check before retrying is kept as a defensive measure, and
-- empty chunks are never forwarded to the consumer.
--
-- Exception semantics:
--   The 'consume' action may throw synchronous exceptions to signal failure;
--   they are rethrown as 'WorkerDied'.
--   Async exceptions thrown into the worker (e.g. via 'cancel') are treated
--   as cancellation and do not propagate; the worker exits quietly.
--
-- Note [hWaitForInput Timeout]
-- hWaitForInput may return False if the timeout expires before data becomes
-- available. This is not an error and does not indicate EOF; it simply
-- means "no data yet". The worker ignores the timeout result and retries the
-- read after the delay.
readerWorker :: String -> Handle -> (BS.ByteString -> IO ()) -> Int -> IO ()
readerWorker name h consume bufSize = loop `catch` ignoreAsync
  where
    loop :: IO ()
    loop = do
      mChunk <- benignAsEof (BS.hGetSome h bufSize)
      case mChunk of
        Nothing ->
          -- Benign read error: treat as EOF.
          pure ()
        Just chunk
          | BS.null chunk -> do
              eof <- benignAsEof (hIsEOF h)
              case eof of
                Just False -> do
                  -- See Note [hWaitForInput Timeout]: the Bool is irrelevant.
                  waited <- benignAsEof (hWaitForInput h retryDelayMs)
                  maybe (pure ()) (const loop) waited
                _ ->
                  -- EOF, or a benign error while probing for it.
                  pure ()
          | otherwise -> do
              consume chunk `catchSync` \e -> throwIO (WorkerDied name "write" e)
              loop

    -- Run a read-side Handle operation. 'Nothing' signals a benign read
    -- error, which the worker treats as EOF; other failures become
    -- 'WorkerDied'.
    benignAsEof :: IO r -> IO (Maybe r)
    benignAsEof act =
      (Just <$> act) `catchSync` \e ->
        if isBenignReadError e
          then pure Nothing
          else throwIO (WorkerDied name "read" e)

    retryDelayMs :: Int
    retryDelayMs = 10

    ignoreAsync :: SomeException -> IO ()
    ignoreAsync e = case fromException e of
      Just (_ :: SomeAsyncException) -> pure ()
      Nothing -> throwIO e

-------------------------------------------------------------------------------
-- API helpers
-------------------------------------------------------------------------------

-- | Block until the final supervision result is available.
--
-- The coordinator publishes the result only after it has observed an exit or
-- finished its escalation procedure, so this may wait through grace periods.
waitProcess :: Process -> IO SupervisionResult
waitProcess = readMVar . pFinalResult

-- | Non-blocking check for the final supervision result.
--
-- Returns 'Nothing' if the result has not yet been published.
pollResult :: Process -> IO (Maybe SupervisionResult)
pollResult = tryReadMVar . pFinalResult

-- | Return the PID of the child process, if available.
--
-- The PID is advisory and may be recycled by the operating system.
getAdvisoryPid :: Process -> IO (Maybe CPid)
getAdvisoryPid = P.getPid . pProcessHandle

-- | Send a signal to the child process if it is still running.
--
-- Uses the advisory PID and does nothing if the process has already exited
-- or no PID is available. Failures to deliver the signal are ignored
-- (best effort).
--
-- The PID is advisory and may have been recycled.
sendSignal :: Process -> Signal -> IO ()
sendSignal p sig = do
  exited <- P.getProcessExitCode (pProcessHandle p)
  case exited of
    Just _ -> pure ()
    Nothing -> do
      mpid <- getAdvisoryPid p
      case mpid of
        Nothing -> pure ()
        Just pid -> void $ trySync (signalProcess sig pid)

-- | Ask the stdin worker to close its handle once pending writes complete.
--
-- The request is noticed before the worker next calls the producer; a chunk
-- it has already obtained is still written first. Calling this more than
-- once has no further effect.
requestStdinClose :: Process -> IO ()
requestStdinClose p = void $ tryPutMVar (pStopStdin p) ()

-- safeKillUsingHandle:
-- Attempts SIGKILL if the process still appears alive.
-- Re-checks exit status before signalling to reduce PID-recycling races.
--
-- Note:
-- PID recycling cannot be eliminated without pidfd support.
safeKillUsingHandle :: P.ProcessHandle -> IO Bool
safeKillUsingHandle ph = do
  alreadyExited <- P.getProcessExitCode ph
  case alreadyExited of
    -- If process already exited, nothing to do.
    Just _ -> pure False
    Nothing -> do
      -- Ask ProcessHandle for pid (may be Nothing).
      mpid <- P.getPid ph
      case mpid of
        Nothing -> pure False
        Just pid -> do
          -- Re-check handle liveness right before signalling.
          exitedSince <- P.getProcessExitCode ph
          case exitedSince of
            Just _ -> pure False
            Nothing -> either (const False) (const True) <$> trySync (signalProcess sigKILL pid)

-- XX once P.getPidFd and pidfdSendSignal :: Fd -> Signal -> IO () exists:
--safeKillUsingHandle ph = do
--  mExit <- P.getProcessExitCode ph
--  case mExit of
--    Just _ -> pure False
--    Nothing ->
--      case P.getPidFd ph of
--        Nothing -> pure False
--        Just pidfd -> do
--          r <- trySync (pidfdSendSignal pidfd sigKILL)
--          pure (either (const False) (const True) r)

-------------------------------------------------------------------------------
-- Error helpers and benign classification
-------------------------------------------------------------------------------

-- | Is this an asynchronous exception, i.e. one wrapped in
-- 'SomeAsyncException' (for example 'ThreadKilled', or the exception thrown
-- by async's 'cancel')?
--
-- 'asyncExceptionFromException' must not be used for this test. It unwraps
-- the 'SomeAsyncException' and casts the /inner/ exception, so at type
-- 'SomeAsyncException' it returns 'Nothing' for every real async exception.
isAsync :: SomeException -> Bool
isAsync e = isJust (fromException e :: Maybe SomeAsyncException)

-- | Run an action, handling only synchronous exceptions.
--
-- Async exceptions are rethrown; the handler sees only non-async exceptions.
-- The handler runs with the caller's masking state restored.
catchSync :: IO a -> (SomeException -> IO a) -> IO a
catchSync act handler = mask $ \restore ->
  restore act `catch` \e ->
    if isAsync e
      then throwIO e
      else restore (handler e)

-- | Like 'try', but rethrows async exceptions instead of returning them.
--
-- Returns 'Left' only for non-async exceptions.
trySync :: IO a -> IO (Either SomeException a)
trySync act = mask $ \restore -> do
  r <- try (restore act)
  case r of
    Left e
      | isAsync e -> throwIO e
      | otherwise -> pure (Left e)
    Right x -> pure (Right x)

-- Conservative benign classification:
-- - Reads: EOF or ResourceVanished may be benign (EOF).
-- - Writes: only EPIPE is benign (the child has closed its end of the pipe).
--   It is recognised from the errno recorded in the 'IOException'; any other
--   write failure, including ResourceVanished with a different errno, is not
--   benign.
isBenignReadError :: SomeException -> Bool
isBenignReadError se = case fromException se of
  Just (ioe :: IOException) -> isEOFError ioe || ioeGetErrorType ioe == ResourceVanished
  Nothing -> False

isBenignWriteError :: SomeException -> Bool
isBenignWriteError se = case fromException se of
  Just (ioe :: IOException) -> case ioe_errno ioe of
    Just errno -> Errno errno == ePIPE
    Nothing -> False
  Nothing -> False

-------------------------------------------------------------------------------
-- Misc helpers
-------------------------------------------------------------------------------

-- | Validate 'ProcessOptions' and throw 'InvalidOptions' on invalid values.
--
-- Checks command presence, buffer size bounds (1 byte to 16 MiB), and that
-- the grace period is non-negative and fits in an 'Int', since it is
-- converted to 'Int' microseconds for 'System.Timeout.timeout'.
validateOptions :: ProcessOptions -> IO ()
validateOptions opts = do
  let gp = getGracePeriodMicros (optGracePeriod opts)
      buf = optBufferSize opts
      maxBuf = 16 * 1024 * 1024 -- 16MB, arbitrary but sane upper bound
  when (null $ optCmd opts) $
    throwIO $ InvalidOptions "optCmd cannot be empty"
  when (buf <= 0) $
    throwIO $ InvalidOptions "optBufferSize must be > 0"
  when (buf > maxBuf) $
    throwIO $ InvalidOptions "optBufferSize is unreasonably large"
  when (gp < 0) $
    throwIO $ InvalidOptions "optGracePeriod must be non-negative"
  when (gp > fromIntegral (maxBound :: Int)) $
    throwIO $ InvalidOptions "optGracePeriod too large for this platform"