iteratee Concurrency for free

John Lato jwlato at gmail.com
Thu Jul 7 10:24:32 BST 2011


This is very nice; thanks very much for sharing.  I'd certainly be willing
to mainline this if you like; however, I've been thinking about adding an
iteratee-concurrent package with support for bounded TChans.  These would be
a good fit for that package as well.  Let me know your thoughts on this.

Best,
John

On Thu, Jul 7, 2011 at 4:35 AM, Michael Baikov <manpacket at gmail.com> wrote:

> If you have a long Iteratee chain, each link in that chain takes
> approximately the same (reasonably large) time to process an incoming
> chunk before feeding it further, and those links do not share any
> complex state between them, then you can gain some concurrency for free
> just by applying several transformers.
>
> Just by applying one parI I was able to get a ~2x performance boost in
> threaded mode in a program which first
> does some complicated parsing and then does some complicated dumping
> of the results.
>
> The second program I tested this on gained ~40% performance, but
> that's because it was broken into uneven links.
>
>
>
>
> First, let's import some stuff:
>
> > module Data.Iteratee.Parallel ( psequence_ , psequence , parE , parI )
> where
>
> > import Control.Monad.IO.Class
> > import Control.Monad.Trans.Class
> > import Data.Iteratee as I hiding (mapM_, zip, filter)
> > import qualified Data.ListLike as LL
>
> > import Control.Concurrent
> > import Control.Exception
> > import Control.Monad (join, zipWithM)
> > import Data.Maybe (catMaybes)
> > import Data.Time.Clock (getCurrentTime, diffUTCTime)
>
> > -- | Transform a usual Iteratee into a parallel composable one,
> > -- introducing one extra step of delay.
> > -- EEEEx - time spent in Enumerator working on x'th packet
> > -- IIIIx - time spent in Iteratee working on x'th packet
> > -- z - last packet, y = (z-1)'th packet
>
> This diagram looks awful in a variable-width font :)
>
> > -- regular   Iteratee: EEEE0 - IIII0,  EEEE1 - IIII1,  EEEE2 - IIII2  .. EEEEz -> IIIIz
> > -- parallel  Iteratee: EEEE0,  EEEE1,  EEEE2,       .. EEEEz
> > --                          \_ IIII0\_ IIII1\_      .. IIIIy\__ IIIIz
> > parI :: (Nullable s) => Iteratee s IO a -> Iteratee s IO a -- {{{
> > parI = liftI . firstStep
> >     where
> >
> >         -- first step: fork a separate thread that feeds the first chunk
> >         -- to the wrapped iteratee and, at the same time, ask the previous
> >         -- link in the chain for more data
> >         firstStep iter chunk = do
> >             var <- liftIO newEmptyMVar
> >             _   <- sideStep var chunk iter
> >             liftI $ go var
> >
> >         -- somewhere in the middle: take the iteratee produced by the
> >         -- previous step, feed it the new chunk in a separate thread and
> >         -- ask for more data
> >         go var chunk@(Chunk _) = do
> >             iter <- liftIO $ takeMVar var
> >             _    <- sideStep var chunk iter
> >             liftI $ go var
> >
> >         -- final step: no more data, so wait for the last forked step and
> >         -- pass the end-of-stream marker on to the wrapped iteratee
> >         go var e = do
> >             iter <- liftIO $ takeMVar var
> >             join . lift $ enumChunk e iter
> >
> >         -- Fork away from the main computation; the resulting iteratee is
> >         -- returned through the MVar.  Every way out of the forked thread
> >         -- must fill the MVar: if the wrapped iteratee threw an exception
> >         -- and nothing filled it, the next takeMVar would block forever
> >         -- and the error would be lost.  The exception is therefore turned
> >         -- into an iteratee error (throwErr) and handed back instead.
> >         sideStep var chunk iter =
> >             liftIO . forkIO . handle (putMVar var . throwErr) $
> >                 runIter iter onDone onCont
> >             where
> >                 onDone a s = putMVar var $ idone a s
> >                 onCont k _ = runIter (k chunk) onDone onFina
> >                 onFina k e = putMVar var $ icont k e
> > -- }}}
>
>
> > -- | Transform a usual Enumeratee into a parallel composable one,
> > -- introducing one extra step of delay; see 'parI'.  The enumeratee is
> > -- applied to its inner iteratee and the resulting iteratee is wrapped
> > -- with 'parI'.
> > parE :: (Nullable s1, Nullable s2) -- {{{
> >      => Enumeratee s1 s2 IO r -> Enumeratee s1 s2 IO r
> > parE outer inner = parI (outer inner)
> > -- }}}
>
>
> > -- | Enumerate a list of iteratees over a single stream simultaneously
> > -- and discard the results. Each iteratee is wrapped with 'parI' and so
> > -- runs in a separate forkIO thread; errors from the iteratees are
> > -- passed up.
> > --
> > -- The explicit signature keeps this binding polymorphic: without it the
> > -- monomorphism restriction would fix the stream type to whatever the
> > -- module's own uses (e.g. 'testSeq') pick.
> > psequence_ :: (LL.ListLike s el, Nullable s)
> >            => [Iteratee s IO a] -> Iteratee s IO ()
> > psequence_ = I.sequence_ . map parI
>
>
> > -- | Enumerate a list of iteratees over a single stream simultaneously
> > -- and keep the results. Each iteratee is wrapped with 'parI' and so
> > -- runs in a separate forkIO thread; errors from the iteratees are
> > -- passed up.
> > --
> > -- The list argument is taken explicitly: a point-free binding without a
> > -- type signature would fall under the monomorphism restriction and lose
> > -- its polymorphism in the stream type.
> > -- NOTE(review): add a type signature matching 'I.sequence'.
> > psequence iters = I.sequence (map parI iters)
>
>
>
> > -- some tests -- {{{
>
>
> The tests look ugly, but that's because I modified them a bit from the
> original code I wrote.
>
> > -- | Dummy packet type fed through the test pipelines.
> > data FeedPacket
> >     = FeedPacket Int
> >     deriving Show
> >
> > -- The tests are not in the export list; referencing them here keeps
> > -- GHC's unused-binding warning quiet.
> > _unusedOK :: [IO ()]
> > _unusedOK =
> >     [ testSeq
> >     , testParE
> >     , testParI
> >     ]
> >
> > -- | Time two 'slowChain' links (each a 'parE'-wrapped 'slowEnum')
> > -- feeding 'dumpAny' over the test packets.
> > testParE :: IO ()
> > testParE = bm $ mkTestPackets >>= processList pipeline
> >     where
> >         pipeline = slowChain . slowChain $ dumpAny ""
> >
> > -- | Time two 'slowChain' links feeding a 'parI'-wrapped 'slowDumpAny'
> > -- over the test packets.
> > testParI :: IO ()
> > testParI = bm $ mkTestPackets >>= processList pipeline
> >     where
> >         pipeline = slowChain . slowChain $ parI $ slowDumpAny ""
> >
> > -- | Time 'psequence_' running two 'slowEnum'-fed dumpers side by side.
> > testSeq :: IO ()
> > testSeq = bm (mkTestPackets >>= processList both)
> >     where
> >         both = psequence_ [labelled "first ", labelled "last  "]
> >         labelled tag = joinI . slowEnum $ dumpAny tag
> >
> > -- | Run an action, discard its result and print how long it took.
> > bm :: IO a -> IO ()
> > bm action = do
> >     start <- getCurrentTime
> >     _     <- action
> >     stop  <- getCurrentTime
> >     print (diffUTCTime stop start)
> >
> > --  some test helpers {{{
> >
> > -- | Pass every chunk through to the inner iteratee unchanged after a
> > -- one-second delay; on end of stream, return the inner iteratee as the
> > -- result, leaving the end-of-stream marker unconsumed.
> > slowEnum :: Enumeratee [FeedPacket] [FeedPacket] IO a
> > slowEnum = liftI . step
> >     where
> >         step inner chunk@(Chunk _) = do
> >             liftIO $ threadDelay (1000000 :: Int)
> >             inner' <- liftIO $ enumChunk chunk inner
> >             liftI $ step inner'
> >         step inner eos = idone inner eos
> >
> > -- | The test input: packets numbered 1 to 10.
> > mkTestPackets :: IO [FeedPacket]
> > mkTestPackets = return [FeedPacket n | n <- [1 .. 10]]
> >
> >
> > -- | Feed the packets to the iteratee one per chunk, logging each one,
> > -- then send end of stream; the final iteratee is discarded.
> > processList :: Iteratee [FeedPacket] IO () -> [FeedPacket] -> IO ()
> > processList iter packets = case packets of
> >     []       -> enumEof iter >> return ()
> >     (p : ps) -> do
> >         putStrLn $ "<<< " ++ show p
> >         iter' <- enumChunk (Chunk [p]) iter
> >         processList iter' ps
> >
> > -- | Print every chunk received, prefixed with the given string and
> > -- ">>> ", until end of stream; then finish with ().
> > dumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
> > dumpAny str = liftI go
> >     where
> >         go c@(Chunk _) = do
> >             liftIO (putStrLn $ str ++ ">>> " ++ show c)
> >             liftI go
> >         go e = idone () e
> >
> >
> > -- | Like 'dumpAny', but wait one second before printing each chunk.
> > slowDumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
> > slowDumpAny str = liftI step
> >     where
> >         step eos@(EOF _) = idone () eos
> >         step chunk = do
> >             liftIO $ do
> >                 threadDelay (1000000 :: Int)
> >                 putStrLn (str ++ ">>> " ++ show chunk)
> >             liftI step
> >
> > -- | One link of the test pipeline: 'slowEnum' made parallel via 'parE'
> > -- and joined onto the inner iteratee.
> > slowChain :: Iteratee [FeedPacket] IO a -> Iteratee [FeedPacket] IO a
> > slowChain inner = joinI (parE slowEnum inner)
>
>
>
> > -- }}}
> > -- }}}
>
> _______________________________________________
> Iteratee mailing list
> Iteratee at projects.haskell.org
> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://projects.haskell.org/pipermail/iteratee/attachments/20110707/8505c5e4/attachment-0001.htm>


More information about the Iteratee mailing list