iteratee Concurrency for free
Michael Baikov
manpacket at gmail.com
Thu Jul 7 11:12:06 BST 2011
Sure, you can add this to the library.
Bounded TChans can be useful for adding asynchrony to concurrent
iteratee/enumeratee processing, but they would also add one more
dependency to the package. This code has no complex dependencies and
could go somewhere in Data.Iteratee.Parallel with slight
adjustments (for example, I am not sure that an exception on the final
step will not cause any problems).
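For what it's worth, here is a rough sketch of what a bounded channel could look like on top of STM. The names BChan, newBChan, writeBChan, readBChan and demoBChan are made up for illustration; this is not the API of any released package, and it assumes the stm package is available. The list-based queue is only reasonable for small capacities.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.STM

-- | Hypothetical bounded channel: a capacity plus the queued items.
-- Illustration only, not the API of any released package.
data BChan a = BChan Int (TVar [a])

newBChan :: Int -> IO (BChan a)
newBChan cap = BChan cap <$> newTVarIO []

-- | Blocks (by retrying the transaction) while the channel is full.
writeBChan :: BChan a -> a -> STM ()
writeBChan (BChan cap var) x = do
  xs <- readTVar var
  check (length xs < cap)
  writeTVar var (xs ++ [x])

-- | Blocks while the channel is empty.
readBChan :: BChan a -> STM a
readBChan (BChan _ var) = do
  xs <- readTVar var
  case xs of
    []       -> retry
    (y : ys) -> writeTVar var ys >> return y

-- | Push a list through a channel of capacity 2 from a forked
-- producer and collect it again in the calling thread.
demoBChan :: [Int] -> IO [Int]
demoBChan input = do
  ch <- newBChan 2
  _  <- forkIO $ mapM_ (atomically . writeBChan ch) input
  mapM (const (atomically (readBChan ch))) input
```

The producer can run at most two items ahead of the consumer here, which is the kind of back-pressure a bounded channel would give between two iteratee stages.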
I tried (but not very hard :) ) to implement parI (actually the first
thing I implemented was psequence_) using Chans with synchronization
at each step, but got memory leaks, so I came up with this solution, which
looks nicer and simpler to me.
On Thu, Jul 7, 2011 at 6:24 PM, John Lato <jwlato at gmail.com> wrote:
> This is very nice; thanks very much for sharing. I'd certainly be willing
> to mainline this if you like, however I've been thinking about adding an
> iteratee-concurrent package with support for bounded TChans. These would be
> a good fit for that package as well. Let me know your thoughts on this.
> Best,
> John
> On Thu, Jul 7, 2011 at 4:35 AM, Michael Baikov <manpacket at gmail.com> wrote:
>>
>> If you have a long Iteratee chain in which each link takes
>> approximately the same (reasonably large) time to process an incoming
>> chunk before feeding it further, and those links do not share any
>> complex state between them, you can gain some concurrency for free
>> just by applying several transformers.
>>
>> Just by applying one parI I was able to get a ~2x performance boost in
>> threaded mode in a program which first
>> does some complicated parsing and then does some complicated dumping
>> of the results.
>>
>> The second program I tested this on gained ~40% performance, but
>> that's because it was broken into uneven links.
>>
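A back-of-the-envelope model of why evenly sized links matter, as a sketch only: it assumes two stages with fixed per-chunk costs a and b and ignores all threading overhead. The function names are made up for illustration, and the numbers it gives are not measurements from the programs mentioned above.

```haskell
-- | Time to push n chunks through two stages one after another,
-- where the stages cost a and b time units per chunk.
sequentialTime :: Int -> Double -> Double -> Double
sequentialTime n a b = fromIntegral n * (a + b)

-- | Idealised time for the same work when the second stage runs one
-- chunk behind the first in its own thread (threading overhead ignored).
pipelinedTime :: Int -> Double -> Double -> Double
pipelinedTime n a b
  | n <= 0    = 0
  | otherwise = a + fromIntegral (n - 1) * max a b + b

-- | Ratio of the two estimates for n >= 1 chunks.
speedup :: Int -> Double -> Double -> Double
speedup n a b = sequentialTime n a b / pipelinedTime n a b
```

For long streams the ratio approaches (a + b) / max a b, so it tends to 2 when both stages cost the same and to about 1.4 when one stage costs 0.4 of the other.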
>>
>>
>>
>> First let's import some stuff
>>
>> > module Data.Iteratee.Parallel ( psequence_ , psequence , parE , parI )
>> > where
>>
>> > import Control.Monad.IO.Class
>> > import Control.Monad.Trans.Class
>> > import Data.Iteratee as I hiding (mapM_, zip, filter)
>> > import qualified Data.ListLike as LL
>>
>> > import Control.Concurrent
>> > import Control.Exception
>> > import Control.Monad (join, zipWithM)
>> > import Data.Maybe (catMaybes)
>> > import Data.Time.Clock (getCurrentTime, diffUTCTime)
>>
>> > -- | Transform a usual Iteratee into a parallel composable one,
>> > -- introducing one step of extra delay.
>> > -- EEEEx - time spent in Enumerator working on x'th packet
>> > -- IIIIx - time spent in Iteratee working on x'th packet
>> > -- z - last packet, y = (z-1)'th packet
>>
>> this diagram looks awful in variable width font :)
>>
>> > -- regular Iteratee:  EEEE0 - IIII0, EEEE1 - IIII1, EEEE2 - IIII2 .. EEEEz -> IIIIz
>> > -- parallel Iteratee: EEEE0, EEEE1, EEEE2, .. EEEEz
>> > --                      \_ IIII0\_ IIII1\_ .. IIIIy\__ IIIIz
>> > parI :: (Nullable s) => Iteratee s IO a -> Iteratee s IO a -- {{{
>> > parI = liftI . firstStep
>> >   where
>> >
>> >     -- first step: here we fork a separate thread for the next chain
>> >     -- and at the same time ask for more data from the previous chain
>> >     firstStep iter chunk = do
>> >         var <- liftIO newEmptyMVar
>> >         _   <- sideStep var chunk iter
>> >         liftI $ go var
>> >
>> >     -- somewhere in the middle: we get the iteratee from the previous
>> >     -- step, feed it some new data, ask for more data and start more
>> >     -- processing in a separate thread
>> >     go var chunk@(Chunk _) = do
>> >         iter <- liftIO $ takeMVar var
>> >         _    <- sideStep var chunk iter
>> >         liftI $ go var
>> >
>> >     -- final step: no more data, so we need to inform our consumer
>> >     -- about it
>> >     go var e = do
>> >         iter <- liftIO $ takeMVar var
>> >         join . lift $ enumChunk e iter
>> >
>> >     -- forks away from the main computation, returns results via MVar
>> >     sideStep var chunk iter = liftIO . forkIO $ runIter iter onDone onCont
>> >       where
>> >         onDone a s = putMVar var $ idone a s
>> >         onCont k _ = runIter (k chunk) onDone onFina
>> >         onFina k e = putMVar var $ icont k e
>> > -- }}}
>>
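The same hand-off idea in plain IO, without any iteratee machinery, as a minimal sketch: the consumer state travels through a single MVar, and each consumer step runs in a forked thread while the caller fetches the next chunk. The name pipelined and its argument names are placeholders for illustration. An exception inside step would leave the MVar empty, and this sketch does not handle that.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

-- | Feed chunks to a stateful consumer, running each consumer step in
-- a forked thread so the caller can fetch the next chunk meanwhile.
-- 'step' and the state type are placeholders for illustration.
pipelined :: (st -> chunk -> IO st) -> st -> [IO chunk] -> IO st
pipelined step st0 producers = do
  var <- newMVar st0              -- holds the consumer state between steps
  mapM_ (feed var) producers
  takeMVar var                    -- wait for the last step to finish
  where
    feed var produce = do
      chunk <- produce            -- overlaps with the previous forked step
      st    <- takeMVar var       -- block until that step has finished
      _     <- forkIO (step st chunk >>= putMVar var)
      return ()
```

Consumer steps still run strictly one after another, in order; the only overlap is between producing chunk n+1 and consuming chunk n, which is the one step of extra delay described in the comments above.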
>>
>> > -- | Transform a usual Enumeratee into a parallel composable one,
>> > -- introducing one step of extra delay; see 'parI'.
>> > parE :: (Nullable s1, Nullable s2) => Enumeratee s1 s2 IO r -> Enumeratee s1 s2 IO r -- {{{
>> > parE outer inner = parI (outer inner)
>> > -- }}}
>>
>>
>> > -- | Enumerate a list of iteratees over a single stream simultaneously
>> > -- and discard the results. Each iteratee runs in a separate forkIO
>> > -- thread; all errors from the iteratees are passed up.
>> > psequence_ = I.sequence_ . map parI
>>
>>
>> > -- | Enumerate a list of iteratees over a single stream simultaneously
>> > -- and keep the results. Each iteratee runs in a separate forkIO
>> > -- thread; all errors from the iteratees are passed up.
>> > psequence = I.sequence . map parI
>>
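To make the "separate thread per consumer, errors passed up" behaviour concrete, here is a plain-IO sketch of the same pattern. runAllPar is a made-up name, and it works on ordinary IO actions rather than iteratees, so treat it as an analogy and not as what psequence does internally.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.MVar
import Control.Exception (SomeException, throwIO, try)

-- | Run every action in its own thread, wait for all of them, and
-- rethrow the first failure (in list order) in the calling thread.
runAllPar :: [IO a] -> IO [a]
runAllPar actions = do
  vars    <- mapM fork actions
  results <- mapM takeMVar vars
  mapM (either throwIO return) results
  where
    -- Fork one action; its result or exception ends up in the MVar.
    fork :: IO a -> IO (MVar (Either SomeException a))
    fork act = do
      var <- newEmptyMVar
      _   <- forkIO (try act >>= putMVar var)
      return var
```

All threads are started before any result is awaited, so the actions overlap, and a failure in any of them surfaces in the caller only after every action has finished.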
>>
>>
>> > -- some tests -- {{{
>>
>>
>> The tests look ugly, but that's because I modified them a bit from the
>> original code I wrote.
>>
>> > data FeedPacket = FeedPacket Int deriving (Show)
>> >
>> > _unusedOK :: [IO ()]
>> > _unusedOK = [testSeq, testParE, testParI]
>> >
>> > testParE :: IO ()
>> > testParE = bm $ mkTestPackets >>= processList (slowChain . slowChain $ dumpAny "")
>> >
>> > testParI :: IO ()
>> > testParI = bm $ mkTestPackets >>= processList (slowChain . slowChain $ parI $ slowDumpAny "")
>> >
>> > testSeq :: IO ()
>> > testSeq = bm $ mkTestPackets >>= processList is
>> >   where
>> >     is = psequence_ [i1, i2]
>> >     i1 = joinI . slowEnum $ dumpAny "first "
>> >     i2 = joinI . slowEnum $ dumpAny "last "
>> >
>> > bm :: IO a -> IO ()
>> > bm action = do
>> >     before <- getCurrentTime
>> >     _ <- action
>> >     after <- getCurrentTime
>> >     print $ after `diffUTCTime` before
>> >
>> > -- some test helpers {{{
>> >
>> > slowEnum :: Enumeratee [FeedPacket] [FeedPacket] IO a
>> > slowEnum iter = liftI $ go iter
>> >   where
>> >     go k c@(Chunk _) = do
>> >         liftIO $ threadDelay (1000000 :: Int)
>> >         k' <- liftIO $ enumChunk c k
>> >         liftI $ go k'
>> >     go k e = idone k e
>> >
>> > mkTestPackets :: IO [FeedPacket]
>> > mkTestPackets = return $ map FeedPacket [1..10]
>> >
>> >
>> > processList :: Iteratee [FeedPacket] IO () -> [FeedPacket] -> IO ()
>> > processList iter [] = enumEof iter >> return ()
>> > processList iter (p:ps) = do
>> >     putStrLn $ "<<< " ++ show p
>> >     enumChunk (Chunk [p]) iter >>= flip processList ps
>> >
>> > dumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
>> > dumpAny str = liftI go
>> >   where
>> >     go c@(Chunk _) = liftIO (putStrLn $ str ++ ">>> " ++ show c) >> liftI go
>> >     go e = idone () e
>> >
>> >
>> > slowDumpAny :: (Show a, Nullable a) => String -> Iteratee a IO ()
>> > slowDumpAny str = liftI go
>> >   where
>> >     go c@(Chunk _) = do
>> >         liftIO $ threadDelay (1000000 :: Int)
>> >         liftIO (putStrLn $ str ++ ">>> " ++ show c)
>> >         liftI go
>> >     go e = idone () e
>> >
>> > slowChain :: Iteratee [FeedPacket] IO a -> Iteratee [FeedPacket] IO a
>> > slowChain = joinI . parE slowEnum
>>
>>
>>
>> > -- }}}
>> > -- }}}
>>
>> _______________________________________________
>> Iteratee mailing list
>> Iteratee at projects.haskell.org
>> http://projects.haskell.org/cgi-bin/mailman/listinfo/iteratee
>
>