posts `for_` publish

Forking and ContT (I)


Introduction

This is the first article in a series about continuations, forking, and monad transformers. Next article.

Motivation

When using StateT or ReaderT over IO, we sometimes would like to fork and still remain in this “monadic context”, but alas:

main = flip evalStateT (0 :: Int) $ do
  modify (+1)
  liftIO $ forkIO $ do
    s <- get
    --   ^ type error: we are in IO,
    --     no state to `get`. d'oh!
    print s
  ..

For StateT this makes a good deal of sense - what does “forking” mean for the state? But that leaves two questions: 1) What about ReaderT? For that, the semantics should be unproblematic; 2) What if we do not even want to continue the stateful computation in main? I.e. we want to “move” the StateT Int IO instead of forking it.

Is this possible without any hassle of manually tracking state/environment and passing it between threads?

Continuations are a well-known tool for turning your ordinary control flow into headache-inducing control rapids. They can be seen as the “goto” of functional programming. For some background knowledge, I can recommend the following sources:

Considering just implementations in haskell, we can find the following monads:

  1. newtype ContT r m a = ContT { runContT :: (a -> m r) -> m r } from the transformers package;
  2. newtype CPS3 m a = CPS3 { unCPS3 :: (a -> m Void) -> m Void } mentioned in [6];
  3. newtype Codensity m a = Codensity { runCodensity :: forall b. (a -> m b) -> m b } e.g. in the kan-extensions package.
  4. newtype LogicT m a = LogicT { unLogicT :: forall r. (a -> m r -> m r) -> m r -> m r } from the logict package and the underlying paper [2].
  5. data FFree f a where Pure :: a -> FFree f a; Impure :: f x -> (x -> FFree f a) -> FFree f a from the “Freer Monads, More Extensible Effects” paper [8]

This article will focus on the deterministic, undelimited, but forkable continuations, and for now we will stick with the simplest/most permissive definition 1).

Forking is in haskell represented by the primitive forkIO :: IO () -> IO ThreadId operation. In the context of continuations however we can come up with more than one notion:

forkCont1       :: ContT () IO () -> ContT () IO ()  -- closest to forkIO
forkCont2       :: ContT () IO Bool                  -- more of a "classic" fork
moveToNewThread :: ContT () IO ()                 -- call continuation in forkIO

forkCont1 = liftIO . void . forkIO . evalContT
forkCont2 = ContT $ \c -> void (forkIO $ c False) >> c True
moveToNewThread  = ContT $ \c -> void $ forkIO $ c ()

These are not new inventions, for example this thread on reddit [3] implements at least the first two (their implementation is different, but intended semantics are same).

Being essentially the equivalent of forkIO, the first of these is not too interesting. Instead, we will have a closer look at moveToNewThread.

The moveToNewThread Primitive

moveToNewThread :: ContT () IO ()
moveToNewThread  = ContT $ \c -> void $ forkIO $ c ()

To clarify, some equivalences

evalContT $ do
  x <- k
  moveToNewThread
  l x
~
do
  x <- evalContT k
  void $ forkIO $ evalContT (l x)

reminder: evalContT :: Monad m => ContT r m r -> m r, i.e. the type of the whole expression must be IO ()

evalContT $ do
  moveToNewThread
  x <- k
  moveToNewThread
  l x
~
void $ forkIO $ do
  x <- evalContT k
  void $ forkIO $ do
    evalContT (l x)

This last example shows a bit how moveToNewThread “flattens” the notation of moving an execution to a different thread (similarly to how e.g. EitherT flattens a series of actions that return Eithers).

Below are some use-cases for this expressiveness. However, there is at least one more curious property to this, having to do with transformer stacks and lifting. For this, we look back to plain forkIO and how it interacts with lift (we will use StateT Int as some simple, specific transformer):

       forkIO ::                 IO () ->            IO ThreadId
lift . forkIO :: MonadTrans t => IO () -> t          IO ThreadId -- in general
lift . forkIO ::                 IO () -> StateT Int IO ThreadId -- specialized

Where the argument unfortunately still is IO () and not StateT Int IO (), so this does not work too well. For forkCont1, the base monad is ContT () IO, but the problem is essentially the same: forking “loses the state”. However:

lift forkCont2       :: MonadTrans t => t          (ContT () IO) Bool
lift moveToNewThread :: MonadTrans t => t          (ContT () IO) ()
-- or, specialized to our stack:
lift forkCont2       ::                 StateT Int (ContT () IO) Bool
lift moveToNewThread ::                 StateT Int (ContT () IO) ()

Isn’t that neat? If the base of our stack is ContT () IO, we can lift “forks” (and “moves”). This gives some expressiveness to transformer stacks I was not aware of previously. So while in general ContT can easily become more confusing than useful, this expressiveness is sufficient motivation to consider this a bit more closely. So how does this behave?

