diff options
| author | Luke Hoersten <[email protected]> | 2014-12-09 13:37:21 -0600 |
|---|---|---|
| committer | Luke Hoersten <[email protected]> | 2014-12-09 13:37:21 -0600 |
| commit | 9f53721dad732c5c389fd27b155c08dc603d1191 (patch) | |
| tree | 9d238b5ef6fb81e650ad08decf1cf7a645fdfe9c /src/System/IO/Streams/Concurrent/Unagi.hs | |
| parent | 9bd1b211a9e78fd6d0ad79fd122d7be774d12899 (diff) | |
Added bounded dup streams.
Diffstat (limited to 'src/System/IO/Streams/Concurrent/Unagi.hs')
| -rw-r--r-- | src/System/IO/Streams/Concurrent/Unagi.hs | 48 |
1 files changed, 1 insertions, 47 deletions
diff --git a/src/System/IO/Streams/Concurrent/Unagi.hs b/src/System/IO/Streams/Concurrent/Unagi.hs index 90ffa45..7ee3fd5 100644 --- a/src/System/IO/Streams/Concurrent/Unagi.hs +++ b/src/System/IO/Streams/Concurrent/Unagi.hs @@ -2,24 +2,17 @@ module System.IO.Streams.Concurrent.Unagi ( -- * Channel conversions - inputToChan + inputToChan , chanToInput , chanToOutput - , concurrentMerge , makeChanPipe ) where ------------------------------------------------------------------------------ import Control.Applicative ((<$>), (<*>)) -import Control.Concurrent (forkIO) import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan, readChan, writeChan) -import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, - newMVar, putMVar, takeMVar) -import Control.Exception (SomeException, mask, throwIO, - try) -import Control.Monad (forM_) import Prelude hiding (read) import System.IO.Streams.Internal (InputStream, OutputStream, makeInputStream, @@ -53,45 +46,6 @@ chanToOutput :: InChan (Maybe a) -> IO (OutputStream a) chanToOutput = makeOutputStream . writeChan ------------------------------------------------------------------------------- --- | Concurrently merges a list of 'InputStream's, combining values in the --- order they become available. --- --- Note: does /not/ forward individual end-of-stream notifications, the --- produced stream does not yield end-of-stream until all of the input streams --- have finished. --- --- This traps exceptions in each concurrent thread and re-raises them in the --- current thread. -concurrentMerge :: [InputStream a] -> IO (InputStream a) -concurrentMerge iss = do - mv <- newEmptyMVar - nleft <- newMVar $! length iss - mask $ \restore -> forM_ iss $ \is -> forkIO $ do - let producer = do - emb <- try $ restore $ read is - case emb of - Left exc -> do putMVar mv (Left (exc :: SomeException)) - producer - Right Nothing -> putMVar mv $! Right Nothing - Right x -> putMVar mv (Right x) >> producer - producer - makeInputStream $ chunk mv nleft - - where - chunk mv nleft = do - emb <- takeMVar mv - case emb of - Left exc -> throwIO exc - Right Nothing -> do x <- modifyMVar nleft $ \n -> - let !n' = n - 1 - in return $! (n', n') - if x > 0 - then chunk mv nleft - else return Nothing - Right x -> return x - - -------------------------------------------------------------------------------- -- | Create a new pair of streams using an underlying 'Chan'. Everything written -- to the 'OutputStream' will appear as-is on the 'InputStream'. |
