From fda89c96529db0c61f98e29028d68ca15cdbda1e Mon Sep 17 00:00:00 2001 From: tv Date: Mon, 9 Feb 2026 03:50:35 +0100 Subject: WIP --- Process/Supervisor.hs | 446 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 446 insertions(+) create mode 100644 Process/Supervisor.hs (limited to 'Process') diff --git a/Process/Supervisor.hs b/Process/Supervisor.hs new file mode 100644 index 0000000..bc2a5a7 --- /dev/null +++ b/Process/Supervisor.hs @@ -0,0 +1,446 @@ +{-| +Module : Process.Supervisor +Description : Hardened supervisor with single-coordinator state machine to avoid races. +-} +module Process.Supervisor + ( Process + , ProcessOptions(..) + , ProcessError(..) + , SupervisionResult(..) + , Termination(..) + , GracePeriod(..) + , defaultOptions + , withProcess + , waitProcess + , pollResult + , getAdvisoryPid + , sendSignal + , requestStdinClose + ) where + +import Control.Concurrent (MVar, newEmptyMVar, tryPutMVar, tryReadMVar, readMVar) +import Control.Concurrent.Async (async, cancel, waitCatch) +import Control.Exception + ( bracket, catch, try, SomeException, throwIO, Exception + , fromException, mask, asyncExceptionFromException, SomeAsyncException + ) +import Control.Monad (void, when) +import Data.ByteString qualified as BS +import Data.Int (Int64) +import Data.Maybe (isJust) +import Foreign.C.Error (Errno(Errno), ePIPE) +import GHC.Generics (Generic) +import GHC.IO.Exception (IOErrorType(..), IOException(..)) +import System.Exit (ExitCode(..)) +import System.IO (Handle, hClose, hFlush, hIsEOF, hPutStrLn, hWaitForInput, stderr) +import System.IO.Error (ioeGetErrorType, isEOFError) +import System.Posix.Signals (Signal, signalProcess, sigKILL) +import System.Posix.Types (CPid) +import System.Process qualified as P +import System.Timeout (timeout) + +------------------------------------------------------------------------------- +-- Types +------------------------------------------------------------------------------- + +data Termination = Clean | 
Escalated | Indeterminate
  deriving (Show, Eq, Generic)

-- | Final outcome of a supervised process, as published by the coordinator.
data SupervisionResult = SupervisionResult
  { srExitCode :: !(Maybe ExitCode)
    -- ^ Exit code of the child, if one was observed.
  , srTermination :: !Termination
    -- ^ How the coordinator classified the shutdown.
  , srException :: !(Maybe SomeException)
    -- ^ Optional exception observed by the coordinator; may be used for diagnostics.
  } deriving (Show, Generic)

-- | Equality on exit code and termination. 'SomeException' has no 'Eq'
-- instance, so exceptions are compared through their 'show' output.
instance Eq SupervisionResult where
  a == b =
    srExitCode a == srExitCode b &&
    srTermination a == srTermination b &&
    fmap show (srException a) == fmap show (srException b)

-- | Grace period, in microseconds.
newtype GracePeriod = GracePeriod { getGracePeriodMicros :: Int64 }
  deriving (Show, Eq, Ord)

-- | Errors thrown by this module.
data ProcessError
  = PipeCreationFailed String
    -- ^ 'P.createProcess' did not hand back all three requested pipes.
  | WorkerDied String String SomeException
    -- ^ Worker name, failing phase, and the underlying exception.
  | ProcessStartFailed SomeException
    -- ^ 'P.createProcess' itself failed.
  | InvalidOptions String
    -- ^ The supplied 'ProcessOptions' were rejected by validation.
  deriving (Show, Generic)

instance Exception ProcessError

-- | Handle given to the user action of 'withProcess'.
data Process = Process
  { phExitVar :: !(MVar SupervisionResult) -- authoritative final result (written only by coordinator)
  , phStopIn :: !(MVar ()) -- request stdin close
  , phHandle :: !P.ProcessHandle
  }

