src/System/IO/Streams/Concurrent/Unagi.hs
changeset 2 1760b7d150cf
parent 0 e77b750dfbda
child 4 1baf8e3b8ef2
--- a/src/System/IO/Streams/Concurrent/Unagi.hs	Mon Dec 08 22:38:48 2014 -0600
+++ b/src/System/IO/Streams/Concurrent/Unagi.hs	Tue Dec 09 13:37:21 2014 -0600
@@ -2,24 +2,17 @@
 
 module System.IO.Streams.Concurrent.Unagi
        ( -- * Channel conversions
-           inputToChan
+         inputToChan
        , chanToInput
        , chanToOutput
-       , concurrentMerge
        , makeChanPipe
        ) where
 
 
 ------------------------------------------------------------------------------
 import           Control.Applicative           ((<$>), (<*>))
-import           Control.Concurrent            (forkIO)
 import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
                                                 readChan, writeChan)
-import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
-                                                newMVar, putMVar, takeMVar)
-import           Control.Exception             (SomeException, mask, throwIO,
-                                                try)
-import           Control.Monad                 (forM_)
 import           Prelude                       hiding (read)
 import           System.IO.Streams.Internal    (InputStream, OutputStream,
                                                 makeInputStream,
@@ -53,45 +46,6 @@
 chanToOutput = makeOutputStream . writeChan
 
 
-------------------------------------------------------------------------------
--- | Concurrently merges a list of 'InputStream's, combining values in the
--- order they become available.
---
--- Note: does /not/ forward individual end-of-stream notifications, the
--- produced stream does not yield end-of-stream until all of the input streams
--- have finished.
---
--- This traps exceptions in each concurrent thread and re-raises them in the
--- current thread.
-concurrentMerge :: [InputStream a] -> IO (InputStream a)
-concurrentMerge iss = do
-    mv    <- newEmptyMVar
-    nleft <- newMVar $! length iss
-    mask $ \restore -> forM_ iss $ \is -> forkIO $ do
-        let producer = do
-              emb <- try $ restore $ read is
-              case emb of
-                  Left exc      -> do putMVar mv (Left (exc :: SomeException))
-                                      producer
-                  Right Nothing -> putMVar mv $! Right Nothing
-                  Right x       -> putMVar mv (Right x) >> producer
-        producer
-    makeInputStream $ chunk mv nleft
-
-  where
-    chunk mv nleft = do
-        emb <- takeMVar mv
-        case emb of
-            Left exc      -> throwIO exc
-            Right Nothing -> do x <- modifyMVar nleft $ \n ->
-                                     let !n' = n - 1
-                                     in return $! (n', n')
-                                if x > 0
-                                  then chunk mv nleft
-                                  else return Nothing
-            Right x       -> return x
-
-
 --------------------------------------------------------------------------------
 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
 -- to the 'OutputStream' will appear as-is on the 'InputStream'.