Added bounded dup streams.
authorLuke Hoersten <luke@hoersten.org>
Tue, 09 Dec 2014 13:37:21 -0600
changeset 2 1760b7d150cf
parent 1 e31281e9da96
child 3 400d49213290
Added bounded dup streams.
src/System/IO/Streams/Concurrent/Unagi.hs
src/System/IO/Streams/Concurrent/Unagi/Bounded.hs
unagi-streams.cabal
--- a/src/System/IO/Streams/Concurrent/Unagi.hs	Mon Dec 08 22:38:48 2014 -0600
+++ b/src/System/IO/Streams/Concurrent/Unagi.hs	Tue Dec 09 13:37:21 2014 -0600
@@ -2,24 +2,17 @@
 
 module System.IO.Streams.Concurrent.Unagi
        ( -- * Channel conversions
-           inputToChan
+         inputToChan
        , chanToInput
        , chanToOutput
-       , concurrentMerge
        , makeChanPipe
        ) where
 
 
 ------------------------------------------------------------------------------
 import           Control.Applicative           ((<$>), (<*>))
-import           Control.Concurrent            (forkIO)
 import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
                                                 readChan, writeChan)
-import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
-                                                newMVar, putMVar, takeMVar)
-import           Control.Exception             (SomeException, mask, throwIO,
-                                                try)
-import           Control.Monad                 (forM_)
 import           Prelude                       hiding (read)
 import           System.IO.Streams.Internal    (InputStream, OutputStream,
                                                 makeInputStream,
@@ -53,45 +46,6 @@
 chanToOutput = makeOutputStream . writeChan
 
 
-------------------------------------------------------------------------------
--- | Concurrently merges a list of 'InputStream's, combining values in the
--- order they become available.
---
--- Note: does /not/ forward individual end-of-stream notifications, the
--- produced stream does not yield end-of-stream until all of the input streams
--- have finished.
---
--- This traps exceptions in each concurrent thread and re-raises them in the
--- current thread.
-concurrentMerge :: [InputStream a] -> IO (InputStream a)
-concurrentMerge iss = do
-    mv    <- newEmptyMVar
-    nleft <- newMVar $! length iss
-    mask $ \restore -> forM_ iss $ \is -> forkIO $ do
-        let producer = do
-              emb <- try $ restore $ read is
-              case emb of
-                  Left exc      -> do putMVar mv (Left (exc :: SomeException))
-                                      producer
-                  Right Nothing -> putMVar mv $! Right Nothing
-                  Right x       -> putMVar mv (Right x) >> producer
-        producer
-    makeInputStream $ chunk mv nleft
-
-  where
-    chunk mv nleft = do
-        emb <- takeMVar mv
-        case emb of
-            Left exc      -> throwIO exc
-            Right Nothing -> do x <- modifyMVar nleft $ \n ->
-                                     let !n' = n - 1
-                                     in return $! (n', n')
-                                if x > 0
-                                  then chunk mv nleft
-                                  else return Nothing
-            Right x       -> return x
-
-
 --------------------------------------------------------------------------------
 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
 -- to the 'OutputStream' will appear as-is on the 'InputStream'.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/System/IO/Streams/Concurrent/Unagi/Bounded.hs	Tue Dec 09 13:37:21 2014 -0600
@@ -0,0 +1,72 @@
+{-# LANGUAGE BangPatterns #-}
+
+module System.IO.Streams.Concurrent.Unagi.Bounded
+       ( -- * Channel conversions
+         inputToChan
+       , chanToInput
+       , chanToOutput
+       , makeChanPipe
+       , dupStream
+       , DupHandle
+       ) where
+
+
+------------------------------------------------------------------------------
+import           Control.Applicative                   (pure, (<$>), (<*>))
+import           Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan,
+                                                        dupChan, newChan,
+                                                        readChan, writeChan)
+import           Control.Monad                         ((>=>))
+import           Prelude                               hiding (read)
+import           System.IO.Streams.Internal            (InputStream,
+                                                        OutputStream,
+                                                        makeInputStream,
+                                                        makeOutputStream, read)
+
+
+newtype DupHandle a = DupHandle { unDupHandle :: InChan (Maybe a) }
+
+------------------------------------------------------------------------------
+-- | Writes the contents of an input stream to a channel until the input stream
+-- yields end-of-stream.
+inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
+inputToChan is ch = go
+  where
+    go = do
+        mb <- read is
+        writeChan ch mb
+        maybe (return $! ()) (const go) mb
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'OutChan' into an input stream.
+--
+chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
+chanToInput ch = makeInputStream $! readChan ch
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'InChan' into an output stream.
+--
+chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
+chanToOutput = makeOutputStream . writeChan
+
+
+--------------------------------------------------------------------------------
+-- | Create a new pair of streams using an underlying 'Chan'. Everything written
+-- to the 'OutputStream' will appear as-is on the 'InputStream'.
+--
+-- Since reading from the 'InputStream' and writing to the 'OutputStream' are
+-- blocking calls, be sure to do so in different threads.
+makeChanPipe :: Int -> IO (InputStream a, OutputStream a, DupHandle a)
+makeChanPipe size = do
+    (inChan, outChan) <- newChan size
+    (,,) <$> chanToInput outChan <*> chanToOutput inChan <*> pure (DupHandle inChan)
+
+
+------------------------------------------------------------------------------
+-- | Use a 'DupHandle' to replicate everything written on the
+-- associated 'OutputStream' to the 'InputStream'.
+--
+dupStream :: DupHandle a -> IO (InputStream a)
+dupStream = dupChan . unDupHandle >=> chanToInput
--- a/unagi-streams.cabal	Mon Dec 08 22:38:48 2014 -0600
+++ b/unagi-streams.cabal	Tue Dec 09 13:37:21 2014 -0600
@@ -1,5 +1,5 @@
 name:                unagi-streams
-version:             0.1.0.0
+version:             0.1.1.0
 synopsis:            Unagi Chan IO-Streams
 description:         IO-Streams implemented underneath with Unagi
                      channels. This library is a straight port of Greg Collins' IO-Streams
@@ -15,6 +15,7 @@
 library
   exposed-modules:
                 System.IO.Streams.Concurrent.Unagi
+              , System.IO.Streams.Concurrent.Unagi.Bounded
   build-depends:
                 base       >= 4.7 && < 4.8
               , unagi-chan >= 0.2 && < 0.3