iteratee Concurrency with for free
John Lato
jwlato at gmail.com
Thu Jul 7 10:24:32 BST 2011
This is very nice; thanks very much for sharing. I'd certainly be willing
to mainline this if you like, however I've been thinking about adding an
iteratee-concurrent package with support for bounded TChans. These would be
a good fit for that package as well. Let me know your thoughts on this.
Best,
John
On Thu, Jul 7, 2011 at 4:35 AM, Michael Baikov <manpacket at gmail.com> wrote:
> If you have a long Iteratee chain in which each link takes
> approximately the same (reasonably large) time to process an incoming
> chunk before feeding it further, and those links do not share any
> complex state between them, you can gain some concurrency for free
> just by applying several transformers.
>
> Just by applying one parI I was able to get a ~2x performance boost in
> threaded mode in a program which first
> does some complicated parsing and then does some complicated dumping
> of the results.
>
> The second program I tested this on gained ~40% performance, but
> that's because it was broken into uneven links.
>
>
>
>
> First let's import some stuff
>
> > module Data.Iteratee.Parallel ( psequence_ , psequence , parE , parI )
> where
>
> > import Control.Monad.IO.Class
> > import Control.Monad.Trans.Class
> > import Data.Iteratee as I hiding (mapM_, zip, filter)
> > import qualified Data.ListLike as LL
>
> > import Control.Concurrent
> > import Control.Exception
> > import Control.Monad (join, zipWithM)
> > import Data.Maybe (catMaybes)
> > import Data.Time.Clock (getCurrentTime, diffUTCTime)
>
> > -- | Transform a regular Iteratee into a parallel composable one,
> > -- introducing one extra step of delay.
> > -- EEEEx - time spent in Enumerator working on x'th packet
> > -- IIIIx - time spent in Iteratee working on x'th packet
> > -- z - last packet, y = (z-1)'th packet
>
> this diagram looks awful in variable width font :)
>
> > -- regular Iteratee:  EEEE0 - IIII0, EEEE1 - IIII1, EEEE2 - IIII2 .. EEEEz -> IIIIz
> > -- parallel Iteratee: EEEE0, EEEE1, EEEE2, .. EEEEz
> > --                         \_ IIII0\_ IIII1\_ .. IIIIy\__ IIIIz
> > -- The wrapped iteratee always runs one chunk behind its producer: every
> > -- incoming chunk is handed to a freshly forked thread, and the producer is
> > -- immediately asked for the next chunk. The iteratee resulting from each
> > -- step travels back to the main computation through an 'MVar'.
> > --
> > -- NOTE(review): if the wrapped iteratee throws inside the forked thread,
> > -- nothing is ever put into the 'MVar', so the following 'takeMVar' cannot
> > -- complete -- confirm that callers only pass iteratees that do not throw.
> > parI :: (Nullable s) => Iteratee s IO a -> Iteratee s IO a -- {{{
> > parI = liftI . firstStep
> >   where
> >
> >     -- First step: fork a separate thread for the next chain and, at the
> >     -- same time, ask for more data from the previous chain.
> >     firstStep iter chunk = do
> >       var <- liftIO newEmptyMVar
> >       _ <- sideStep var chunk iter
> >       liftI $ go var
> >
> >     -- Somewhere in the middle: take the iteratee left by the previous
> >     -- step, feed it the new data in a separate thread, and ask for more
> >     -- data.
> >     go var chunk@(Chunk _) = do
> >       iter <- liftIO $ takeMVar var
> >       _ <- sideStep var chunk iter
> >       liftI $ go var
> >
> >     -- Final step: no more data, so we need to inform our consumer
> >     -- about it.
> >     go var e = do
> >       iter <- liftIO $ takeMVar var
> >       join . lift $ enumChunk e iter
> >
> >     -- Forks away from the main computation; the resulting iteratee is
> >     -- returned via the MVar.
> >     sideStep var chunk iter = liftIO . forkIO $ runIter iter onDone onCont
> >       where
> >         -- Already finished: hand the finished iteratee back unchanged.
> >         onDone a s = putMVar var $ idone a s
> >         -- Wants input: feed the chunk, then report whatever state results.
> >         onCont k _ = runIter (k chunk) onDone onFina
> >         -- Still wants input after the chunk: hand back the continuation.
> >         onFina k e = putMVar var $ icont k e
> > -- }}}
>
>
> > -- | Transform a regular Enumeratee into a parallel composable one,
> > -- introducing one extra step of delay; see 'parI'.
> > --
> > -- The enumeratee is first applied to the inner iteratee, and the resulting
> > -- outer iteratee is then wrapped with 'parI'.
> > parE :: (Nullable s1, Nullable s2)
> >      => Enumeratee s1 s2 IO r
> >      -> Enumeratee s1 s2 IO r -- {{{
> > parE outer inner = parI (outer inner)
> > -- }}}
>
>
> > -- | Enumerate a list of iteratees over a single stream simultaneously
> > -- and discard the results. Each iteratee runs in a separate forkIO
> > -- thread; all errors from the iteratees are passed up.
> > --
> > -- Written with an explicit argument: a binding without arguments and
> > -- without a type signature is subject to the monomorphism restriction,
> > -- which would stop the class-constrained stream type from generalising.
> > psequence_ iters = I.sequence_ (map parI iters)
>
>
> > -- | Enumerate a list of iteratees over a single stream simultaneously
> > -- and keep the results. Each iteratee runs in a separate forkIO thread;
> > -- all errors from the iteratees are passed up.
> > --
> > -- Written with an explicit argument: a binding without arguments and
> > -- without a type signature is subject to the monomorphism restriction,
> > -- which would stop the class-constrained stream type from generalising.
> > psequence iters = I.sequence (map parI iters)
>
>
>
> > -- some tests -- {{{
>
>
> The tests look ugly, but that's because I modified them a bit from the
> original code I wrote.
>
> > -- Test payload: a packet wrapping a single Int.
> > data FeedPacket = FeedPacket Int deriving (Show)
> >
> > -- Collects the three test drivers in one list. Presumably this exists only
> > -- so that the otherwise unreferenced tests do not trigger unused-binding
> > -- warnings -- TODO confirm.
> > _unusedOK :: [IO ()]
> > _unusedOK = [testSeq, testParE, testParI]
> >
> > -- Times a run of the test packets through two 'slowChain' stages that end
> > -- in 'dumpAny'.
> > testParE :: IO ()
> > testParE =
> >   bm $ mkTestPackets >>= processList (slowChain . slowChain $ dumpAny "")
> >
> > -- Times a run of the test packets through two 'slowChain' stages that end
> > -- in a 'parI'-wrapped 'slowDumpAny'.
> > testParI :: IO ()
> > testParI =
> >   bm $ mkTestPackets
> >     >>= processList (slowChain . slowChain $ parI $ slowDumpAny "")
> >
> > -- Times a run of the test packets through two slow dumping iteratees that
> > -- are combined with 'psequence_'.
> > testSeq :: IO ()
> > testSeq = bm $ mkTestPackets >>= processList iters
> >   where
> >     iters = psequence_ [i1, i2]
> >     i1 = joinI . slowEnum $ dumpAny "first "
> >     i2 = joinI . slowEnum $ dumpAny "last "
> >
> > -- Runs the action, discards its result, and prints the wall-clock time
> > -- elapsed between the two 'getCurrentTime' readings.
> > bm :: IO a -> IO ()
> > bm action = do
> >   before <- getCurrentTime
> >   _ <- action
> >   after <- getCurrentTime
> >   print $ after `diffUTCTime` before
> >
> > -- some test helpers {{{
> >
> > -- Enumeratee that waits 1000000 microseconds before passing each chunk on
> > -- to the inner iteratee unchanged.
> > slowEnum :: Enumeratee [FeedPacket] [FeedPacket] IO a
> > slowEnum iter = liftI $ go iter
> >   where
> >     -- Data chunk: delay, feed the inner iteratee, then wait for more.
> >     go k c@(Chunk _) = do
> >       liftIO $ threadDelay (1000000 :: Int)
> >       k' <- liftIO $ enumChunk c k
> >       liftI $ go k'
> >     -- Anything else (end of stream): finish, returning the inner iteratee.
> >     go k e = idone k e
> >
> > -- Produces the fixed test input: packets numbered 1 through 10.
> > mkTestPackets :: IO ([FeedPacket])
> > mkTestPackets = return [FeedPacket n | n <- [1..10]]
> >
> >
> > -- Feeds the packets to the iteratee one single-element chunk at a time,
> > -- printing each packet before it is sent, and sends EOF once the list is
> > -- exhausted. The iteratee resulting from the EOF is discarded.
> > processList :: Iteratee [FeedPacket] IO () -> [FeedPacket] -> IO ()
> > processList iter [] = enumEof iter >> return ()
> > processList iter (p:ps) = do
> >   putStrLn $ "<<< " ++ show p
> >   enumChunk (Chunk [p]) iter >>= flip processList ps
> >
> > -- Iteratee that prints every data chunk it receives, prefixed with the
> > -- given label, and finishes with () on any other stream value.
> > dumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
> > dumpAny str = liftI go
> >   where
> >     go c@(Chunk _) = liftIO (putStrLn $ str ++ ">>> " ++ show c) >> liftI go
> >     go e = idone () e
> >
> >
> > -- Like 'dumpAny', but waits 1000000 microseconds before printing each
> > -- data chunk.
> > slowDumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
> > slowDumpAny str = liftI go
> >   where
> >     go c@(Chunk _) = do
> >       liftIO $ threadDelay (1000000 :: Int)
> >       liftIO (putStrLn $ str ++ ">>> " ++ show c)
> >       liftI go
> >     go e = idone () e
> >
> > -- Puts one 'parE'-wrapped 'slowEnum' stage in front of the given iteratee
> > -- and flattens the nested result with 'joinI'.
> > slowChain :: Iteratee [FeedPacket] IO a -> Iteratee [FeedPacket] IO a
> > slowChain inner = joinI (parE slowEnum inner)
>
>
>
> > -- }}}
> > -- }}}
>
> _______________________________________________
> Iteratee mailing list
> Iteratee at projects.haskell.org
> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>
-------------- next part --------------
An HTML attachment was scrubbed...
URL: <http://projects.haskell.org/pipermail/iteratee/attachments/20110707/8505c5e4/attachment-0001.htm>
More information about the Iteratee
mailing list