-- Provenance: commit 726f659e41382227296bb257e54693c5bc307d40, author tv,
-- Mon, 9 Feb 2026 03:50:35 +0100, subject "WIP"; adds Process/Supervisor.hs
-- (574 insertions).

-- |
-- Module      : Process.Supervisor
-- Description : Primitives for supervising a single child process.
--
-- Types and operations that 'withProcess' uses to run and supervise one
-- child process:
--
-- - configuration via 'ProcessOptions',
-- - the 'Process' handle and the supervision result types,
-- - functions for waiting, polling, signalling, and requesting stdin
--   closure.
--
-- The supervision behaviour itself is documented on 'withProcess'.
--
-- == Limitations
--
-- - The exposed PID is advisory and may be recycled.
-- - No pidfd support (pending upstream @process@ APIs).
-- - No supervision trees, restart strategies, or backpressure.

module Process.Supervisor
  ( -- * Core types
    Process
  , ProcessOptions(..)
  , GracePeriod(..)
  , SupervisionResult(..)
  , Termination(..)
  , ProcessError(..)

    -- * Running and supervising a process
  , defaultOptions
  , withProcess

    -- * Interacting with a running process
  , waitProcess
  , pollResult
  , getAdvisoryPid
  , sendSignal
  , requestStdinClose
  ) where

import Control.Concurrent (MVar, newEmptyMVar, tryPutMVar, tryReadMVar, readMVar)
import Control.Concurrent.Async (async, cancel, waitCatch)
import Control.Exception
  ( bracket, catch, try, SomeException, throwIO, Exception
  , fromException, mask, asyncExceptionFromException, SomeAsyncException
  )
import Control.Monad (void, when)
import Data.ByteString qualified as BS
import Data.Int (Int64)
import Data.Maybe (isJust)
import Foreign.C.Error (Errno(Errno), ePIPE)
import GHC.Generics (Generic)
import GHC.IO.Exception (IOErrorType(..), IOException(..))
import System.Exit (ExitCode(..))
import System.IO (Handle, hClose, hFlush, hIsEOF, hPutStrLn, hWaitForInput, stderr)
import System.IO.Error (ioeGetErrorType, isEOFError)
import System.Posix.Signals (Signal, signalProcess, sigKILL)
import System.Posix.Types (CPid)
import System.Process qualified as P
import System.Timeout (timeout)

-- | Handle for a supervised child process.
--
-- Created by 'withProcess' and valid for the duration of the user action.
-- Gives access to the final result and to basic interaction primitives.
data Process = Process
  { -- | Final result, written exactly once by the coordinator.
    pFinalResult :: !(MVar SupervisionResult)
  , -- | Request flag for closing stdin.
    pStopStdin :: !(MVar ())
  , -- | Underlying process handle.
    pProcessHandle :: !P.ProcessHandle
  }

-- | Configuration for running a supervised process.
--
-- These options control process creation and worker behaviour. They do not
-- affect the escalation or classification logic used by 'withProcess'.
--
-- Default values can be obtained via 'defaultOptions'.
data ProcessOptions = ProcessOptions
  { -- | Flush after each stdin write. /Default: 'True'./
    optFlushStdin :: !Bool
  , -- | Grace period for natural exit and TERM‑graceful exit.
    -- /Default: 5 seconds./
    optGracePeriod :: !GracePeriod
  , -- | Chunk size for stdout\/stderr reads. /Default: 4096 bytes./
    optBufferSize :: !Int
  , -- | Executable to run.
    optCmd :: !FilePath
  , -- | Arguments to pass.
    optArgs :: ![String]
  , -- | Optional environment override. /Default: 'Nothing'./
    optEnv :: !(Maybe [(String, String)])
  , -- | Optional working directory. /Default: 'Nothing'./
    optCwd :: !(Maybe FilePath)
  }

