Initial commit.
authorLuke Hoersten <luke@hoersten.org>
Mon, 08 Dec 2014 22:31:25 -0600
changeset 0 e77b750dfbda
child 1 e31281e9da96
Initial commit.
.hgignore
LICENSE
Setup.hs
src/System/IO/Streams/Concurrent/Unagi.hs
unagi-streams.cabal
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore	Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,10 @@
+syntax: regexp
+~$
+\#.*\#$
+\.\#
+\.DS_Store$
+\.cabal-sandbox/
+\.hpc/
+\.tix$
+cabal\.sandbox\.config$
+dist/
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/LICENSE	Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,28 @@
+Copyright (c) 2014, Luke Hoersten
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice, this
+list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+Neither the names of Google, Erudify, nor the names of other contributors may
+be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/Setup.hs	Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/src/System/IO/Streams/Concurrent/Unagi.hs	Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,104 @@
+{-# LANGUAGE BangPatterns #-}
+
+module System.IO.Streams.Concurrent.Unagi
+       ( -- * Channel conversions
+           inputToChan
+       , chanToInput
+       , chanToOutput
+       , concurrentMerge
+       , makeChanPipe
+       ) where
+
+
+------------------------------------------------------------------------------
+import           Control.Applicative           ((<$>), (<*>))
+import           Control.Concurrent            (forkIO)
+import           Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
+                                                readChan, writeChan)
+import           Control.Concurrent.MVar       (modifyMVar, newEmptyMVar,
+                                                newMVar, putMVar, takeMVar)
+import           Control.Exception             (SomeException, mask, throwIO,
+                                                try)
+import           Control.Monad                 (forM_)
+import           Prelude                       hiding (read)
+import           System.IO.Streams.Internal    (InputStream, OutputStream,
+                                                makeInputStream,
+                                                makeOutputStream, read)
+
+
+
+------------------------------------------------------------------------------
+-- | Writes the contents of an input stream to a channel until the input stream
+-- yields end-of-stream.
+inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
+inputToChan is ch = go
+  where
+    go = do
+        mb <- read is
+        writeChan ch mb
+        maybe (return $! ()) (const go) mb
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'OutChan' into an input stream.
+--
+chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
+chanToInput ch = makeInputStream $! readChan ch
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'InChan' into an output stream.
+--
+chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
+chanToOutput = makeOutputStream . writeChan
+
+
+------------------------------------------------------------------------------
+-- | Concurrently merges a list of 'InputStream's, combining values in the
+-- order they become available.
+--
+-- Note: does /not/ forward individual end-of-stream notifications, the
+-- produced stream does not yield end-of-stream until all of the input streams
+-- have finished.
+--
+-- This traps exceptions in each concurrent thread and re-raises them in the
+-- current thread.
+concurrentMerge :: [InputStream a] -> IO (InputStream a)
+concurrentMerge iss = do
+    mv    <- newEmptyMVar
+    nleft <- newMVar $! length iss
+    mask $ \restore -> forM_ iss $ \is -> forkIO $ do
+        let producer = do
+              emb <- try $ restore $ read is
+              case emb of
+                  Left exc      -> do putMVar mv (Left (exc :: SomeException))
+                                      producer
+                  Right Nothing -> putMVar mv $! Right Nothing
+                  Right x       -> putMVar mv (Right x) >> producer
+        producer
+    makeInputStream $ chunk mv nleft
+
+  where
+    chunk mv nleft = do
+        emb <- takeMVar mv
+        case emb of
+            Left exc      -> throwIO exc
+            Right Nothing -> do x <- modifyMVar nleft $ \n ->
+                                     let !n' = n - 1
+                                     in return $! (n', n')
+                                if x > 0
+                                  then chunk mv nleft
+                                  else return Nothing
+            Right x       -> return x
+
+
+--------------------------------------------------------------------------------
+-- | Create a new pair of streams using an underlying 'Chan'. Everything written
+-- to the 'OutputStream' will appear as-is on the 'InputStream'.
+--
+-- Since reading from the 'InputStream' and writing to the 'OutputStream' are
+-- blocking calls, be sure to do so in different threads.
+makeChanPipe :: IO (InputStream a, OutputStream a)
+makeChanPipe = do
+    (inChan, outChan) <- newChan
+    (,) <$> chanToInput outChan <*> chanToOutput inChan
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/unagi-streams.cabal	Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,28 @@
+name:                unagi-streams
+version:             0.1.0.0
+synopsis:            Unagi Chan IO-Streams
+description:         IO-Streams implemented underneath with Unagi
+                     channels. This library is a straight port of Greg Collins' IO-Streams
+                     Chan implementation.
+License:             BSD3
+License-file:        LICENSE
+author:              Luke Hoersten
+maintainer:          [email protected]
+-- copyright:
+category:            System
+build-type:          Simple
+-- extra-source-files:
+cabal-version:       >=1.10
+
+library
+  exposed-modules:
+                System.IO.Streams.Concurrent.Unagi
+  -- other-modules:
+  -- other-extensions:
+  build-depends:
+                base       >= 4.7 && < 4.8
+              , unagi-chan >= 0.2 && < 0.3
+              , io-streams >= 1.2 && < 1.3
+
+  hs-source-dirs:      src/
+  default-language:    Haskell2010