-- | Settings for one supervised child process.
data ProcessOptions = ProcessOptions
  { optFlushStdin :: !Bool -- flush the child's stdin after every chunk written
  , optGracePeriod :: !GracePeriod -- how long the coordinator waits before each escalation step
  , optBufferSize :: !Int -- maximum chunk size requested when reading stdout/stderr
  , optCmd :: !FilePath
  , optArgs :: ![String]
  , optEnv :: !(Maybe [(String, String)])
  , optCwd :: !(Maybe FilePath)
  }

-- | Options for running @cmd args@: flushing enabled, a 5 second grace
-- period, 4096-byte reads, and no environment or working-directory override.
defaultOptions :: FilePath -> [String] -> ProcessOptions
defaultOptions cmd args = ProcessOptions
  { optFlushStdin = True
  , optGracePeriod = GracePeriod 5_000_000
  , optBufferSize = 4096
  , optCmd = cmd
  , optArgs = args
  , optEnv = Nothing
  , optCwd = Nothing
  }

-------------------------------------------------------------------------------
-- withProcess: start process, workers, coordinator
-------------------------------------------------------------------------------

-- | Spawn the configured command with piped stdin/stdout/stderr, start the
-- coordinator and the three stream workers, run the user action, and tear
-- everything down afterwards whether the action returns or throws.
--
-- Throws 'InvalidOptions' when validation fails, 'ProcessStartFailed' when
-- the process cannot be created, and 'PipeCreationFailed' when a pipe is
-- missing. Exceptions from the user action propagate after cleanup.
withProcess
  :: ProcessOptions
  -> IO (Maybe BS.ByteString) -- get input to feed to child's stdin
  -> (BS.ByteString -> IO ()) -- stdout consumer
  -> (BS.ByteString -> IO ()) -- stderr consumer
  -> (Process -> IO a) -- user action
  -> IO a
withProcess opts getIn putOut putErr action = do
  validateOptions opts
  let cp = (P.proc (optCmd opts) (optArgs opts))
        { P.env = optEnv opts, P.cwd = optCwd opts
        , P.std_in = P.CreatePipe, P.std_out = P.CreatePipe, P.std_err = P.CreatePipe
        }

  -- coordination vars
  finalVar <- newEmptyMVar -- authoritative final result (coordinator writes exactly once)
  stopIn <- newEmptyMVar
  stdinClosed <- newEmptyMVar -- worker sets this when it closes stdin

  -- protect createProcess + immediate registration
  mask \restore -> do
    spawned@(mhin, mhout, mherr, ph) <-
      P.createProcess cp `catchSync` (throwIO . ProcessStartFailed)

    -- validate pipes
    case (mhin, mhout, mherr) of
      (Just hin, Just hout, Just herr) -> do
        -- start coordinator (single authoritative writer)
        coordA <- async (coordinator opts ph finalVar)

        -- start workers as Asyncs so we can cancel them deterministically
        stdinA <- async (stdinWorker (optFlushStdin opts) hin getIn stopIn stdinClosed)
        stdoutA <- async (streamWorker "stdout" hout (optBufferSize opts) putOut)
        stderrA <- async (streamWorker "stderr" herr (optBufferSize opts) putErr)

        let -- best-effort close: a failure is reported on stderr, never thrown
            closeLogged label h =
              trySync (hClose h) >>= \case
                Left e -> hPutStrLn stderr $ "Process.release: hClose " ++ label ++ " failed: " ++ show e
                Right _ -> pure ()

            -- release action: cancel workers in safe order, wait for coordinator, cleanup
            release = mask \_ -> do
              -- request stdin worker to stop
              void $ tryPutMVar stopIn ()

              -- cancel stdin worker first to interrupt blocked writes
              cancel stdinA
              void (waitCatch stdinA)

              -- close stdin only if worker didn't already close it
              tryReadMVar stdinClosed >>= \case
                Just _ -> pure ()
                Nothing -> closeLogged "hin" hin

              -- cancel stdout/stderr workers and wait for them
              cancel stdoutA
              cancel stderrA
              void (waitCatch stdoutA)
              void (waitCatch stderrA)

              -- wait for coordinator to finish (it will publish finalVar)
              void (waitCatch coordA)

              -- close remaining handles (best-effort)
              closeLogged "hout" hout
              closeLogged "herr" herr

        -- Run the user action under bracket so release runs on every exit path.
        -- The coordinator is deliberately NOT cancelled when the action throws:
        -- it is the only component that terminates and reaps the child and
        -- fills finalVar, so cancelling it would leak the process and leave
        -- waitProcess callers blocked. release already cancels the workers.
        bracket (pure ()) (const release) \_ ->
          restore (action (Process finalVar stopIn ph))

      _ -> do
        -- The child has already been spawned at this point; terminate it and
        -- close whichever pipes exist before reporting the failure.
        void $ trySync (P.cleanupProcess spawned)
        throwIO $ PipeCreationFailed "createProcess returned missing pipe(s)"

