Added bounded dup streams.
--- a/src/System/IO/Streams/Concurrent/Unagi.hs Mon Dec 08 22:38:48 2014 -0600
+++ b/src/System/IO/Streams/Concurrent/Unagi.hs Tue Dec 09 13:37:21 2014 -0600
@@ -2,24 +2,17 @@
module System.IO.Streams.Concurrent.Unagi
( -- * Channel conversions
- inputToChan
+ inputToChan
, chanToInput
, chanToOutput
- , concurrentMerge
, makeChanPipe
) where
------------------------------------------------------------------------------
import Control.Applicative ((<$>), (<*>))
-import Control.Concurrent (forkIO)
import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
readChan, writeChan)
-import Control.Concurrent.MVar (modifyMVar, newEmptyMVar,
- newMVar, putMVar, takeMVar)
-import Control.Exception (SomeException, mask, throwIO,
- try)
-import Control.Monad (forM_)
import Prelude hiding (read)
import System.IO.Streams.Internal (InputStream, OutputStream,
makeInputStream,
@@ -53,45 +46,6 @@
chanToOutput = makeOutputStream . writeChan
-------------------------------------------------------------------------------
--- | Concurrently merges a list of 'InputStream's, combining values in the
--- order they become available.
---
--- Note: does /not/ forward individual end-of-stream notifications, the
--- produced stream does not yield end-of-stream until all of the input streams
--- have finished.
---
--- This traps exceptions in each concurrent thread and re-raises them in the
--- current thread.
-concurrentMerge :: [InputStream a] -> IO (InputStream a)
-concurrentMerge iss = do
- mv <- newEmptyMVar
- nleft <- newMVar $! length iss
- mask $ \restore -> forM_ iss $ \is -> forkIO $ do
- let producer = do
- emb <- try $ restore $ read is
- case emb of
- Left exc -> do putMVar mv (Left (exc :: SomeException))
- producer
- Right Nothing -> putMVar mv $! Right Nothing
- Right x -> putMVar mv (Right x) >> producer
- producer
- makeInputStream $ chunk mv nleft
-
- where
- chunk mv nleft = do
- emb <- takeMVar mv
- case emb of
- Left exc -> throwIO exc
- Right Nothing -> do x <- modifyMVar nleft $ \n ->
- let !n' = n - 1
- in return $! (n', n')
- if x > 0
- then chunk mv nleft
- else return Nothing
- Right x -> return x
-
-
--------------------------------------------------------------------------------
-- | Create a new pair of streams using an underlying 'Chan'. Everything written
-- to the 'OutputStream' will appear as-is on the 'InputStream'.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/System/IO/Streams/Concurrent/Unagi/Bounded.hs Tue Dec 09 13:37:21 2014 -0600
@@ -0,0 +1,72 @@
+{-# LANGUAGE BangPatterns #-}
+
+module System.IO.Streams.Concurrent.Unagi.Bounded
+ ( -- * Channel conversions
+ inputToChan
+ , chanToInput
+ , chanToOutput
+ , makeChanPipe
+ , dupStream
+ , DupHandle
+ ) where
+
+
+------------------------------------------------------------------------------
+import Control.Applicative (pure, (<$>), (<*>))
+import Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan,
+ dupChan, newChan,
+ readChan, writeChan)
+import Control.Monad ((>=>))
+import Prelude hiding (read)
+import System.IO.Streams.Internal (InputStream,
+ OutputStream,
+ makeInputStream,
+ makeOutputStream, read)
+
+
+newtype DupHandle a = DupHandle { unDupHandle :: InChan (Maybe a) }
+
+------------------------------------------------------------------------------
+-- | Writes the contents of an input stream to a channel until the input stream
+-- yields end-of-stream.
+inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
+inputToChan is ch = go
+ where
+ go = do
+ mb <- read is
+ writeChan ch mb
+ maybe (return $! ()) (const go) mb
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'OutChan' into an input stream.
+--
+chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
+chanToInput ch = makeInputStream $! readChan ch
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'InChan' into an output stream.
+--
+chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
+chanToOutput = makeOutputStream . writeChan
+
+
+--------------------------------------------------------------------------------
+-- | Create a new pair of streams using an underlying 'Chan'. Everything written
+-- to the 'OutputStream' will appear as-is on the 'InputStream'.
+--
+-- Since reading from the 'InputStream' and writing to the 'OutputStream' are
+-- blocking calls, be sure to do so in different threads.
+makeChanPipe :: Int -> IO (InputStream a, OutputStream a, DupHandle a)
+makeChanPipe size = do
+ (inChan, outChan) <- newChan size
+ (,,) <$> chanToInput outChan <*> chanToOutput inChan <*> pure (DupHandle inChan)
+
+
+------------------------------------------------------------------------------
+-- | Use a 'DupHandle' to replicate everything written on the
+-- associated 'OutputStream' to the 'InputStream'.
+--
+dupStream :: DupHandle a -> IO (InputStream a)
+dupStream = dupChan . unDupHandle >=> chanToInput
--- a/unagi-streams.cabal Mon Dec 08 22:38:48 2014 -0600
+++ b/unagi-streams.cabal Tue Dec 09 13:37:21 2014 -0600
@@ -1,5 +1,5 @@
name: unagi-streams
-version: 0.1.0.0
+version: 0.1.1.0
synopsis: Unagi Chan IO-Streams
description: IO-Streams implemented underneath with Unagi
channels. This library is a straight port of Greg Collins' IO-Streams
@@ -15,6 +15,7 @@
library
exposed-modules:
System.IO.Streams.Concurrent.Unagi
+ , System.IO.Streams.Concurrent.Unagi.Bounded
build-depends:
base >= 4.7 && < 4.8
, unagi-chan >= 0.2 && < 0.3