changeset 0 e77b750dfbda
child 2 1760b7d150cf
equal deleted inserted replaced
-1:000000000000 0:e77b750dfbda
     1 {-# LANGUAGE BangPatterns #-}
     3 module System.IO.Streams.Concurrent.Unagi
     4        ( -- * Channel conversions
     5            inputToChan
     6        , chanToInput
     7        , chanToOutput
     8        , concurrentMerge
     9        , makeChanPipe
    10        ) where
    13 ------------------------------------------------------------------------------
    14 import           Control.Applicative           ((<$>), (<*>))
    15 import           Control.Concurrent            (forkIO)
    16 import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
    17                                                 readChan, writeChan)
    18 import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
    19                                                 newMVar, putMVar, takeMVar)
    20 import           Control.Exception             (SomeException, mask, throwIO,
    21                                                 try)
    22 import           Control.Monad                 (forM_)
    23 import           Prelude                       hiding (read)
    24 import           System.IO.Streams.Internal    (InputStream, OutputStream,
    25                                                 makeInputStream,
    26                                                 makeOutputStream, read)
    30 ------------------------------------------------------------------------------
    31 -- | Writes the contents of an input stream to a channel until the input stream
    32 -- yields end-of-stream.
    33 inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
    34 inputToChan is ch = go
    35   where
    36     go = do
    37         mb <- read is
    38         writeChan ch mb
    39         maybe (return $! ()) (const go) mb
    42 ------------------------------------------------------------------------------
    43 -- | Turns an 'OutChan' into an input stream.
    44 --
    45 chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
    46 chanToInput ch = makeInputStream $! readChan ch
    49 ------------------------------------------------------------------------------
    50 -- | Turns an 'InChan' into an output stream.
    51 --
    52 chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
    53 chanToOutput = makeOutputStream . writeChan
    56 ------------------------------------------------------------------------------
    57 -- | Concurrently merges a list of 'InputStream's, combining values in the
    58 -- order they become available.
    59 --
    60 -- Note: does /not/ forward individual end-of-stream notifications, the
    61 -- produced stream does not yield end-of-stream until all of the input streams
    62 -- have finished.
    63 --
    64 -- This traps exceptions in each concurrent thread and re-raises them in the
    65 -- current thread.
    66 concurrentMerge :: [InputStream a] -> IO (InputStream a)
    67 concurrentMerge iss = do
    68     mv    <- newEmptyMVar
    69     nleft <- newMVar $! length iss
    70     mask $ \restore -> forM_ iss $ \is -> forkIO $ do
    71         let producer = do
    72               emb <- try $ restore $ read is
    73               case emb of
    74                   Left exc      -> do putMVar mv (Left (exc :: SomeException))
    75                                       producer
    76                   Right Nothing -> putMVar mv $! Right Nothing
    77                   Right x       -> putMVar mv (Right x) >> producer
    78         producer
    79     makeInputStream $ chunk mv nleft
    81   where
    82     chunk mv nleft = do
    83         emb <- takeMVar mv
    84         case emb of
    85             Left exc      -> throwIO exc
    86             Right Nothing -> do x <- modifyMVar nleft $ \n ->
    87                                      let !n' = n - 1
    88                                      in return $! (n', n')
    89                                 if x > 0
    90                                   then chunk mv nleft
    91                                   else return Nothing
    92             Right x       -> return x
    95 --------------------------------------------------------------------------------
    96 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
    97 -- to the 'OutputStream' will appear as-is on the 'InputStream'.
    98 --
    99 -- Since reading from the 'InputStream' and writing to the 'OutputStream' are
   100 -- blocking calls, be sure to do so in different threads.
   101 makeChanPipe :: IO (InputStream a, OutputStream a)
   102 makeChanPipe = do
   103     (inChan, outChan) <- newChan
   104     (,) <$> chanToInput outChan <*> chanToOutput inChan