-------------------------------------------------------------------------------
-- Coordinator state machine
--
-- Responsibilities:
-- - Observe process exit directly via waitForProcess with timeouts
-- - Enforce grace periods deterministically
-- - Perform TERM -> wait -> KILL escalation using ProcessHandle APIs
-- - Atomically publish authoritative final result into finalVar
-------------------------------------------------------------------------------

-- | Single writer of the final 'SupervisionResult'.
--
-- If the state machine is interrupted (for example by 'cancel') or one of its
-- steps throws, an 'Indeterminate' result carrying the exception is published
-- before the exception is re-thrown, so readers of finalVar are never left
-- waiting on an empty MVar.
--
-- NOTE(review): the first grace period starts when the coordinator starts,
-- i.e. right after the child is spawned, so a child still running after
-- optGracePeriod is sent terminateProcess even while the user action is
-- still running. Confirm this is the intended policy.
coordinator
  :: ProcessOptions
  -> P.ProcessHandle
  -> MVar SupervisionResult -- finalVar (coordinator writes authoritative final result here)
  -> IO ()
coordinator opts ph finalVar =
  mask \restore -> do
    let graceMicros = fromIntegral (getGracePeriodMicros (optGracePeriod opts)) :: Int

        -- tryPutMVar never blocks, so publication cannot be interrupted and
        -- only the first result is ever stored
        publishFinal mExit term mExc =
          void $ tryPutMVar finalVar (SupervisionResult mExit term mExc)

        -- classify termination based on whether TERM/KILL were attempted and whether an exit code is known
        classifyTerm didSendTerm didKill mExit =
          case (didSendTerm || didKill, didKill, mExit) of
            (False, False, Just _) -> Clean
            (True, _, Just _) -> Escalated
            (_, True, Nothing) -> Escalated
            _ -> Indeterminate

        awaitExit micros = restore (timeout micros (P.waitForProcess ph))

        stateMachine =
          -- 1) First chance: wait for natural exit within grace period
          awaitExit graceMicros >>= \case
            Just ec -> publishFinal (Just ec) (classifyTerm False False (Just ec)) Nothing
            Nothing -> do
              -- 2) No exit observed: attempt graceful termination
              termResult <- restore (trySync (P.terminateProcess ph))
              let didSendTerm = either (const False) (const True) termResult

              -- 3) Wait again for exit within grace period
              awaitExit graceMicros >>= \case
                Just ec2 -> publishFinal (Just ec2) (classifyTerm didSendTerm False (Just ec2)) Nothing
                Nothing -> do
                  -- 4) Still no exit: attempt SIGKILL if safe
                  didKill <- restore (safeKillUsingHandle ph)
                  -- 5) Wait short time for exit to appear
                  awaitExit 1_000_000 >>= \case
                    Just ec3 -> publishFinal (Just ec3) (classifyTerm didSendTerm didKill (Just ec3)) Nothing
                    Nothing -> do
                      -- 6) No exit ever observed: fall back to getProcessExitCode
                      mExit <- restore (P.getProcessExitCode ph)
                      publishFinal mExit (classifyTerm didSendTerm didKill mExit) Nothing

    stateMachine `catch` \(e :: SomeException) -> do
      publishFinal Nothing Indeterminate (Just e)
      throwIO e