-- | Build the default 'ProcessOptions' for a command and its arguments.
--
-- Everything except the command and argument list takes the default
-- documented on the corresponding 'ProcessOptions' field.
defaultOptions :: FilePath -> [String] -> ProcessOptions
defaultOptions cmd args =
  ProcessOptions
    { optCmd = cmd
    , optArgs = args
    , optEnv = Nothing
    , optCwd = Nothing
    , optFlushStdin = True
    , optBufferSize = 4096
    , optGracePeriod = GracePeriod 5_000_000
    }

-- | Microseconds to wait for natural or TERM‑graceful exit.
newtype GracePeriod = GracePeriod
  { -- | Duration in microseconds.
    getGracePeriodMicros :: Int64
  }
  deriving (Show, Eq, Ord)

-- | Final outcome of a supervised process.
--
-- Note: exceptions in the stdin\/stdout\/stderr workers — including those
-- thrown by the user‑supplied producer and consumer actions — are treated as
-- worker failures and do not appear in 'srSupervisorException'. Worker
-- failures do not affect process classification.
data SupervisionResult = SupervisionResult
  { -- | Best exit code observed, if any.
    srExitCode :: !(Maybe ExitCode)
  , -- | Classification based on escalation attempts and exit code.
    srTermination :: !Termination
  , -- | Synchronous exception raised by the coordinator, if any.
    srSupervisorException :: !(Maybe SomeException)
  }
  deriving (Show, Generic)

-- 'SomeException' has no 'Eq' instance, so the exception field is compared
-- through its rendered 'show' text.
instance Eq SupervisionResult where
  a == b = comparable a == comparable b
    where
      comparable r =
        ( srExitCode r
        , srTermination r
        , show <$> srSupervisorException r
        )

-- | Classification of how the process terminated.
--
-- Derived solely from escalation attempts and the observed exit code.
data Termination
  = Clean -- ^ Natural exit; no TERM or KILL attempted.
  | Escalated -- ^ TERM or KILL was attempted.
  | Indeterminate -- ^ No exit code observed and no conclusive escalation.
  deriving (Eq, Show)

-- | Errors that may occur while starting or supervising a process.
data ProcessError
  = PipeCreationFailed String -- ^ 'createProcess' returned missing pipes.
  | WorkerDied String String SomeException -- ^ A worker thread (stdin\/stdout\/stderr) failed with an exception.
  | ProcessStartFailed SomeException -- ^ 'createProcess' itself failed.
  | InvalidOptions String -- ^ Option validation failed.
  deriving (Show, Generic)

instance Exception ProcessError

-- | Run and supervise a single child process.
--
-- Starts the configured command, spawns workers for stdin\/stdout\/stderr,
-- and runs a coordinator that observes exit status and applies a fixed
-- escalation procedure:
--
--   natural exit → TERM → wait → KILL → fallback
--
-- The coordinator writes exactly one 'SupervisionResult' containing the best
-- exit code observed (if any), a termination classification, and a
-- synchronous exception if the supervisor itself fails.
--
-- The user action receives a 'Process' handle and may interact with the
-- running process. Teardown differs by outcome:
--
-- - Action returns: the workers are cancelled and the coordinator is awaited
--   so that it can finish its escalation sequence.
-- - Action throws: the coordinator and the workers are cancelled and the
--   exception is rethrown after cleanup.
--
-- In both cases a child that still has no exit code once the coordinator has
-- stopped is sent a best‑effort 'P.terminateProcess', so it is not left
-- running behind the caller's back.
--
-- Throws 'InvalidOptions' (from option validation), 'ProcessStartFailed'
-- when 'P.createProcess' fails synchronously, and 'PipeCreationFailed' when
-- a requested pipe is missing.
withProcess
  :: ProcessOptions
  -> IO (Maybe BS.ByteString) -- ^ Producer for stdin chunks; 'Nothing' closes stdin.
  -> (BS.ByteString -> IO ()) -- ^ Consumer for stdout chunks.
  -> (BS.ByteString -> IO ()) -- ^ Consumer for stderr chunks.
  -> (Process -> IO a) -- ^ User action.
  -> IO a
