Initial commit.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/.hgignore Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,10 @@
+syntax: regexp
+~$
+\#.*\#$
+\.\#
+\.DS_Store$
+\.cabal-sandbox/
+\.hpc/
+\.tix$
+cabal\.sandbox\.config$
+dist/
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/LICENSE Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,28 @@
+Copyright (c) 2014, Luke Hoersten
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions are met:
+
+Redistributions of source code must retain the above copyright notice, this
+list of conditions and the following disclaimer.
+
+Redistributions in binary form must reproduce the above copyright notice, this
+list of conditions and the following disclaimer in the documentation and/or
+other materials provided with the distribution.
+
+Neither the names of Google, Erudify, nor the names of other contributors may
+be used to endorse or promote products derived from this software without
+specific prior written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
+WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
+DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
+CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/Setup.hs Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,2 @@
+import Distribution.Simple
+main = defaultMain
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/src/System/IO/Streams/Concurrent/Unagi.hs Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,104 @@
+{-# LANGUAGE BangPatterns #-}
+
+module System.IO.Streams.Concurrent.Unagi
+ ( -- * Channel conversions
+ inputToChan
+ , chanToInput
+ , chanToOutput
+ , concurrentMerge
+ , makeChanPipe
+ ) where
+
+
+------------------------------------------------------------------------------
+import Control.Applicative ((<$>), (<*>))
+import Control.Concurrent (forkIO)
+import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan,
+ readChan, writeChan)
+import Control.Concurrent.MVar (modifyMVar, newEmptyMVar,
+ newMVar, putMVar, takeMVar)
+import Control.Exception (SomeException, mask, throwIO,
+ try)
+import Control.Monad (forM_)
+import Prelude hiding (read)
+import System.IO.Streams.Internal (InputStream, OutputStream,
+ makeInputStream,
+ makeOutputStream, read)
+
+
+
+------------------------------------------------------------------------------
+-- | Writes the contents of an input stream to a channel until the input stream
+-- yields end-of-stream.
+inputToChan :: InputStream a -> InChan (Maybe a) -> IO ()
+inputToChan is ch = go
+ where
+ go = do
+ mb <- read is
+ writeChan ch mb
+ maybe (return $! ()) (const go) mb
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'OutChan' into an input stream.
+--
+chanToInput :: OutChan (Maybe a) -> IO (InputStream a)
+chanToInput ch = makeInputStream $! readChan ch
+
+
+------------------------------------------------------------------------------
+-- | Turns an 'InChan' into an output stream.
+--
+chanToOutput :: InChan (Maybe a) -> IO (OutputStream a)
+chanToOutput = makeOutputStream . writeChan
+
+
+------------------------------------------------------------------------------
+-- | Concurrently merges a list of 'InputStream's, combining values in the
+-- order they become available.
+--
+-- Note: does /not/ forward individual end-of-stream notifications, the
+-- produced stream does not yield end-of-stream until all of the input streams
+-- have finished.
+--
+-- This traps exceptions in each concurrent thread and re-raises them in the
+-- current thread.
+concurrentMerge :: [InputStream a] -> IO (InputStream a)
+concurrentMerge iss = do
+ mv <- newEmptyMVar
+ nleft <- newMVar $! length iss
+ mask $ \restore -> forM_ iss $ \is -> forkIO $ do
+ let producer = do
+ emb <- try $ restore $ read is
+ case emb of
+ Left exc -> do putMVar mv (Left (exc :: SomeException))
+ producer
+ Right Nothing -> putMVar mv $! Right Nothing
+ Right x -> putMVar mv (Right x) >> producer
+ producer
+ makeInputStream $ chunk mv nleft
+
+ where
+ chunk mv nleft = do
+ emb <- takeMVar mv
+ case emb of
+ Left exc -> throwIO exc
+ Right Nothing -> do x <- modifyMVar nleft $ \n ->
+ let !n' = n - 1
+ in return $! (n', n')
+ if x > 0
+ then chunk mv nleft
+ else return Nothing
+ Right x -> return x
+
+
+--------------------------------------------------------------------------------
+-- | Create a new pair of streams using an underlying 'Chan'. Everything written
+-- to the 'OutputStream' will appear as-is on the 'InputStream'.
+--
+-- Since reading from the 'InputStream' and writing to the 'OutputStream' are
+-- blocking calls, be sure to do so in different threads.
+makeChanPipe :: IO (InputStream a, OutputStream a)
+makeChanPipe = do
+ (inChan, outChan) <- newChan
+ (,) <$> chanToInput outChan <*> chanToOutput inChan
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/unagi-streams.cabal Mon Dec 08 22:31:25 2014 -0600
@@ -0,0 +1,28 @@
+name: unagi-streams
+version: 0.1.0.0
+synopsis: Unagi Chan IO-Streams
+description: IO-Streams implemented underneath with Unagi
+ channels. This library is a straight port of Greg Collins' IO-Streams
+ Chan implementation.
+License: BSD3
+License-file: LICENSE
+author: Luke Hoersten
+maintainer: [email protected]
+-- copyright:
+category: System
+build-type: Simple
+-- extra-source-files:
+cabal-version: >=1.10
+
+library
+ exposed-modules:
+ System.IO.Streams.Concurrent.Unagi
+ -- other-modules:
+ -- other-extensions:
+ build-depends:
+ base >= 4.7 && < 4.8
+ , unagi-chan >= 0.2 && < 0.3
+ , io-streams >= 1.2 && < 1.3
+
+ hs-source-dirs: src/
+ default-language: Haskell2010