--- a/src/System/IO/Streams/Concurrent/Unagi.hs Mon Dec 08 22:38:48 2014 -0600
+++ b/src/System/IO/Streams/Concurrent/Unagi.hs Tue Dec 09 13:37:21 2014 -0600
@@ -2,24 +2,17 @@
module System.IO.Streams.Concurrent.Unagi
( -- * Channel conversions
- inputToChan
+ inputToChan
, chanToInput
, chanToOutput
- , concurrentMerge
, makeChanPipe
) where
------------------------------------------------------------------------------
import Control.Applicative ((<$>), (<*>))
-import Control.Concurrent (forkIO)
import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
readChan, writeChan)
-import Control.Concurrent.MVar (modifyMVar, newEmptyMVar,
- newMVar, putMVar, takeMVar)
-import Control.Exception (SomeException, mask, throwIO,
- try)
-import Control.Monad (forM_)
import Prelude hiding (read)
import System.IO.Streams.Internal (InputStream, OutputStream,
makeInputStream,
@@ -53,45 +46,6 @@
chanToOutput = makeOutputStream . writeChan
-------------------------------------------------------------------------------
--- | Concurrently merges a list of 'InputStream's, combining values in the
--- order they become available.
---
--- Note: does /not/ forward individual end-of-stream notifications, the
--- produced stream does not yield end-of-stream until all of the input streams
--- have finished.
---
--- This traps exceptions in each concurrent thread and re-raises them in the
--- current thread.
-concurrentMerge :: [InputStream a] -> IO (InputStream a)
-concurrentMerge iss = do
- mv <- newEmptyMVar
- nleft <- newMVar $! length iss
- mask $ \restore -> forM_ iss $ \is -> forkIO $ do
- let producer = do
- emb <- try $ restore $ read is
- case emb of
- Left exc -> do putMVar mv (Left (exc :: SomeException))
- producer
- Right Nothing -> putMVar mv $! Right Nothing
- Right x -> putMVar mv (Right x) >> producer
- producer
- makeInputStream $ chunk mv nleft
-
- where
- chunk mv nleft = do
- emb <- takeMVar mv
- case emb of
- Left exc -> throwIO exc
- Right Nothing -> do x <- modifyMVar nleft $ \n ->
- let !n' = n - 1
- in return $! (n', n')
- if x > 0
- then chunk mv nleft
- else return Nothing
- Right x -> return x
-
-
--------------------------------------------------------------------------------
-- | Create a new pair of streams using an underlying 'Chan'. Everything written
-- to the 'OutputStream' will appear as-is on the 'InputStream'.