{-| Module : Process.Supervisor Description : Hardened supervisor with single-coordinator state machine to avoid races. -} module Process.Supervisor ( Process , ProcessOptions(..) , ProcessError(..) , SupervisionResult(..) , Termination(..) , GracePeriod(..) , defaultOptions , withProcess , waitProcess , pollResult , getAdvisoryPid , sendSignal , requestStdinClose ) where import Control.Concurrent (MVar, newEmptyMVar, tryPutMVar, tryReadMVar, readMVar) import Control.Concurrent.Async (async, cancel, waitCatch) import Control.Exception ( bracket, catch, try, SomeException, throwIO, Exception , fromException, mask, asyncExceptionFromException, SomeAsyncException ) import Control.Monad (void, when) import Data.ByteString qualified as BS import Data.Int (Int64) import Data.Maybe (isJust) import Foreign.C.Error (Errno(Errno), ePIPE) import GHC.Generics (Generic) import GHC.IO.Exception (IOErrorType(..), IOException(..)) import System.Exit (ExitCode(..)) import System.IO (Handle, hClose, hFlush, hIsEOF, hPutStrLn, hWaitForInput, stderr) import System.IO.Error (ioeGetErrorType, isEOFError) import System.Posix.Signals (Signal, signalProcess, sigKILL) import System.Posix.Types (CPid) import System.Process qualified as P import System.Timeout (timeout) ------------------------------------------------------------------------------- -- Types ------------------------------------------------------------------------------- data Termination = Clean | Escalated | Indeterminate deriving (Show, Eq, Generic) data SupervisionResult = SupervisionResult { srExitCode :: !(Maybe ExitCode) , srTermination :: !Termination -- optional exception observed by the coordinator; may be used for diagnostics , srException :: !(Maybe SomeException) } deriving (Show, Generic) -- Provide Eq by comparing exit code, termination, and stringified exception instance Eq SupervisionResult where a == b = srExitCode a == srExitCode b && srTermination a == srTermination b && fmap show (srException a) == fmap show (srException b) newtype GracePeriod = GracePeriod { getGracePeriodMicros :: Int64 } deriving (Show, Eq, Ord) data ProcessError = PipeCreationFailed String | WorkerDied String String SomeException | ProcessStartFailed SomeException | InvalidOptions String deriving (Show, Generic) instance Exception ProcessError data Process = Process { phExitVar :: !(MVar SupervisionResult) -- authoritative final result (written only by coordinator) , phStopIn :: !(MVar ()) -- request stdin close , phHandle :: !P.ProcessHandle } data ProcessOptions = ProcessOptions { optFlushStdin :: !Bool , optGracePeriod :: !GracePeriod , optBufferSize :: !Int , optCmd :: !FilePath , optArgs :: ![String] , optEnv :: !(Maybe [(String, String)]) , optCwd :: !(Maybe FilePath) } defaultOptions :: FilePath -> [String] -> ProcessOptions defaultOptions cmd args = ProcessOptions { optFlushStdin = True , optGracePeriod = GracePeriod 5_000_000 , optBufferSize = 4096 , optCmd = cmd , optArgs = args , optEnv = Nothing , optCwd = Nothing } ------------------------------------------------------------------------------- -- withProcess: start process, workers, coordinator ------------------------------------------------------------------------------- withProcess :: ProcessOptions -> IO (Maybe BS.ByteString) -- get input to feed to child's stdin -> (BS.ByteString -> IO ()) -- stdout consumer -> (BS.ByteString -> IO ()) -- stderr consumer -> (Process -> IO a) -- user action -> IO a withProcess opts getIn putOut putErr action = do validateOptions opts let cp = (P.proc (optCmd opts) (optArgs opts)) { P.env = optEnv opts, P.cwd = optCwd opts , P.std_in = P.CreatePipe, P.std_out = P.CreatePipe, P.std_err = P.CreatePipe } -- coordination vars finalVar <- newEmptyMVar -- authoritative final result (coordinator writes exactly once) stopIn <- newEmptyMVar stdinClosed <- newEmptyMVar -- worker sets this when it closes stdin -- protect createProcess + immediate registration mask \restore -> do (mhin, mhout, mherr, ph) <- (P.createProcess cp) `catchSync` (throwIO . ProcessStartFailed) -- validate pipes case (mhin, mhout, mherr) of (Just hin, Just hout, Just herr) -> do -- start coordinator (single authoritative writer) coordA <- async (coordinator opts ph finalVar) -- start workers as Asyncs so we can cancel them deterministically stdinA <- async (stdinWorker (optFlushStdin opts) hin getIn stopIn stdinClosed) stdoutA <- async (streamWorker "stdout" hout (optBufferSize opts) putOut) stderrA <- async (streamWorker "stderr" herr (optBufferSize opts) putErr) -- release action: cancel workers in safe order, wait for coordinator, cleanup let release = mask \_ -> do -- request stdin worker to stop void $ tryPutMVar stopIn () -- cancel stdin worker first to interrupt blocked writes cancel stdinA void (waitCatch stdinA) -- close stdin only if worker didn't already close it mClosed <- tryReadMVar stdinClosed case mClosed of Just _ -> pure () Nothing -> do r <- trySync (hClose hin) case r of Left e -> hPutStrLn stderr $ "Process.release: hClose hin failed: " ++ show e Right _ -> pure () -- cancel stdout/stderr workers and wait for them cancel stdoutA cancel stderrA void (waitCatch stdoutA) void (waitCatch stderrA) -- wait for coordinator to finish (it will publish finalVar) void (waitCatch coordA) -- close remaining handles (best-effort) r1 <- trySync (hClose hout) case r1 of Left e -> hPutStrLn stderr $ "Process.release: hClose hout failed: " ++ show e Right _ -> pure () r2 <- trySync (hClose herr) case r2 of Left e -> hPutStrLn stderr $ "Process.release: hClose herr failed: " ++ show e Right _ -> pure () -- run user action under bracket so release runs bracket (pure ()) (const release) \_ -> do restore (action (Process finalVar stopIn ph)) `catch` \(e :: SomeException) -> do -- ensure workers cancelled; release will also run cancel coordA cancel stdinA cancel stdoutA cancel stderrA throwIO e _ -> throwIO $ PipeCreationFailed "createProcess returned missing pipe(s)" ------------------------------------------------------------------------------- -- Coordinator state machine -- -- Responsibilities: -- - Observe process exit directly via waitForProcess with timeouts -- - Enforce grace periods deterministically -- - Perform TERM -> wait -> KILL escalation using ProcessHandle APIs -- - Atomically publish authoritative final result into finalVar ------------------------------------------------------------------------------- coordinator :: ProcessOptions -> P.ProcessHandle -> MVar SupervisionResult -- finalVar (coordinator writes authoritative final result here) -> IO () coordinator opts ph finalVar = mask \restore -> do let GracePeriod gpMicros = optGracePeriod opts publishFinal mExit term mExc = -- publication itself should be uninterruptible void $ tryPutMVar finalVar (SupervisionResult mExit term mExc) -- classify termination based on whether TERM/KILL were attempted and whether an exit code is known classifyTerm didSendTerm didKill mExit = case (didSendTerm || didKill, didKill, mExit) of (False, False, Just _) -> Clean (True, _, Just _) -> Escalated (_, True, Nothing) -> Escalated _ -> Indeterminate -- 1) First chance: wait for natural exit within grace period mExit1 <- restore (timeout (fromIntegral gpMicros) (P.waitForProcess ph)) case mExit1 of Just ec -> publishFinal (Just ec) (classifyTerm False False (Just ec)) Nothing Nothing -> do -- 2) No exit observed: attempt graceful termination termResult <- restore (trySync (P.terminateProcess ph)) let didSendTerm = either (const False) (const True) termResult -- 3) Wait again for exit within grace period mExit2 <- restore (timeout (fromIntegral gpMicros) (P.waitForProcess ph)) case mExit2 of Just ec2 -> publishFinal (Just ec2) (classifyTerm didSendTerm False (Just ec2)) Nothing Nothing -> do -- 4) Still no exit: attempt SIGKILL if safe didKill <- restore (safeKillUsingHandle ph) -- 5) Wait short time for exit to appear mExit3 <- restore (timeout 1_000_000 (P.waitForProcess ph)) case mExit3 of Just ec3 -> publishFinal (Just ec3) (classifyTerm didSendTerm didKill (Just ec3)) Nothing Nothing -> do -- 6) No exit ever observed: fall back to getProcessExitCode mExit <- restore (P.getProcessExitCode ph) publishFinal mExit (classifyTerm didSendTerm didKill mExit) Nothing ------------------------------------------------------------------------------- -- Reaper: removed in favor of a single deterministic coordinator that owns -- waitForProcess and classification. This avoids timing races between threads. ------------------------------------------------------------------------------- ------------------------------------------------------------------------------- -- Workers ------------------------------------------------------------------------------- -- stdinWorker: -- - Repeatedly obtains input chunks from getIn and writes them to the child's stdin. -- - Honors requestStdinClose cooperatively: if stopVar is set at any point -- before a write, no further writes occur and stdin is closed exactly once. -- - When getIn returns Nothing, stdin is closed immediately to signal EOF. -- - getIn is never interrupted; if a stop is requested while getIn is running, -- the returned chunk (if any) is discarded and stdin is closed immediately. stdinWorker :: Bool -> Handle -> IO (Maybe BS.ByteString) -> MVar () -> MVar () -> IO () stdinWorker flushStdin h getIn stopVar stdinClosed = loop `catch` handleAsync where loop = tryReadMVar stopVar >>= \case Just _ -> -- Stop requested before reading input closeStdin Nothing -> do mbs <- getIn `catchSync` \e -> if isBenignReadError e then pure Nothing else throwIO (WorkerDied "stdin" "input" e) case mbs of Nothing -> -- EOF from producer closeStdin Just bs -> do -- Stop might have been requested during or after the read, but since we have the chunk, we're writing it. (BS.hPut h bs >> when flushStdin (hFlush h)) `catchSync` \e -> if isBenignWriteError e then pure () else throwIO (WorkerDied "stdin" "output" e) loop -- Close stdin and record that it was closed closeStdin = do void $ trySync (hClose h) void $ tryPutMVar stdinClosed () handleAsync :: SomeException -> IO () handleAsync e = case fromException e of Just (_ :: SomeAsyncException) -> pure () Nothing -> throwIO e -- streamWorker: read loop that treats empty read as non-EOF and waits for input to avoid busy loop. streamWorker :: String -> Handle -> Int -> (BS.ByteString -> IO ()) -> IO () streamWorker name h bufSize putAct = loop `catch` handleAsync where loop = do chunk <- BS.hGetSome h bufSize `catchSync` \e -> if isBenignReadError e then pure BS.empty else throwIO (WorkerDied name "read" e) if BS.null chunk then do eof <- hIsEOF h if eof then pure () else do -- wait briefly for input to avoid busy loop; hWaitForInput returns immediately if input available _ <- hWaitForInput h 10 -- wait up to 10ms loop else do putAct chunk `catchSync` \e -> throwIO (WorkerDied name "write" e) loop handleAsync :: SomeException -> IO () handleAsync e = case fromException e of Just (_ :: SomeAsyncException) -> pure () Nothing -> throwIO e ------------------------------------------------------------------------------- -- API helpers ------------------------------------------------------------------------------- sendSignal :: Process -> Signal -> IO () sendSignal p sig = P.getProcessExitCode (phHandle p) >>= \case Just _ -> pure () Nothing -> getAdvisoryPid p >>= \case Nothing -> pure () Just pid -> void $ trySync (signalProcess sig pid) waitProcess :: Process -> IO SupervisionResult waitProcess p = readMVar (phExitVar p) pollResult :: Process -> IO (Maybe SupervisionResult) pollResult p = tryReadMVar (phExitVar p) -- | Return the OS PID associated with the underlying process handle, if any. -- -- This PID is strictly advisory: -- -- * It may be 'Nothing' if the backend does not expose a PID. -- * It may refer to a process that has already exited. -- * Due to PID recycling, it may even refer to an unrelated process -- if the original child has exited and the OS has reused the PID. -- -- Callers MUST NOT treat this PID as a stable or authoritative identity. -- It is suitable only for best-effort diagnostics or signalling in -- contexts where PID recycling is acceptable and documented. getAdvisoryPid :: Process -> IO (Maybe CPid) getAdvisoryPid = P.getPid . phHandle requestStdinClose :: Process -> IO () requestStdinClose p = void $ tryPutMVar (phStopIn p) () ------------------------------------------------------------------------------- -- safeKill: re-check handle liveness and use ProcessHandle's PID if available. -- This reduces but cannot eliminate PID-recycling risk; documented limitation remains. ------------------------------------------------------------------------------- -- `safeKillUsingHandle` attempts to send `SIGKILL` to the process associated with the `ProcessHandle`. -- Due to OS‑level PID recycling, there is an unavoidable race: if the child exits and its PID is reused by another -- process between checks, the signal may be delivered to the wrong process. This is a fundamental limitation of -- PID‑based signalling and cannot be fully eliminated without kernel support. safeKillUsingHandle :: P.ProcessHandle -> IO Bool safeKillUsingHandle ph = -- If process already exited, nothing to do P.getProcessExitCode ph >>= \case Just _ -> pure False Nothing -> do -- Ask ProcessHandle for pid (may be Nothing) mpid <- P.getPid ph case mpid of Nothing -> pure False Just pid -> do -- Re-check handle liveness before signalling P.getProcessExitCode ph >>= \case Just _ -> pure False Nothing -> trySync (signalProcess sigKILL pid) >>= \case Left _ -> pure False Right _ -> pure True -- XX once P.getPidFd and pidfdSendSignal :: Fd -> Signal -> IO () exists: --safeKillUsingHandle ph = do -- mExit <- P.getProcessExitCode ph -- case mExit of -- Just _ -> pure False -- Nothing -> -- case P.getPidFd ph of -- Nothing -> pure False -- Just pidfd -> do -- r <- trySync (pidfdSendSignal pidfd sigKILL) -- pure (either (const False) (const True) r) ------------------------------------------------------------------------------- -- Error helpers and benign classification ------------------------------------------------------------------------------- isAsync :: SomeException -> Bool isAsync e = isJust (asyncExceptionFromException e :: Maybe SomeAsyncException) catchSync :: IO a -> (SomeException -> IO a) -> IO a catchSync act h = mask \restore -> restore act `catch` \e -> if isAsync e then throwIO e else restore (h e) trySync :: IO a -> IO (Either SomeException a) trySync act = mask \restore -> do r <- try (restore act) case r of Left e | isAsync e -> throwIO e | otherwise -> pure (Left e) Right x -> pure (Right x) -- Conservative benign classification: -- - Reads: EOF or ResourceVanished may be benign (EOF). -- - Writes: treat EOF/ResourceVanished conservatively; if you need errno-level EPIPE detection, -- convert the handler to an IO-based check and inspect errno. isBenignReadError :: SomeException -> Bool isBenignReadError se = case fromException se of Just (ioe :: IOException) -> isEOFError ioe || ioeGetErrorType ioe == ResourceVanished Nothing -> False isBenignWriteError :: SomeException -> Bool isBenignWriteError se = case fromException se of Just ioe -> case ioe_errno ioe of Just errno -> Errno errno == ePIPE Nothing -> False Nothing -> False ------------------------------------------------------------------------------- -- Misc helpers ------------------------------------------------------------------------------- validateOptions :: ProcessOptions -> IO () validateOptions opts = do let gp = getGracePeriodMicros (optGracePeriod opts) buf = optBufferSize opts maxBuf = 16 * 1024 * 1024 -- 16MB, arbitrary but sane upper bound when (null $ optCmd opts) $ throwIO $ InvalidOptions "optCmd cannot be empty" when (buf <= 0) $ throwIO $ InvalidOptions "optBufferSize must be > 0" when (buf > maxBuf) $ throwIO $ InvalidOptions "optBufferSize is unreasonably large" when (gp < 0) $ throwIO $ InvalidOptions "optGracePeriod must be non-negative"