-------------------------------------------------------------------------------
-- Reaper: removed in favor of a single deterministic coordinator that owns
-- waitForProcess and classification. This avoids timing races between threads.
-------------------------------------------------------------------------------

-------------------------------------------------------------------------------
-- Workers
-------------------------------------------------------------------------------

-- stdinWorker:
-- - Repeatedly obtains input chunks from getIn and writes them to the child's stdin.
-- - Honors requestStdinClose cooperatively: stopVar is checked before each
--   call to getIn; once it is seen set, stdin is closed exactly once.
-- - When getIn returns Nothing, stdin is closed immediately to signal EOF.
-- - A stop request never interrupts a running getIn: a chunk that getIn has
--   already returned is still written, and the request takes effect before
--   the next read.
-- - When a write fails with a benign error (the child closed its end of the
--   pipe), feeding stops and stdin is closed rather than continuing to pull
--   chunks from the producer into a dead pipe.
-- - Asynchronous exceptions (e.g. cancellation) end the worker quietly; in
--   that case stdin is left for the caller to close.
stdinWorker :: Bool -> Handle -> IO (Maybe BS.ByteString) -> MVar () -> MVar () -> IO ()
stdinWorker flushStdin h getIn stopVar stdinClosed = loop `catch` handleAsync
  where
    loop =
      tryReadMVar stopVar >>= \case
        Just _ ->
          -- Stop requested before reading input
          closeStdin
        Nothing ->
          nextChunk >>= \case
            Nothing ->
              -- EOF from producer
              closeStdin
            Just bs -> do
              -- Stop might have been requested during or after the read, but
              -- since we have the chunk, we're writing it.
              accepted <- deliver bs
              if accepted then loop else closeStdin

    -- Pull one chunk; a benign read error from the producer counts as EOF.
    nextChunk =
      getIn `catchSync` \e ->
        if isBenignReadError e then pure Nothing else throwIO (WorkerDied "stdin" "input" e)

    -- Write (and optionally flush) one chunk. False means the child no longer
    -- accepts input, so the loop must not try again.
    deliver bs =
      (BS.hPut h bs >> when flushStdin (hFlush h) >> pure True) `catchSync` \e ->
        if isBenignWriteError e then pure False else throwIO (WorkerDied "stdin" "output" e)

    -- Close stdin and record that it was closed
    closeStdin = do
      void $ trySync (hClose h)
      void $ tryPutMVar stdinClosed ()

    handleAsync :: SomeException -> IO ()
    handleAsync e = case fromException e of
      Just (_ :: SomeAsyncException) -> pure ()
      Nothing -> throwIO e

-- streamWorker: read loop that treats empty read as non-EOF and waits for input to avoid busy loop.
streamWorker :: String -> Handle -> Int -> (BS.ByteString -> IO ()) -> IO ()
streamWorker name h bufSize putAct = loop `catch` handleAsync
  where
    loop = do
      chunk <- BS.hGetSome h bufSize `catchSync` readFailure BS.empty
      if BS.null chunk
        then do
          -- The EOF probe and the wait below go through the same error
          -- classification as the read itself: a benign error ends (or
          -- re-checks) the stream, anything else is reported as WorkerDied.
          atEof <- hIsEOF h `catchSync` readFailure True
          if atEof
            then pure ()
            else do
              -- wait briefly for input to avoid busy loop; hWaitForInput returns immediately if input available
              _ <- hWaitForInput h 10 `catchSync` readFailure False -- wait up to 10ms
              loop
        else do
          putAct chunk `catchSync` \e -> throwIO (WorkerDied name "write" e)
          loop

    -- Benign read-side errors yield the given fallback; all other
    -- synchronous errors become WorkerDied for this stream.
    readFailure :: a -> SomeException -> IO a
    readFailure fallback e
      | isBenignReadError e = pure fallback
      | otherwise = throwIO (WorkerDied name "read" e)

    -- Asynchronous exceptions (cancellation) end the worker quietly.
    handleAsync :: SomeException -> IO ()
    handleAsync e = case fromException e of
      Just (_ :: SomeAsyncException) -> pure ()
      Nothing -> throwIO e