withProcess opts produceStdin consumeStdout consumeStderr action = do
  validateOptions opts
  let cp =
        (P.proc (optCmd opts) (optArgs opts))
          { P.env = optEnv opts
          , P.cwd = optCwd opts
          , P.std_in = P.CreatePipe
          , P.std_out = P.CreatePipe
          , P.std_err = P.CreatePipe
          }

  -- Coordination variables.
  finalResult <- newEmptyMVar -- authoritative final result (written once)
  stopStdin <- newEmptyMVar -- set to ask the stdin worker to stop
  stdinClosed <- newEmptyMVar -- set by the stdin worker once it closed stdin

  -- Mask so that no async exception can arrive between process creation and
  -- the installation of the cleanup bracket.
  mask \restore -> do
    (mhin, mhout, mherr, ph) <-
      P.createProcess cp `catchSync` (throwIO . ProcessStartFailed)

    case (mhin, mhout, mherr) of
      (Just hin, Just hout, Just herr) -> do
        -- A thread forked by 'async' inherits the masking state of the
        -- forking thread, and we are inside 'mask' here. Each thread body is
        -- therefore wrapped in 'restore' so that it runs in the caller's
        -- masking state; otherwise user-supplied producer/consumer code
        -- would run masked and the coordinator's own 'restore' would have
        -- nothing to restore to.
        let publishFallback :: SomeException -> IO ()
            publishFallback _ =
              void $ tryPutMVar finalResult (SupervisionResult Nothing Indeterminate Nothing)

        -- The handler is installed while still masked, so a cancellation
        -- that lands before the coordinator installs its own handler still
        -- results in a published (Indeterminate) final result.
        coordA <- async (restore (coordinator opts ph finalResult) `catch` publishFallback)

        stdinA <-
          async . restore $
            writerWorker "stdin" hin produceStdin (optFlushStdin opts) stopStdin stdinClosed
        stdoutA <- async . restore $ readerWorker "stdout" hout consumeStdout (optBufferSize opts)
        stderrA <- async . restore $ readerWorker "stderr" herr consumeStderr (optBufferSize opts)

        let -- Best-effort close; failures are reported on stderr only.
            closeQuietly :: String -> Handle -> IO ()
            closeQuietly label hdl =
              trySync (hClose hdl) >>= \case
                Left e ->
                  hPutStrLn stderr $
                    "Process.release: hClose " ++ label ++ " failed: " ++ show e
                Right () -> pure ()

            release = mask \_ -> do
              -- Ask the stdin worker to stop, then cancel it to interrupt a
              -- blocked write.
              void $ tryPutMVar stopStdin ()
              cancel stdinA
              void (waitCatch stdinA)

              -- Close stdin only if the worker did not already do so.
              tryReadMVar stdinClosed
                >>= maybe (closeQuietly "hin" hin) (const (pure ()))

              cancel stdoutA
              cancel stderrA
              void (waitCatch stdoutA)
              void (waitCatch stderrA)

              -- Wait for the coordinator (it publishes finalResult).
              void (waitCatch coordA)

              -- If the coordinator was cancelled before it could escalate,
              -- the child may still be running: request termination so the
              -- process does not outlive 'withProcess'.
              trySync (P.getProcessExitCode ph) >>= \case
                Right Nothing -> void (trySync (P.terminateProcess ph))
                _ -> pure ()

              closeQuietly "hout" hout
              closeQuietly "herr" herr

        -- 'bracket' guarantees that 'release' runs on every exit path.
        bracket (pure ()) (const release) \_ ->
          restore (action (Process finalResult stopStdin ph)) `catch` \(e :: SomeException) -> do
            -- Stop everything promptly; 'release' then performs the cleanup.
            cancel coordA
            cancel stdinA
            cancel stdoutA
            cancel stderrA
            throwIO e

      _ -> do
        -- Defensive: do not leak whatever was created before reporting.
        mapM_ (mapM_ (void . trySync . hClose)) [mhin, mhout, mherr]
        void (trySync (P.terminateProcess ph))
        throwIO $ PipeCreationFailed "createProcess returned missing pipe(s)"

