# HG changeset patch # User Luke Hoersten # Date 1418099485 21600 # Node ID e77b750dfbdabe1dd4e3c33b95a0ce63399801d5 Initial commit. diff -r 000000000000 -r e77b750dfbda .hgignore --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/.hgignore Mon Dec 08 22:31:25 2014 -0600 @@ -0,0 +1,10 @@ +syntax: regexp +~$ +\#.*\#$ +\.\# +\.DS_Store$ +\.cabal-sandbox/ +\.hpc/ +\.tix$ +cabal\.sandbox\.config$ +dist/ diff -r 000000000000 -r e77b750dfbda LICENSE --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/LICENSE Mon Dec 08 22:31:25 2014 -0600 @@ -0,0 +1,28 @@ +Copyright (c) 2014, Luke Hoersten + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions are met: + +Redistributions of source code must retain the above copyright notice, this +list of conditions and the following disclaimer. + +Redistributions in binary form must reproduce the above copyright notice, this +list of conditions and the following disclaimer in the documentation and/or +other materials provided with the distribution. + +Neither the names of Google, Erudify, nor the names of other contributors may +be used to endorse or promote products derived from this software without +specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED +WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER +CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, +OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. diff -r 000000000000 -r e77b750dfbda Setup.hs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/Setup.hs Mon Dec 08 22:31:25 2014 -0600 @@ -0,0 +1,2 @@ +import Distribution.Simple +main = defaultMain diff -r 000000000000 -r e77b750dfbda src/System/IO/Streams/Concurrent/Unagi.hs --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/src/System/IO/Streams/Concurrent/Unagi.hs Mon Dec 08 22:31:25 2014 -0600 @@ -0,0 +1,104 @@ +{-# LANGUAGE BangPatterns #-} + +module System.IO.Streams.Concurrent.Unagi + ( -- * Channel conversions + inputToChan + , chanToInput + , chanToOutput + , concurrentMerge + , makeChanPipe + ) where + + +------------------------------------------------------------------------------ +import Control.Applicative ((<$>), (<*>)) +import Control.Concurrent (forkIO) +import Control.Concurrent.Chan.Unagi (InChan, OutChan, newChan, + readChan, writeChan) +import Control.Concurrent.MVar (modifyMVar, newEmptyMVar, + newMVar, putMVar, takeMVar) +import Control.Exception (SomeException, mask, throwIO, + try) +import Control.Monad (forM_) +import Prelude hiding (read) +import System.IO.Streams.Internal (InputStream, OutputStream, + makeInputStream, + makeOutputStream, read) + + + +------------------------------------------------------------------------------ +-- | Writes the contents of an input stream to a channel until the input stream +-- yields end-of-stream. +inputToChan :: InputStream a -> InChan (Maybe a) -> IO () +inputToChan is ch = go + where + go = do + mb <- read is + writeChan ch mb + maybe (return $! ()) (const go) mb + + +------------------------------------------------------------------------------ +-- | Turns an 'OutChan' into an input stream. +-- +chanToInput :: OutChan (Maybe a) -> IO (InputStream a) +chanToInput ch = makeInputStream $! readChan ch + + +------------------------------------------------------------------------------ +-- | Turns an 'InChan' into an output stream. +-- +chanToOutput :: InChan (Maybe a) -> IO (OutputStream a) +chanToOutput = makeOutputStream . writeChan + + +------------------------------------------------------------------------------ +-- | Concurrently merges a list of 'InputStream's, combining values in the +-- order they become available. +-- +-- Note: does /not/ forward individual end-of-stream notifications, the +-- produced stream does not yield end-of-stream until all of the input streams +-- have finished. +-- +-- This traps exceptions in each concurrent thread and re-raises them in the +-- current thread. +concurrentMerge :: [InputStream a] -> IO (InputStream a) +concurrentMerge iss = do + mv <- newEmptyMVar + nleft <- newMVar $! length iss + mask $ \restore -> forM_ iss $ \is -> forkIO $ do + let producer = do + emb <- try $ restore $ read is + case emb of + Left exc -> do putMVar mv (Left (exc :: SomeException)) + producer + Right Nothing -> putMVar mv $! Right Nothing + Right x -> putMVar mv (Right x) >> producer + producer + makeInputStream $ chunk mv nleft + + where + chunk mv nleft = do + emb <- takeMVar mv + case emb of + Left exc -> throwIO exc + Right Nothing -> do x <- modifyMVar nleft $ \n -> + let !n' = n - 1 + in return $! (n', n') + if x > 0 + then chunk mv nleft + else return Nothing + Right x -> return x + + +-------------------------------------------------------------------------------- +-- | Create a new pair of streams using an underlying 'Chan'. Everything written +-- to the 'OutputStream' will appear as-is on the 'InputStream'. +-- +-- Since reading from the 'InputStream' and writing to the 'OutputStream' are +-- blocking calls, be sure to do so in different threads. +makeChanPipe :: IO (InputStream a, OutputStream a) +makeChanPipe = do + (inChan, outChan) <- newChan + (,) <$> chanToInput outChan <*> chanToOutput inChan diff -r 000000000000 -r e77b750dfbda unagi-streams.cabal --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/unagi-streams.cabal Mon Dec 08 22:31:25 2014 -0600 @@ -0,0 +1,28 @@ +name: unagi-streams +version: 0.1.0.0 +synopsis: Unagi Chan IO-Streams +description: IO-Streams implemented underneath with Unagi + channels. This library is a straight port of Greg Collins' IO-Streams + Chan implementation. +License: BSD3 +License-file: LICENSE +author: Luke Hoersten +maintainer: luke@hoersten.org +-- copyright: +category: System +build-type: Simple +-- extra-source-files: +cabal-version: >=1.10 + +library + exposed-modules: + System.IO.Streams.Concurrent.Unagi + -- other-modules: + -- other-extensions: + build-depends: + base >= 4.7 && < 4.8 + , unagi-chan >= 0.2 && < 0.3 + , io-streams >= 1.2 && < 1.3 + + hs-source-dirs: src/ + default-language: Haskell2010