-------------------------------------------------------------------------------
-- API helpers
-------------------------------------------------------------------------------

-- | Best-effort signal delivery: does nothing when an exit code is already
-- known or no PID is available, and ignores synchronous signalling failures.
sendSignal :: Process -> Signal -> IO ()
sendSignal p sig = do
  exited <- P.getProcessExitCode (phHandle p)
  case exited of
    Just _ -> pure ()
    Nothing -> getAdvisoryPid p >>= \case
      Nothing -> pure ()
      Just pid -> void $ trySync (signalProcess sig pid)

-- | Block until the coordinator has published the final result.
waitProcess :: Process -> IO SupervisionResult
waitProcess p = readMVar (phExitVar p)

-- | Non-blocking variant of 'waitProcess'; 'Nothing' while no result has been published.
pollResult :: Process -> IO (Maybe SupervisionResult)
pollResult p = tryReadMVar (phExitVar p)

-- | Return the OS PID associated with the underlying process handle, if any.
--
-- This PID is strictly advisory:
--
-- * It may be 'Nothing' if the backend does not expose a PID.
-- * It may refer to a process that has already exited.
-- * Due to PID recycling, it may even refer to an unrelated process
--   if the original child has exited and the OS has reused the PID.
--
-- Callers MUST NOT treat this PID as a stable or authoritative identity.
-- It is suitable only for best-effort diagnostics or signalling in
-- contexts where PID recycling is acceptable and documented.
getAdvisoryPid :: Process -> IO (Maybe CPid)
getAdvisoryPid p = P.getPid (phHandle p)

-- | Ask the stdin worker to stop feeding the child; repeated requests are no-ops.
requestStdinClose :: Process -> IO ()
requestStdinClose p = do
  _ <- tryPutMVar (phStopIn p) ()
  pure ()

-------------------------------------------------------------------------------
-- safeKill: re-check handle liveness and use ProcessHandle's PID if available.
-- This reduces but cannot eliminate PID-recycling risk; documented limitation remains.
-------------------------------------------------------------------------------

-- `safeKillUsingHandle` attempts to send `SIGKILL` to the process associated with the `ProcessHandle`.
-- It returns True only when the signal was actually sent without error; an already-exited process,
-- a missing PID, or a failed signal all yield False.
-- Due to OS-level PID recycling, there is an unavoidable race: if the child exits and its PID is reused by another
-- process between checks, the signal may be delivered to the wrong process. This is a fundamental limitation of
-- PID-based signalling and cannot be fully eliminated without kernel support.
safeKillUsingHandle :: P.ProcessHandle -> IO Bool
safeKillUsingHandle ph = do
  -- If process already exited, nothing to do
  exitedEarly <- hasExited
  if exitedEarly
    then pure False
    else
      -- Ask ProcessHandle for pid (may be Nothing)
      P.getPid ph >>= maybe (pure False) killIfStillAlive
  where
    hasExited = isJust <$> P.getProcessExitCode ph

    killIfStillAlive pid = do
      -- Re-check handle liveness before signalling
      gone <- hasExited
      if gone
        then pure False
        else either (const False) (const True) <$> trySync (signalProcess sigKILL pid)

