src/System/IO/Streams/Concurrent/Unagi.hs
author Luke Hoersten <luke@hoersten.org>
Mon, 08 Dec 2014 22:38:48 -0600
changeset 1 e31281e9da96
parent 0 e77b750dfbda
child 2 1760b7d150cf
permissions -rw-r--r--
Minor update
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     1
{-# LANGUAGE BangPatterns #-}
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     2
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     3
module System.IO.Streams.Concurrent.Unagi
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     4
       ( -- * Channel conversions
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     5
           inputToChan
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     6
       , chanToInput
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     7
       , chanToOutput
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     8
       , concurrentMerge
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
     9
       , makeChanPipe
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    10
       ) where
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    11
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    12
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    13
------------------------------------------------------------------------------
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    14
import           Control.Applicative           ((<$>), (<*>))
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    15
import           Control.Concurrent            (forkIO)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    16
import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    17
                                                readChan, writeChan)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    18
import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    19
                                                newMVar, putMVar, takeMVar)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    20
import           Control.Exception             (SomeException, mask, throwIO,
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    21
                                                try)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    22
import           Control.Monad                 (forM_)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    23
import           Prelude                       hiding (read)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    24
import           System.IO.Streams.Internal    (InputStream, OutputStream,
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    25
                                                makeInputStream,
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    26
                                                makeOutputStream, read)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    27
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    28
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    29
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    30
------------------------------------------------------------------------------
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    31
-- | Writes the contents of an input stream to a channel until the input stream
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    32
-- yields end-of-stream.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    33
inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    34
inputToChan is ch = go
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    35
  where
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    36
    go = do
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    37
        mb <- read is
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    38
        writeChan ch mb
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    39
        maybe (return $! ()) (const go) mb
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    40
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    41
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    42
------------------------------------------------------------------------------
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    43
-- | Turns an 'OutChan' into an input stream.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    44
--
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    45
chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    46
chanToInput ch = makeInputStream $! readChan ch
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    47
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    48
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    49
------------------------------------------------------------------------------
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    50
-- | Turns an 'InChan' into an output stream.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    51
--
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    52
chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    53
chanToOutput = makeOutputStream . writeChan
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    54
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    55
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    56
------------------------------------------------------------------------------
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    57
-- | Concurrently merges a list of 'InputStream's, combining values in the
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    58
-- order they become available.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    59
--
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    60
-- Note: does /not/ forward individual end-of-stream notifications, the
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    61
-- produced stream does not yield end-of-stream until all of the input streams
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    62
-- have finished.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    63
--
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    64
-- This traps exceptions in each concurrent thread and re-raises them in the
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    65
-- current thread.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    66
concurrentMerge :: [InputStream a] -> IO (InputStream a)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    67
concurrentMerge iss = do
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    68
    mv    <- newEmptyMVar
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    69
    nleft <- newMVar $! length iss
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    70
    mask $ \restore -> forM_ iss $ \is -> forkIO $ do
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    71
        let producer = do
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    72
              emb <- try $ restore $ read is
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    73
              case emb of
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    74
                  Left exc      -> do putMVar mv (Left (exc :: SomeException))
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    75
                                      producer
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    76
                  Right Nothing -> putMVar mv $! Right Nothing
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    77
                  Right x       -> putMVar mv (Right x) >> producer
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    78
        producer
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    79
    makeInputStream $ chunk mv nleft
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    80
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    81
  where
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    82
    chunk mv nleft = do
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    83
        emb <- takeMVar mv
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    84
        case emb of
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    85
            Left exc      -> throwIO exc
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    86
            Right Nothing -> do x <- modifyMVar nleft $ \n ->
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    87
                                     let !n' = n - 1
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    88
                                     in return $! (n', n')
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    89
                                if x > 0
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    90
                                  then chunk mv nleft
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    91
                                  else return Nothing
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    92
            Right x       -> return x
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    93
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    94
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    95
--------------------------------------------------------------------------------
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    96
-- | Create a new pair of streams using an underlying 'Chan'. Everything written
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    97
-- to the 'OutputStream' will appear as-is on the 'InputStream'.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    98
--
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
    99
-- Since reading from the 'InputStream' and writing to the 'OutputStream' are
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
   100
-- blocking calls, be sure to do so in different threads.
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
   101
makeChanPipe :: IO (InputStream a, OutputStream a)
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
   102
makeChanPipe = do
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
   103
    (inChan, outChan) <- newChan
e77b750dfbda Initial commit.
Luke Hoersten <luke@hoersten.org>
parents:
diff changeset
   104
    (,) <$> chanToInput outChan <*> chanToOutput inChan