src.nth.io/

summaryrefslogtreecommitdiff
path: root/src/System/IO/Streams/Concurrent/Unagi.hs
blob: 141902f6f5558f2eb66399e82736c5148b5698f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
{-# LANGUAGE BangPatterns #-}

module System.IO.Streams.Concurrent.Unagi
       ( -- * Channel conversions
         inputToChan
       , chanToInput
       , chanToOutput
       , makeChanPipe
       , chanToPipe
       , dupStream
       ) where


------------------------------------------------------------------------------
import           Control.Applicative           ((<$>), (<*>))
import           Control.Concurrent.Chan.Unagi (InChan, OutChan, dupChan,
                                                newChan, readChan, writeChan)
import           Control.Monad                 ((>=>))
import           Prelude                       hiding (read)
import           System.IO.Streams.Internal    (InputStream, OutputStream,
                                                makeInputStream,
                                                makeOutputStream, read)



------------------------------------------------------------------------------
-- | Drains an input stream into a channel.
--
-- Every value read from @is@ is forwarded to @ch@ unchanged, including the
-- terminating 'Nothing'; the loop ends right after that 'Nothing' has been
-- written, so readers of the channel also observe the end-of-stream marker.
inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
inputToChan is ch = pump
  where
    pump = read is >>= \item -> do
        -- Forward first, then decide: the 'Nothing' must reach the channel.
        writeChan ch item
        case item of
            Nothing -> return ()
            Just _  -> pump


------------------------------------------------------------------------------
-- | Wraps an 'OutChan' as an input stream.
--
-- The stream is built with 'makeInputStream' using @'readChan' ch@ as its
-- producer action, so the elements pulled from the channel are @Maybe a@
-- values.
chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
chanToInput ch = producer `seq` makeInputStream producer
  where
    -- Evaluated to weak head normal form before the stream is built.
    producer = readChan ch


------------------------------------------------------------------------------
-- | Wraps an 'InChan' as an output stream.
--
-- The stream is built with 'makeOutputStream' using @'writeChan' ch@ as its
-- consumer action, so each @Maybe a@ handed to that action is written to the
-- channel as-is.
chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
chanToOutput ch = makeOutputStream (writeChan ch)


--------------------------------------------------------------------------------
-- | Allocates a fresh channel with 'newChan' and exposes its two ends as a
-- pair of streams: the read end through 'chanToInput' and the write end
-- through 'chanToOutput'. Whatever is written to the 'OutputStream' shows up
-- unchanged on the 'InputStream'.
--
-- Reading from the 'InputStream' and writing to the 'OutputStream' are both
-- blocking calls, so perform them in different threads.
makeChanPipe :: IO (InputStream a, OutputStream a)
makeChanPipe = do
    (writeEnd, readEnd) <- newChan
    -- The input stream is built before the output stream.
    source <- chanToInput readEnd
    sink   <- chanToOutput writeEnd
    return (source, sink)


--------------------------------------------------------------------------------
-- | Builds a pair of streams from the two ends of an existing channel: the
-- 'OutChan' is wrapped with 'chanToInput' and the 'InChan' with
-- 'chanToOutput'. Whatever is written to the 'OutputStream' shows up
-- unchanged on the 'InputStream'.
--
-- Reading from the 'InputStream' and writing to the 'OutputStream' are both
-- blocking calls, so perform them in different threads.
chanToPipe :: (InChan (Maybe a), OutChan (Maybe a)) -> IO (InputStream a, OutputStream a)
chanToPipe (writeEnd, readEnd) = do
    -- The input stream is built before the output stream.
    source <- chanToInput readEnd
    sink   <- chanToOutput writeEnd
    return (source, sink)


--------------------------------------------------------------------------------
-- | Produces a new input stream from an 'InChan' by first calling 'dupChan'
-- on it and then wrapping the resulting 'OutChan' with 'chanToInput'.
--
-- NOTE(review): which elements the duplicated channel observes is determined
-- by 'dupChan' itself -- presumably only values written after this call;
-- confirm against the unagi-chan documentation.
dupStream :: InChan (Maybe a) -> IO (InputStream a)
dupStream inChan = dupChan inChan >>= chanToInput