-------------------------------------------------------------------------------
-- Note [Async Exceptions]
--
-- Async exceptions interact with the supervisor as follows:
--
-- - User action: unmasked. Async exceptions are delivered immediately and
--   trigger teardown.
--
-- - Workers (stdin/stdout/stderr): unmasked. Async exceptions are delivered
--   immediately; workers terminate without affecting classification.
--
-- - Coordinator: runs under 'mask'. Async exceptions are deferred in masked
--   regions and may be delivered only inside 'restore' (e.g. during
--   waitForProcess, timeout, terminateProcess). Async exceptions delivered
--   to the coordinator during teardown are caught by the outer handler and
--   do not count as supervisor failures.
--
-- - publishFinal: executed in a masked region. Async exceptions are deferred,
--   ensuring the final result is published exactly once.
--
-- - catchSync: never handles async exceptions; they always propagate.
--
-- - Teardown: runs under 'mask' (not 'uninterruptibleMask'). Async
--   exceptions are deferred between cleanup steps, but blocking steps such
--   as 'cancel' and 'waitCatch' remain interruptible.
--
-- In summary: async exceptions interrupt the user action and workers
-- immediately, may interrupt the coordinator only at restore points, and are
-- deferred during final-result publication. Teardown runs under 'mask', so
-- it can be interrupted only at blocking operations, which stay
-- interruptible under 'mask'.
-------------------------------------------------------------------------------

-- Coordinator:
--   Owns all exit observation and classification.
--   Applies the fixed escalation sequence
--   (natural exit → TERM → wait → KILL → fallback).
--   Publishes the final result exactly once.
--
-- Workers never call waitForProcess; this avoids multi-threaded exit races.
--
-- Note: The coordinator may publish 'Indeterminate' if no exit is observed
-- within the escalation sequence, even if the process eventually exits after
-- teardown. This is inherent to the design: the supervisor reports the best
-- information available at the time of finalisation.
coordinator
  :: ProcessOptions
  -> P.ProcessHandle
  -> MVar SupervisionResult -- finalResult (the coordinator writes the authoritative result here)
  -> IO ()
coordinator opts ph finalResult =
  mask \restore -> do
    let -- Wait (interruptibly) up to the given number of microseconds for exit.
        awaitExit :: Int -> IO (Maybe ExitCode)
        awaitExit micros = restore (timeout micros (P.waitForProcess ph))

        -- Steps 4-6: SIGKILL if safe, wait one second, then fall back to a
        -- non-blocking exit-code query.
        forceKill sentTerm = do
          sentKill <- restore (safeKillUsingHandle ph)
          awaitExit 1_000_000 >>= \case
            Just code ->
              publishFinal (Just code) (classify sentTerm sentKill (Just code)) Nothing
            Nothing -> do
              lastSeen <- restore (P.getProcessExitCode ph)
              publishFinal lastSeen (classify sentTerm sentKill lastSeen) Nothing

        -- Steps 2-3: request graceful termination, then wait one more grace
        -- period. TERM counts as "sent" only if the call did not throw.
        requestTerm = do
          termAttempt <- restore (trySync (P.terminateProcess ph))
          let sentTerm = either (const False) (const True) termAttempt
          awaitExit graceMicros >>= \case
            Just code ->
              publishFinal (Just code) (classify sentTerm False (Just code)) Nothing
            Nothing -> forceKill sentTerm

        -- Step 1: give the process one grace period to exit on its own.
        supervise =
          awaitExit graceMicros >>= \case
            Just code ->
              publishFinal (Just code) (classify False False (Just code)) Nothing
            Nothing -> requestTerm

    supervise `catch` onFailure
  where
    graceMicros :: Int
    graceMicros = fromIntegral (getGracePeriodMicros (optGracePeriod opts))

    -- Classification from the escalation attempts and the known exit code.
    classify :: Bool -> Bool -> Maybe ExitCode -> Termination
    classify sentTerm sentKill mExit
      | isJust mExit = if sentTerm || sentKill then Escalated else Clean
      | sentKill = Escalated
      | otherwise = Indeterminate

    -- Any exception ends supervision with 'Indeterminate'; only synchronous
    -- ones are recorded as supervisor failures.
    onFailure :: SomeException -> IO ()
    onFailure e =
      publishFinal Nothing Indeterminate (if isAsync e then Nothing else Just e)

    -- 'tryPutMVar' never blocks, so publication cannot be interrupted and a
    -- second publication attempt is a no-op.
    publishFinal mExit term mExc =
      void $ tryPutMVar finalResult (SupervisionResult mExit term mExc)