evalContT $ flip (evalStateT s) $ do
  x <- k
  lift moveToNewThread
  l x
~
do
  (x, s') <- evalContT $ runStateT k s
  void $ forkIO $ do
    evalContT $ evalStateT (l x) s'

Note how we have to manually carry over the intermediate state s' to the forked thread when using forkIO, while we only need one top-level evalStateT with moveToNewThread.

Applications, in ascending complexity:

First application: Blocking, but not

This is a straight-forward translation of using callbacks to cope with blocking operations: We can conditionally fork if we are about to block the current thread. For example, a new version of readMVar:

readMVarMoving :: MVar a -> ContT () IO a
readMVarMoving mvar = liftIO (tryReadMVar mvar) >>= \case
  Just a  -> pure a                       -- phew, avoided blocking, and forking
  Nothing -> moveToNewThread >> liftIO (readMVar mvar)

So while we still block, we don’t block the “initial” thread (the one calling evalContT or similar). In an event processing loop, we might write something like

events `forM_` \case
  ImportantEvent x -> evalContT $ do
    y <- readMVarMoving relevantResource
    z <- lightweightProcess x y
    reply z

where the MVar will never block this whole loop, and we only fork when it is necessary. However there is a downside: If we use readMVarMoving multiple times, we create a new thread for each time we block. We could just fork once and then block without worrying about blocking the main loop. Which of these two approaches ultimately results in less forks globally depends on the rate at which the tryReadMVar succeeds (i.e. on contention of our resources in general). Plus, threads are cheap in haskell, so the gains might be irrelevant either way.

I am not aware of any library that provides such versions of common blocking functions (yet).

Second application: Moving between Threads

We can create worker threads that evaluate continuations passed to them, e.g. using

contProcessor :: Chan (IO ()) -> IO ()
contProcessor = forever . join . readChan

createContWorker :: IO (Chan (IO ()))
createContWorker = do
  chan <- newChan
  _    <- forkIO $ contProcessor chan
  pure chan

moveToProcessor :: Chan (IO ()) -> ContT () IO ()
moveToProcessor chan = ContT $ \c -> writeChan chan (c ())

So moveToProcessor behaves like moveToNewThread but instead of a fresh thread, in moves to a specific worker thread. This might come in handy if we have thread-specific resources (FFI stuff). We might also use this construct as a synchronization/critical section/resource ownership mechanism, although I have a hard time coming up with examples where this would have advantages over simpler approaches to locking resources. Maybe something with exception-handling around the resources that “belong” to this worker thread? But exception-handling deserves its own, large chapter (read: its own article).

Third Application: Moving with Transformer-Stacks

(some liftIOs omitted)

evalContT $ flip evalStateT (0 :: Int) $ do
  modify (+1)
  get >>= \x -> putStrLn ("from main thread: our current state is " ++ show x)
  modify (+1)
  lift $ moveToNewThread
  modify (+1)
  get >>= \x -> putStrLn ("from new  thread: our current state is " ++ show x)

will output

from main thread: our current state is 1
from new  thread: our current state is 3

Fourth Application: Forking with Transformer-Stacks

We can also truly fork (can you guess how this behaves?) (liftIOs omitted)

main :: IO ()
main = evalContT $ flip evalStateT (0 :: Int) $ do
  lift forkCont2 >>= \case
    False -> do
      modify (+ 1)
      threadDelay 1000000
    True -> do
      modify (+ 2)
      threadDelay 2000000
  get >>= print

yeah?

Solution: It prints 1 after 1 second and 2 after 2 seconds. I.e. the state indeed splits, somewhat in the fashion of non-determinism monads such as LogicT. This may be undesired and become confusing quickly, so it might make sense to restrict the usage around this and either:

  1. Only use this around “harmless” stacks; e.g. ReaderT is fine;
  2. Don’t use the “true forking” functions, but permit those that call the continuation exactly once (moveToNewThread, moveToProcessor), which are still completely fine.

Fifth Application: Stateful Computations in Worker-Threads

This is essentially a combination of applications two and three. (liftIOs omitted)

contProcessor :: Chan (IO ()) -> IO ()
createContWorker :: IO (Chan (IO ()))
moveToProcessor :: Chan (IO ()) -> ContT () IO ()
-- implemented as above

main :: IO ()
main = do
  chan <- createContWorker
  evalContT $ flip evalStateT (0 :: Int) $ do
    modify (+1)
    get >>= \x -> putStrLn ("from main thread: our current state is " ++ show x)
    modify (+1)
    lift $ moveToProcessor chan
    modify (+1)
    get >>= \x -> putStrLn ("from worker thread: our current state is " ++ show x)
from main thread: our current state is 1
from worker thread: our current state is 3

The interesting bit about this is that: The worker thread exposes and runs a plain ContT () IO () interface, but we still are able to pass it a stateful computation via lifting. And there is nothing stopping us from expanding on this some more:

Generalizations and Conclusion

We can apply at least two generalizations. Firstly the typical “mtl” approach, using this class:

-- capture any monad stacks with a base of `ContT () IO`.
class MonadIO m => MonadContIO m where
  liftContIO :: ContT () IO a -> m a

Secondly we can generalize over forkIO. Together, we get:

forkCont2With :: MonadContIO m => (IO () -> IO ()) -> m Bool
forkCont2With forker = liftContIO $ ContT $ \c -> forker (c False) >> c True
moveContWith :: MonadContIO m => (IO () -> IO ()) -> m ()
moveContWith forker = liftContIO $ ContT $ \c -> forker (c ())

Now moveToNewThread can be implemented as moveContWith (void . forkIO).

To answer the motivation:

Summary

main = flip evalStateT (0 :: Int) $ do
  modify (+1)
  liftIO $ forkIO $ do
    s <- get
    --   ^ type error: we are in IO,
    --     no state to `get`. d'oh!
    print s
  ..

is fixed using

main = do
  evalContT $ flip (evalStateT (0 :: Int)) $ do
    modify (+1)
    moveToNewThread
    s <- get -- it compiles, so it must work! (TM of haskell hype industries)
    liftIO $ print s
  ..

And:

  • forkCont2With and moveContWith fork/move arbitrary monad stacks with the ContT () IO base.
  • Can fork state, for good or worse. That is, we accidentally got an answer to the “what does forking mean for state” question.
  • We somewhat have to assume that we are working with undelimited continuations, i.e. mixing of fork/move and callCC/resetT probably can become rather confusing (to be discussed).
  • Exceptions will blow up the current thread only. An evalContT wrapped in some catch will actually not catch an exception thrown after a moveToNewThread (to be discussed).
  • The “host” of continuations (in the simplest case forkIO) requires only the power of running plain IO (), even when our “client” monad stack contains arbitrary transformers. The host is independent of those transformers (and is blissfully unaware of their internal state).
  • As a trivial consequence, a host can run a series of continuations that have different monads (stacks).

Outlook

(or should I say: To be Continued With:)

The next article in this series will introduce an implementation for withLifted.

Addenda

forkSequential

moveToNewThread and forkCont2 share certain functionality, so a natural question is if we can express one in terms of the other. In that direction, we define:

forkSequential :: ContT r IO Bool
forkSequential = ContT $ \c -> c True >> c False

and with that

forkCont2 :: ContT () IO Bool
forkCont2 = do
  isChild <- forkSequential
  when isChild moveToNewThread
  pure isChild

And we can also go from 2 to n:

forkSequentialN :: Int -> ContT () IO Int
forkSequentialN n = ContT $ forM_ [1 .. n]

What about the ContT transformer?

How does t (ContT () IO) fare versus ContT () (t IO)? The latter seems nicer in that it does not force us to switch the base of our transformer stack. Unfortunately, this does not work. Just consider how we would have to define our simplest forking function:

moveToNewThreadMIO :: MonadIO m => ContT () m ()
moveToNewThreadMIO = ContT $ \c -> liftIO $ void $ forkIO $ _ $ c ()
--                                         problem:         ^ :: m () -> IO ()

Replacing m with t IO does not help:

moveToNewThreadMIO :: ? => ContT () (t IO) ()
moveToNewThreadMIO = ContT $ \c -> liftIO $ void $ forkIO $ _ $ c ()
-- i can't think of any constraints that would enable       ^:: t IO () -> IO ()

withLifted

The riddle consists of the following signature:

withLifted
  :: MonadContIO m => (forall r . (a -> IO r) -> IO r) -> (a -> m b) -> m b

We are looking for an implementation so that

mockWith :: String -> (() -> IO a) -> IO a
mockWith s c = do
  putStrLn $ "acquire " ++ s
  x <- c ()
  putStrLn $ "release " ++ s
  pure x

main :: IO ()
main = evalContT $ flip evalStateT (0 :: Int) $ do
  withLifted (mockWith "a") $ \() -> do
    withLifted (mockWith "b") $ \() -> do
      modify (+ 1)
  get >>= liftIO . print

prints

acquire a
acquire b
release b
release a
1

That is: We want to lift a “with-function” to an arbitrary transformer stack if the base monad is ContT () IO. If you take into account that the “host” only needs to “speak IO”, and the challenge here is to pass a monad stack through an “IO gate”, this is an interesting but solvable riddle.

Hint: You don’t even need forkIO, but you need an MVar.

References

Posted by Lennart Spitzner on 2018-09-09. Feedback to (blog at thisdomain) welcome!
Tags: . Source.