src/System/IO/Streams/Concurrent/Unagi/Bounded.hs
changeset 2 1760b7d150cf
child 3 400d49213290
equal deleted inserted replaced
1:e31281e9da96 2:1760b7d150cf
       
     1 {-# LANGUAGE BangPatterns #-}
       
     2 
       
     3 module System.IO.Streams.Concurrent.Unagi.Bounded
       
     4        ( -- * Channel conversions
       
     5          inputToChan
       
     6        , chanToInput
       
     7        , chanToOutput
       
     8        , makeChanPipe
       
     9        , dupStream
       
    10        , DupHandle
       
    11        ) where
       
    12 
       
    13 
       
    14 ------------------------------------------------------------------------------
       
    15 import           Control.Applicative                   (pure, (<$>), (<*>))
       
    16 import           Control.Concurrent.Chan.Unagi.Bounded (InChan, OutChan,
       
    17                                                         dupChan, newChan,
       
    18                                                         readChan, writeChan)
       
    19 import           Control.Monad                         ((>=>))
       
    20 import           Prelude                               hiding (read)
       
    21 import           System.IO.Streams.Internal            (InputStream,
       
    22                                                         OutputStream,
       
    23                                                         makeInputStream,
       
    24                                                         makeOutputStream, read)
       
    25 
       
    26 
       
    27 newtype DupHandle a = DupHandle { unDupHandle :: InChan (Maybe a) }
       
    28 
       
    29 ------------------------------------------------------------------------------
       
    30 -- | Writes the contents of an input stream to a channel until the input stream
       
    31 -- yields end-of-stream.
       
    32 inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
       
    33 inputToChan is ch = go
       
    34   where
       
    35     go = do
       
    36         mb <- read is
       
    37         writeChan ch mb
       
    38         maybe (return $! ()) (const go) mb
       
    39 
       
    40 
       
    41 ------------------------------------------------------------------------------
       
    42 -- | Turns an 'OutChan' into an input stream.
       
    43 --
       
    44 chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
       
    45 chanToInput ch = makeInputStream $! readChan ch
       
    46 
       
    47 
       
    48 ------------------------------------------------------------------------------
       
    49 -- | Turns an 'InChan' into an output stream.
       
    50 --
       
    51 chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
       
    52 chanToOutput = makeOutputStream . writeChan
       
    53 
       
    54 
       
    55 --------------------------------------------------------------------------------
       
    56 -- | Create a new pair of streams using an underlying 'Chan'. Everything written
       
    57 -- to the 'OutputStream' will appear as-is on the 'InputStream'.
       
    58 --
       
    59 -- Since reading from the 'InputStream' and writing to the 'OutputStream' are
       
    60 -- blocking calls, be sure to do so in different threads.
       
    61 makeChanPipe :: Int -> IO (InputStream a, OutputStream a, DupHandle a)
       
    62 makeChanPipe size = do
       
    63     (inChan, outChan) <- newChan size
       
    64     (,,) <$> chanToInput outChan <*> chanToOutput inChan <*> pure (DupHandle inChan)
       
    65 
       
    66 
       
    67 ------------------------------------------------------------------------------
       
    68 -- | Use a 'DupHandle' to replicate everything written on the
       
    69 -- associated 'OutputStream' to the 'InputStream'.
       
    70 --
       
    71 dupStream :: DupHandle a -> IO (InputStream a)
       
    72 dupStream = dupChan . unDupHandle >=> chanToInput