-------------------------------------------------------------------------------
-- Workers
-------------------------------------------------------------------------------

-- writerWorker:
--   - Continuously reads chunks from the producer and writes them to the Handle.
--   - Honors the stop signal by closing the Handle once pending writes complete.
--   - Treats benign I/O errors as non-fatal.
--   - Benign read errors from the producer are treated as EOF.
--
-- EOF is signalled by the producer returning 'Nothing'.
--
-- Exception semantics:
--   The 'produce' action may throw synchronous exceptions to signal failure.
--   Async exceptions thrown into the worker (e.g. via 'cancel') are treated as
--   cancellation and do not propagate; the worker exits quietly.
writerWorker :: String -> Handle -> IO (Maybe BS.ByteString) -> Bool -> MVar () -> MVar () -> IO ()
writerWorker name h produce shouldFlush stopSignal handleClosed =
  loop `catch` ignoreAsync
  where
    loop =
      tryReadMVar stopSignal >>= \case
        -- Stop requested before reading more input.
        Just _ -> closeHandle
        Nothing ->
          nextChunk >>= \case
            -- EOF from the producer.
            Nothing -> closeHandle
            -- A stop may have been requested during the read, but a chunk
            -- that was already produced is still written.
            Just chunk -> do
              delivered <- writeChunk chunk
              -- A benign write failure (EPIPE) means the reading end is
              -- gone and no later write can succeed: stop instead of
              -- draining the producer into a dead pipe forever.
              if delivered then loop else closeHandle

    -- Pull the next chunk; benign producer read errors count as EOF, other
    -- synchronous failures become 'WorkerDied'.
    nextChunk :: IO (Maybe BS.ByteString)
    nextChunk =
      produce `catchSync` \e ->
        if isBenignReadError e then pure Nothing else throwIO (WorkerDied name "read" e)

    -- Write (and optionally flush) one chunk. Returns 'False' when the write
    -- failed with a benign error; other synchronous failures become
    -- 'WorkerDied'.
    writeChunk :: BS.ByteString -> IO Bool
    writeChunk chunk =
      (BS.hPut h chunk >> when shouldFlush (hFlush h) >> pure True) `catchSync` \e ->
        if isBenignWriteError e then pure False else throwIO (WorkerDied name "write" e)

    -- Close the handle (ignoring synchronous close errors) and record that
    -- this worker closed it.
    closeHandle :: IO ()
    closeHandle = do
      void $ trySync (hClose h)
      void $ tryPutMVar handleClosed ()

    -- Cancellation ends the worker quietly; everything else propagates.
    ignoreAsync :: SomeException -> IO ()
    ignoreAsync e = case fromException e of
      Just (_ :: SomeAsyncException) -> pure ()
      Nothing -> throwIO e

