Details

    • Type: New Feature
    • Status: Open
    • Priority: Minor
    • Resolution: Unresolved
    • Affects Version/s: 0.2.0
    • Fix Version/s: None
    • Component/s: Async
    • Labels: None
    • OS: Linux

      Description

      I needed a Process-based version of the Concurrently applicative from Control.Concurrent.Async. To implement it, I had to do some trivial rewriting of a few primitives to work with Process instead of IO (code below). Would it make sense to include these functions in Control.Distributed.Process.Platform.Async (given that it's already largely modeled after Control.Concurrent.Async)?

      By the way, I couldn't use Cloud Haskell's existing Async to implement these functions because it requires the inner type to be Serializable, and I wanted to use them in an applicative, which requires the inner type to be unrestricted.

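      -- Imports the definitions below rely on.
      import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar, takeMVar)
      import Control.Distributed.Process
      import Control.Exception (SomeException, throwIO)

      -- Run both actions and return the first result to arrive;
      -- both child processes are killed afterwards.
      race :: Process a -> Process b -> Process (Either a b)
      race left right = concurrently' left right collect
          where
            collect m = do
              e <- liftIO $ takeMVar m
              case e of
                Left ex -> liftIO $ throwIO ex
                Right r -> return r

      -- Run both actions and wait for both results, rethrowing the first
      -- exception raised by either child.
      concurrently :: Process a -> Process b -> Process (a,b)
      concurrently left right = concurrently' left right (collect [])
          where collect [Left a, Right b] _ = return (a,b)
                collect [Right b, Left a] _ = return (a,b)
                collect xs m = do
                  e <- liftIO $ takeMVar m
                  case e of
                    Left ex -> liftIO $ throwIO ex
                    Right r -> collect (r:xs) m

      -- Spawn both actions as local processes that report their result (or
      -- exception) through a shared MVar, pass that MVar to `collect`, and
      -- kill both children once `collect` returns or throws.
      concurrently' :: Process a -> Process b
                    -> (MVar (Either SomeException (Either a b)) -> Process r)
                    -> Process r
      concurrently' left right collect = do
        done <- liftIO newEmptyMVar
        mask $ \restore -> do
          lid <- spawnLocal $ restore (left >>= liftIO . putMVar done . Right . Left)
                                         `catch` (liftIO . putMVar done . Left)
          rid <- spawnLocal $ restore (right >>= liftIO . putMVar done . Right . Right)
                                         `catch` (liftIO . putMVar done . Left)
          let stop = kill lid "process died" >> kill rid "process died"
          r <- restore (collect done) `onException` stop
          stop
          return r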
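
      For context, the applicative these primitives are meant to support could look roughly like the sketch below. It mirrors the Concurrently newtype from Control.Concurrent.Async, but wraps Process instead of IO. It is only a sketch: it assumes the race and concurrently definitions from the listing above live in the same module, and the choice of a never-finishing threadDelay loop for `empty` is an assumption borrowed from how async handles the same case, not something stated in this issue.

```haskell
-- Sketch only: assumes `race` and `concurrently` from the listing above
-- are defined in this same module.
import Control.Applicative (Alternative (..))
import Control.Concurrent (threadDelay)
import Control.Distributed.Process (Process, liftIO)
import Control.Monad (forever)

-- Process-based analogue of Concurrently from Control.Concurrent.Async.
-- The wrapped result type is unconstrained (no Serializable requirement),
-- which is what allows lawful Functor/Applicative instances.
newtype Concurrently a = Concurrently { runConcurrently :: Process a }

instance Functor Concurrently where
  fmap f (Concurrently p) = Concurrently (fmap f p)

instance Applicative Concurrently where
  pure = Concurrently . return
  -- Run the function-producing and argument-producing processes in
  -- parallel, then apply one result to the other.
  Concurrently fs <*> Concurrently as =
    Concurrently $ (\(f, a) -> f a) <$> concurrently fs as

instance Alternative Concurrently where
  -- Never completes, so any branch raced against it wins.
  empty = Concurrently $ liftIO $ forever (threadDelay maxBound)
  -- Take whichever branch finishes first.
  Concurrently as <|> Concurrently bs =
    Concurrently $ either id id <$> race as bs
```

      With this in place, two processes p and q could be combined as `runConcurrently $ (,) <$> Concurrently p <*> Concurrently q`, which is the kind of use that the Serializable constraint on the existing Async API rules out.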
      

            People

            • Assignee: Tim Watson (hyperthunk) [Administrator]
            • Reporter: davidsd
            • Votes: 0
            • Watchers: 2

                Time Tracking

                • Estimated: 10m
                • Remaining: 10m
                • Logged: Not Specified