# HG changeset patch # User Luke Hoersten # Date 1418153841 21600 # Node ID 1760b7d150cf285ee5134f29e76a390793b67bf1 # Parent e31281e9da96f4dc5e882fa93265f765cd53851e Added bounded dup streams. diff -r e31281e9da96 -r 1760b7d150cf src/System/IO/Streams/Concurrent/Unagi.hs --- a/src/System/IO/Streams/Concurrent/Unagi.hs Mon Dec 08 22:38:48 2014 -0600 +++ b/src/System/IO/Streams/Concurrent/Unagi.hs Tue Dec 09 13:37:21 2014 -0600 @@ -2,24 +2,17 @@ module System.IO.Streams.Concurrent.Unagi ( -- * Channel conversions - inputToChan + inputToChan , chanToInput , chanToOutput - , concurrentMerge , makeChanPipe ) where ------------------------------------------------------------------------------ import Control.Applicative ((<$>), (<*>)) -import Control.Concurrent (forkIO) import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan, readChan, writeChan) -import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, - newMVar, putMVar, takeMVar) -import Control.Exception (SomeException, mask, throwIO, - try) -import Control.Monad (forM_) import Prelude hiding (read) import System.IO.Streams.Internal (InputStream, OutputStream, makeInputStream, @@ -53,45 +46,6 @@ chanToOutput = makeOutputStream . writeChan ------------------------------------------------------------------------------- --- | Concurrently merges a list of 'InputStream's, combining values in the --- order they become available. --- --- Note: does /not/ forward individual end-of-stream notifications, the --- produced stream does not yield end-of-stream until all of the input streams --- have finished. --- --- This traps exceptions in each concurrent thread and re-raises them in the --- current thread. -concurrentMerge :: [InputStream a] -> IO (InputStream a) -concurrentMerge iss = do - mv <- newEmptyMVar - nleft <- newMVar $! length iss - mask $ \restore -> forM_ iss $ \is -> forkIO $ do - let producer = do - emb <- try $ restore $ read is - case emb of - Left exc -> do putMVar mv (Left (exc :: SomeException)) - producer - Right Nothing -> putMVar mv $! Right Nothing - Right x -> putMVar mv (Right x) >> producer - producer - makeInputStream $ chunk mv nleft - - where - chunk mv nleft = do - emb <- takeMVar mv - case emb of - Left exc -> throwIO exc - Right Nothing -> do x <- modifyMVar nleft $ \n -> - let !n' = n - 1 - in return $! (n', n') - if x > 0 - then chunk mv nleft - else return Nothing - Right x -> return x - - -------------------------------------------------------------------------------- -- | Create a new pair of streams using an underlying 'Chan'. Everything written -- to the 'OutputStream' will appear as-is on the 'InputStream'. diff -r e31281e9da96 -r 1760b7d150cf src/System/IO/Streams/Concurrent/Unagi/Bounded.hs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/System/IO/Streams/Concurrent/Unagi/Bounded.hs Tue Dec 09 13:37:21 2014 -0600 @@ -0,0 +1,72 @@ +{-# LANGUAGE BangPatterns #-} + +module System.IO.Streams.Concurrent.Unagi.Bounded + ( -- * Channel conversions + inputToChan + , chanToInput + , chanToOutput + , makeChanPipe + , dupStream + , DupHandle + ) where + + +------------------------------------------------------------------------------ +import Control.Applicative (pure, (<$>), (<*>)) +import Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan, + dupChan, newChan, + readChan, writeChan) +import Control.Monad ((>=>)) +import Prelude hiding (read) +import System.IO.Streams.Internal (InputStream, + OutputStream, + makeInputStream, + makeOutputStream, read) + + +newtype DupHandle a = DupHandle { unDupHandle :: InChan (Maybe a) } + +------------------------------------------------------------------------------ +-- | Writes the contents of an input stream to a channel until the input stream +-- yields end-of-stream. +inputToChan :: InputStream a -> InChan (Maybe a) -> IO () +inputToChan is ch = go + where + go = do + mb <- read is + writeChan ch mb + maybe (return $! ()) (const go) mb + + +------------------------------------------------------------------------------ +-- | Turns an 'OutChan' into an input stream. +-- +chanToInput :: OutChan (Maybe a) -> IO (InputStream a) +chanToInput ch = makeInputStream $! readChan ch + + +------------------------------------------------------------------------------ +-- | Turns an 'InChan' into an output stream. +-- +chanToOutput :: InChan (Maybe a) -> IO (OutputStream a) +chanToOutput = makeOutputStream . writeChan + + +-------------------------------------------------------------------------------- +-- | Create a new pair of streams using an underlying 'Chan'. Everything written +-- to the 'OutputStream' will appear as-is on the 'InputStream'. +-- +-- Since reading from the 'InputStream' and writing to the 'OutputStream' are +-- blocking calls, be sure to do so in different threads. +makeChanPipe :: Int -> IO (InputStream a, OutputStream a, DupHandle a) +makeChanPipe size = do + (inChan, outChan) <- newChan size + (,,) <$> chanToInput outChan <*> chanToOutput inChan <*> pure (DupHandle inChan) + + +------------------------------------------------------------------------------ +-- | Use a 'DupHandle' to replicate everything written on the +-- associated 'OutputStream' to the 'InputStream'. +-- +dupStream :: DupHandle a -> IO (InputStream a) +dupStream = dupChan . unDupHandle >=> chanToInput diff -r e31281e9da96 -r 1760b7d150cf unagi-streams.cabal --- a/unagi-streams.cabal Mon Dec 08 22:38:48 2014 -0600 +++ b/unagi-streams.cabal Tue Dec 09 13:37:21 2014 -0600 @@ -1,5 +1,5 @@ name: unagi-streams -version: 0.1.0.0 +version: 0.1.1.0 synopsis: Unagi Chan IO-Streams description: IO-Streams implemented underneath with Unagi channels. This library is a straight port of Greg Collins' IO-Streams @@ -15,6 +15,7 @@ library exposed-modules: System.IO.Streams.Concurrent.Unagi + , System.IO.Streams.Concurrent.Unagi.Bounded build-depends: base >= 4.7 && < 4.8 , unagi-chan >= 0.2 && < 0.3