-- readerWorker:
--   - Continuously reads chunks from the Handle and writes them to the consumer.
--   - Uses a fixed 10ms wait to avoid busy looping on empty reads.
--   - Treats benign I/O errors as non-fatal.
--
-- The worker must handle the fact that the Handle layer may occasionally
-- return an empty chunk without indicating EOF. These empty reads are not
-- forwarded; EOF is checked explicitly and the worker retries after a brief
-- wait.
--
-- Exception semantics:
--   The 'consume' action may throw synchronous exceptions to signal failure.
--   Async exceptions thrown into the worker (e.g. via 'cancel') are treated as
--   cancellation and do not propagate; the worker exits quietly.
--
-- Note [hWaitForInput Timeout]
--   hWaitForInput may return False if the timeout expires before data becomes
--   available. This is not an error and does not indicate EOF; it simply
--   means “no data yet”.
--   The worker ignores the timeout result and continues
--   polling after the delay.
readerWorker :: String -> Handle -> (BS.ByteString -> IO ()) -> Int -> IO ()
readerWorker name h consume bufSize =
  loop `catch` ignoreAsync
  where
    loop =
      readChunk >>= \case
        -- Benign read failure: the stream is over. The handle is not probed
        -- again, since touching it would only raise the same error outside
        -- of any handler.
        Nothing -> pure ()
        Just chunk
          | BS.null chunk -> do
              -- Empty read: check for EOF explicitly, otherwise wait briefly
              -- and retry. Both probes are guarded like the read itself.
              eof <- guardRead True (hIsEOF h)
              if eof
                then pure ()
                else do
                  guardRead () (void (hWaitForInput h retryDelayMs))
                  loop
          | otherwise -> do
              consume chunk `catchSync` \e -> throwIO (WorkerDied name "write" e)
              loop

    readChunk :: IO (Maybe BS.ByteString)
    readChunk = guardRead Nothing (Just <$> BS.hGetSome h bufSize)

    -- Run a handle operation; a benign read error yields the fallback value,
    -- any other synchronous failure becomes 'WorkerDied'.
    guardRead :: a -> IO a -> IO a
    guardRead fallback act =
      act `catchSync` \e ->
        if isBenignReadError e then pure fallback else throwIO (WorkerDied name "read" e)

    -- Milliseconds passed to 'hWaitForInput' between empty reads.
    retryDelayMs :: Int
    retryDelayMs = 10

    -- Cancellation ends the worker quietly; everything else propagates.
    ignoreAsync :: SomeException -> IO ()
    ignoreAsync e = case fromException e of
      Just (_ :: SomeAsyncException) -> pure ()
      Nothing -> throwIO e

-------------------------------------------------------------------------------
-- API helpers
-------------------------------------------------------------------------------

-- | Block until the final supervision result is available.
waitProcess :: Process -> IO SupervisionResult
waitProcess = readMVar . pFinalResult

-- | Non‑blocking check for the final supervision result.
--
-- Returns 'Nothing' if the result has not yet been published.
pollResult :: Process -> IO (Maybe SupervisionResult)
pollResult = tryReadMVar . pFinalResult

-- | Return the PID of the child process, if available.
--
-- The PID is advisory and may be recycled by the operating system.
getAdvisoryPid :: Process -> IO (Maybe CPid)
getAdvisoryPid = P.getPid . pProcessHandle

-- | Send a signal to the child process if it is still running.
--
-- Uses the advisory PID and does nothing if the process has already exited.
--
-- The PID is advisory and may have been recycled.
sendSignal :: Process -> Signal -> IO ()
sendSignal p sig =
  P.getProcessExitCode (pProcessHandle p) >>= \case
    -- Already exited: nothing to signal.
    Just _ -> pure ()
    Nothing ->
      getAdvisoryPid p >>= \case
        Nothing -> pure ()
        -- Best effort: synchronous failures of the signal call are dropped.
        Just pid -> void $ trySync (signalProcess sig pid)

-- | Ask the stdin worker to close its handle once pending writes complete.
--
-- Idempotent: the request flag is set with a non-blocking put.
requestStdinClose :: Process -> IO ()
requestStdinClose p = void $ tryPutMVar (pStopStdin p) ()

-- safeKillUsingHandle:
--   Attempts SIGKILL if the process still appears alive.
--   Re-checks exit status before signalling to reduce PID-recycling races.
--   Returns True only when the signal call itself succeeded.
--
-- Note:
--   PID recycling cannot be eliminated without pidfd support.
safeKillUsingHandle :: P.ProcessHandle -> IO Bool
safeKillUsingHandle ph =
  -- If the process already exited, there is nothing to do.
  P.getProcessExitCode ph >>= \case
    Just _ -> pure False
    Nothing ->
      -- Ask the ProcessHandle for the pid (may be Nothing).
      P.getPid ph >>= \case
        Nothing -> pure False
        Just pid ->
          -- Re-check handle liveness right before signalling.
          P.getProcessExitCode ph >>= \case
            Just _ -> pure False
            Nothing ->
              either (const False) (const True)
                <$> trySync (signalProcess sigKILL pid)

