src/System/IO/Streams/Concurrent/Unagi.hs
changeset 0 e77b750dfbda
child 2 1760b7d150cf
equal deleted inserted replaced
-1:000000000000 0:e77b750dfbda
       
     1 {-# LANGUAGE BangPatterns #-}
       
     2 
       
     3 module System.IO.Streams.Concurrent.Unagi
       
     4        ( -- * Channel conversions
       
     5            inputToChan
       
     6        , chanToInput
       
     7        , chanToOutput
       
     8        , concurrentMerge
       
     9        , makeChanPipe
       
    10        ) where
       
    11 
       
    12 
       
    13 ------------------------------------------------------------------------------
       
    14 import           Control.Applicative           ((<$>), (<*>))
       
    15 import           Control.Concurrent            (forkIO)
       
    16 import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
       
    17                                                 readChan, writeChan)
       
    18 import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
       
    19                                                 newMVar, putMVar, takeMVar)
       
    20 import           Control.Exception             (SomeException, mask, throwIO,
       
    21                                                 try)
       
    22 import           Control.Monad                 (forM_)
       
    23 import           Prelude                       hiding (read)
       
    24 import           System.IO.Streams.Internal    (InputStream, OutputStream,
       
    25                                                 makeInputStream,
       
    26                                                 makeOutputStream, read)
       
    27 
       
    28 
       
    29 
       
    30 ------------------------------------------------------------------------------
       
    31 -- | Writes the contents of an input stream to a channel until the input stream
       
    32 -- yields end-of-stream.
       
    33 inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
       
    34 inputToChan is ch = go
       
    35   where
       
    36     go = do
       
    37         mb <- read is
       
    38         writeChan ch mb
       
    39         maybe (return $! ()) (const go) mb
       
    40 
       
    41 
       
    42 ------------------------------------------------------------------------------
       
    43 -- | Turns an 'OutChan' into an input stream.
       
    44 --
       
    45 chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
       
    46 chanToInput ch = makeInputStream $! readChan ch
       
    47 
       
    48 
       
    49 ------------------------------------------------------------------------------
       
    50 -- | Turns an 'InChan' into an output stream.
       
    51 --
       
    52 chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
       
    53 chanToOutput = makeOutputStream . writeChan
       
    54 
       
    55 
       
    56 ------------------------------------------------------------------------------
       
    57 -- | Concurrently merges a list of 'InputStream's, combining values in the
       
    58 -- order they become available.
       
    59 --
       
    60 -- Note: does /not/ forward individual end-of-stream notifications, the
       
    61 -- produced stream does not yield end-of-stream until all of the input streams
       
    62 -- have finished.
       
    63 --
       
    64 -- This traps exceptions in each concurrent thread and re-raises them in the
       
    65 -- current thread.
       
    66 concurrentMerge :: [InputStream a] -> IO (InputStream a)
       
    67 concurrentMerge iss = do
       
    68     mv    <- newEmptyMVar
       
    69     nleft <- newMVar $! length iss
       
    70     mask $ \restore -> forM_ iss $ \is -> forkIO $ do
       
    71         let producer = do
       
    72               emb <- try $ restore $ read is
       
    73               case emb of
       
    74                   Left exc      -> do putMVar mv (Left (exc :: SomeException))
       
    75                                       producer
       
    76                   Right Nothing -> putMVar mv $! Right Nothing
       
    77                   Right x       -> putMVar mv (Right x) >> producer
       
    78         producer
       
    79     makeInputStream $ chunk mv nleft
       
    80 
       
    81   where
       
    82     chunk mv nleft = do
       
    83         emb <- takeMVar mv
       
    84         case emb of
       
    85             Left exc      -> throwIO exc
       
    86             Right Nothing -> do x <- modifyMVar nleft $ \n ->
       
    87                                      let !n' = n - 1
       
    88                                      in return $! (n', n')
       
    89                                 if x > 0
       
    90                                   then chunk mv nleft
       
    91                                   else return Nothing
       
    92             Right x       -> return x
       
    93 
       
    94 
       
    95 --------------------------------------------------------------------------------
       
    96 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
       
    97 -- to the 'OutputStream' will appear as-is on the 'InputStream'.
       
    98 --
       
    99 -- Since reading from the 'InputStream' and writing to the 'OutputStream' are
       
   100 -- blocking calls, be sure to do so in different threads.
       
   101 makeChanPipe :: IO (InputStream a, OutputStream a)
       
   102 makeChanPipe = do
       
   103     (inChan, outChan) <- newChan
       
   104     (,) <$> chanToInput outChan <*> chanToOutput inChan