src/System/IO/Streams/Concurrent/Unagi.hs
changeset 2 1760b7d150cf
parent 0 e77b750dfbda
child 4 1baf8e3b8ef2
equal deleted inserted replaced
1:e31281e9da96 2:1760b7d150cf
     1 {-# LANGUAGE BangPatterns #-}
     1 {-# LANGUAGE BangPatterns #-}
     2 
     2 
     3 module System.IO.Streams.Concurrent.Unagi
     3 module System.IO.Streams.Concurrent.Unagi
     4        ( -- * Channel conversions
     4        ( -- * Channel conversions
     5            inputToChan
     5          inputToChan
     6        , chanToInput
     6        , chanToInput
     7        , chanToOutput
     7        , chanToOutput
     8        , concurrentMerge
       
     9        , makeChanPipe
     8        , makeChanPipe
    10        ) where
     9        ) where
    11 
    10 
    12 
    11 
    13 ------------------------------------------------------------------------------
    12 ------------------------------------------------------------------------------
    14 import           Control.Applicative           ((<$>), (<*>))
    13 import           Control.Applicative           ((<$>), (<*>))
    15 import           Control.Concurrent            (forkIO)
       
    16 import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
    14 import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
    17                                                 readChan, writeChan)
    15                                                 readChan, writeChan)
    18 import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
       
    19                                                 newMVar, putMVar, takeMVar)
       
    20 import           Control.Exception             (SomeException, mask, throwIO,
       
    21                                                 try)
       
    22 import           Control.Monad                 (forM_)
       
    23 import           Prelude                       hiding (read)
    16 import           Prelude                       hiding (read)
    24 import           System.IO.Streams.Internal    (InputStream, OutputStream,
    17 import           System.IO.Streams.Internal    (InputStream, OutputStream,
    25                                                 makeInputStream,
    18                                                 makeInputStream,
    26                                                 makeOutputStream, read)
    19                                                 makeOutputStream, read)
    27 
    20 
    51 --
    44 --
    52 chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
    45 chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
    53 chanToOutput = makeOutputStream . writeChan
    46 chanToOutput = makeOutputStream . writeChan
    54 
    47 
    55 
    48 
    56 ------------------------------------------------------------------------------
       
    57 -- | Concurrently merges a list of 'InputStream's, combining values in the
       
    58 -- order they become available.
       
    59 --
       
    60 -- Note: does /not/ forward individual end-of-stream notifications, the
       
    61 -- produced stream does not yield end-of-stream until all of the input streams
       
    62 -- have finished.
       
    63 --
       
    64 -- This traps exceptions in each concurrent thread and re-raises them in the
       
    65 -- current thread.
       
    66 concurrentMerge :: [InputStream a] -> IO (InputStream a)
       
    67 concurrentMerge iss = do
       
    68     mv    <- newEmptyMVar
       
    69     nleft <- newMVar $! length iss
       
    70     mask $ \restore -> forM_ iss $ \is -> forkIO $ do
       
    71         let producer = do
       
    72               emb <- try $ restore $ read is
       
    73               case emb of
       
    74                   Left exc      -> do putMVar mv (Left (exc :: SomeException))
       
    75                                       producer
       
    76                   Right Nothing -> putMVar mv $! Right Nothing
       
    77                   Right x       -> putMVar mv (Right x) >> producer
       
    78         producer
       
    79     makeInputStream $ chunk mv nleft
       
    80 
       
    81   where
       
    82     chunk mv nleft = do
       
    83         emb <- takeMVar mv
       
    84         case emb of
       
    85             Left exc      -> throwIO exc
       
    86             Right Nothing -> do x <- modifyMVar nleft $ \n ->
       
    87                                      let !n' = n - 1
       
    88                                      in return $! (n', n')
       
    89                                 if x > 0
       
    90                                   then chunk mv nleft
       
    91                                   else return Nothing
       
    92             Right x       -> return x
       
    93 
       
    94 
       
    95 --------------------------------------------------------------------------------
    49 --------------------------------------------------------------------------------
    96 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
    50 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
    97 -- to the 'OutputStream' will appear as-is on the 'InputStream'.
    51 -- to the 'OutputStream' will appear as-is on the 'InputStream'.
    98 --
    52 --
    99 -- Since reading from the 'InputStream' and writing to the 'OutputStream' are
    53 -- Since reading from the 'InputStream' and writing to the 'OutputStream' are