-- XX once P.getPidFd and pidfdSendSignal :: Fd -> Signal -> IO () exists:
--safeKillUsingHandle ph = do
--  mExit <- P.getProcessExitCode ph
--  case mExit of
--    Just _ -> pure False
--    Nothing ->
--      case P.getPidFd ph of
--        Nothing -> pure False
--        Just pidfd -> do
--          r <- trySync (pidfdSendSignal pidfd sigKILL)
--          pure (either (const False) (const True) r)

-------------------------------------------------------------------------------
-- Error helpers and benign classification
-------------------------------------------------------------------------------

-- | True when the exception belongs to the asynchronous-exception hierarchy
-- (thread cancellation, timeouts, and the like).
--
-- The check uses 'fromException' at type 'SomeAsyncException'.
-- 'asyncExceptionFromException' is not suitable here: it unwraps the
-- 'SomeAsyncException' layer and casts the payload to the requested type, so
-- asking it for 'SomeAsyncException' itself yields 'Nothing' for every
-- ordinary asynchronous exception.
isAsync :: SomeException -> Bool
isAsync e = isJust (fromException e :: Maybe SomeAsyncException)

-- | Like 'catch', but the handler only sees synchronous exceptions;
-- asynchronous ones are re-thrown untouched.
catchSync :: IO a -> (SomeException -> IO a) -> IO a
catchSync act h = mask \restore ->
  restore act `catch` \e ->
    if isAsync e
      then throwIO e
      else restore (h e)

-- | Like 'try', but only synchronous exceptions are returned as 'Left';
-- asynchronous ones are re-thrown untouched.
trySync :: IO a -> IO (Either SomeException a)
trySync act = mask \restore -> do
  r <- try (restore act)
  case r of
    Left e | isAsync e -> throwIO e
           | otherwise -> pure (Left e)
    Right x -> pure (Right x)

-- Conservative benign classification:
-- - Reads: EOF or ResourceVanished may be benign (EOF).
-- - Writes: only an IOException whose recorded errno is EPIPE is benign;
--   an IOException without an errno is never treated as benign.
isBenignReadError :: SomeException -> Bool
isBenignReadError se = case fromException se of
  Just (ioe :: IOException) -> isEOFError ioe || ioeGetErrorType ioe == ResourceVanished
  Nothing -> False

isBenignWriteError :: SomeException -> Bool
isBenignWriteError se = case fromException se of
  Just (ioe :: IOException) ->
    case ioe_errno ioe of
      Just errno -> Errno errno == ePIPE
      Nothing -> False
  Nothing -> False

-------------------------------------------------------------------------------
-- Misc helpers
-------------------------------------------------------------------------------

-- | Reject option values the supervisor cannot work with.
--
-- Throws 'InvalidOptions' when the command is empty, the buffer size is not
-- within (0, 16MB], or the grace period is negative or does not fit in the
-- 'Int' microsecond argument that 'System.Timeout.timeout' takes.
validateOptions :: ProcessOptions -> IO ()
validateOptions opts = do
  let gp = getGracePeriodMicros (optGracePeriod opts)
      buf = optBufferSize opts
      maxBuf = 16 * 1024 * 1024 -- 16MB, arbitrary but sane upper bound

  when (null $ optCmd opts) $
    throwIO $ InvalidOptions "optCmd cannot be empty"

  when (buf <= 0) $
    throwIO $ InvalidOptions "optBufferSize must be > 0"

  when (buf > maxBuf) $
    throwIO $ InvalidOptions "optBufferSize is unreasonably large"

  when (gp < 0) $
    throwIO $ InvalidOptions "optGracePeriod must be non-negative"

  -- The grace period is narrowed from Int64 to Int before it reaches timeout;
  -- where Int is narrower than 64 bits an oversized value would wrap around.
  when (gp > fromIntegral (maxBound :: Int)) $
    throwIO $ InvalidOptions "optGracePeriod is too large"
-- cgit v1.2.3