-- XX once P.getPidFd and pidfdSendSignal :: Fd -> Signal -> IO () exists:
--safeKillUsingHandle ph = do
--  mExit <- P.getProcessExitCode ph
--  case mExit of
--    Just _ -> pure False
--    Nothing ->
--      case P.getPidFd ph of
--        Nothing -> pure False
--        Just pidfd -> do
--          r <- trySync (pidfdSendSignal pidfd sigKILL)
--          pure (either (const False) (const True) r)

-------------------------------------------------------------------------------
-- Error helpers and benign classification
-------------------------------------------------------------------------------

-- | Whether an exception belongs to the asynchronous exception hierarchy,
-- i.e. is wrapped in 'SomeAsyncException'.
--
-- This must use 'fromException'. 'asyncExceptionFromException' is the helper
-- for writing 'fromException' of a /member/ of the hierarchy: it unwraps
-- 'SomeAsyncException' and casts the payload to the requested type. Asking
-- it for 'SomeAsyncException' itself therefore fails for every ordinary
-- async exception, which made this predicate always 'False'.
isAsync :: SomeException -> Bool
isAsync e = isJust (fromException e :: Maybe SomeAsyncException)

-- | Run an action, handling only synchronous exceptions.
--
-- Async exceptions are rethrown; the handler sees only non-async exceptions.
catchSync :: IO a -> (SomeException -> IO a) -> IO a
catchSync act h = mask \unmask ->
  let -- Runs masked (as every 'catch' handler does); the user handler is
      -- executed in the caller's masking state again.
      dispatch e
        | isAsync e = throwIO e
        | otherwise = unmask (h e)
   in unmask act `catch` dispatch

-- | Like 'try', but rethrows async exceptions instead of returning them.
--
-- Returns 'Left' only for non-async exceptions.
trySync :: IO a -> IO (Either SomeException a)
trySync act = mask \unmask ->
  try (unmask act) >>= either sortOut (pure . Right)
  where
    sortOut e
      | isAsync e = throwIO e
      | otherwise = pure (Left e)

-- Conservative benign classification:
--  - Reads: EOF or ResourceVanished are treated as benign (end of stream).
--  - Writes: only an IOException carrying errno EPIPE is treated as benign;
--    errors without an errno are not.
isBenignReadError :: SomeException -> Bool
isBenignReadError = maybe False endOfStream . fromException
  where
    endOfStream :: IOException -> Bool
    endOfStream ioe = isEOFError ioe || ioeGetErrorType ioe == ResourceVanished

isBenignWriteError :: SomeException -> Bool
isBenignWriteError se = any brokenPipe (fromException se >>= ioe_errno)
  where
    brokenPipe code = Errno code == ePIPE

-------------------------------------------------------------------------------
-- Misc helpers
-------------------------------------------------------------------------------

-- | Validate 'ProcessOptions' and throw 'InvalidOptions' on invalid values.
--
-- Checks command presence, buffer size bounds, and non-negative grace period.
validateOptions :: ProcessOptions -> IO ()
validateOptions opts =
  -- Rules are evaluated lazily and in order; the first violated rule throws.
  mapM_ (throwIO . InvalidOptions) [message | (violated, message) <- rules, violated]
  where
    graceMicros = getGracePeriodMicros (optGracePeriod opts)
    chunkSize = optBufferSize opts
    chunkLimit = 16 * 1024 * 1024 -- 16MB, arbitrary but sane upper bound

    rules :: [(Bool, String)]
    rules =
      [ (null (optCmd opts), "optCmd cannot be empty")
      , (chunkSize <= 0, "optBufferSize must be > 0")
      , (chunkSize > chunkLimit, "optBufferSize is unreasonably large")
      , (graceMicros < 0, "optGracePeriod must be non-negative")
      , -- The grace period is later converted to 'Int' for 'timeout'.
        ( graceMicros > fromIntegral (maxBound :: Int)
        , "optGracePeriod too large for this platform"
        )
      ]

-- cgit v1.2.3