Thursday, April 28, 2016

History of Coroutine in Python

A coroutine is a kind of function that can suspend and resume its execution at various pre-defined locations in its code. Subroutines are a special case of coroutines that have just a single entry point and complete their execution by returning to their caller. Python’s coroutines (both the existing generator-based and the newly proposed variety) are not fully general, either, since they can only transfer control back to their caller when suspending their execution, as opposed to switching to some other coroutine as they can in the general case. When coupled with an event loop, coroutines can be used to do asynchronous processing, I/O in particular.

Python’s current coroutine support is based on the enhanced generators from PEP 342, which was adopted into Python 2.5. That PEP changed the yield statement to be an expression, added several new methods for generators (send(), throw(), and close()), and ensured that close() would be called when generators get garbage-collected. That functionality was further enhanced in Python 3.3 with PEP 380, which added the yield from expression to allow a generator to delegate some of its functionality to another generator (i.e. a sub-generator).

At first, coroutines in Python were based on generators. A generator is a function that produces a sequence of results instead of a single value. When a generator function is called, it returns a generator object without even beginning execution of the function. When the next() method is called for the first time, the function starts executing until it reaches a yield statement. The yielded value is returned by the next() call.

def yrange(n):
    i = 0
    x = 1
    while i < n:
        yield i+x
        #return i and pause here waiting for instruction from caller.
        i += 1

if __name__ == "__main__":
    g = yrange(6)
    print(next(g))

A generator returns a value and pauses there; all related context is preserved when it is resumed again by the next() method. Generators were first introduced to create memory-friendly iterators, but this characteristic makes them very suitable for realizing a coroutine.

The caller can use next() as a communication channel to tell the generator to run; the generator pauses at yield and returns a value to the caller.

But what if the caller wants to pass an input for the generator to use during its run? For example, the generator provides an HTML parsing function, while the caller loads the content of a website and passes that content to the generator to parse.

By using a global variable, it is possible to do this. A modified version of the code above:

def yrange(n):
    i = 0
    global x
    while i < n:
        yield i+x
        #return i and pause here waiting for instruction from caller.
        i += 1

if __name__ == "__main__":
    x = 1
    g = yrange(6)
    print(next(g))
    x = 2
    print(next(g))

This solution is quite ugly because we usually want to avoid global variables. The safest style is to pass values to a function as parameters and return values to the caller, rather than relying on shared mutable state as a control mechanism.

Since Python 2.5, yield has been changed into an expression and behaves like a bidirectional communication tool. Generators have send(), next(), throw(), and close() methods.

PEP 342 gives a very detailed explanation of this:

Specification Summary
By adding a few simple methods to the generator-iterator type, and
with two minor syntax adjustments, Python developers will be able
to use generator functions to implement co-routines and other forms
of co-operative multitasking. These methods and adjustments are:

  1. Redefine yield to be an expression, rather than a statement.
    The current yield statement would become a yield expression
    whose value is thrown away. A yield expression’s value is
    None whenever the generator is resumed by a normal next() call.

  2. Add a new send() method for generator-iterators, which resumes
    the generator and “sends” a value that becomes the result of the
    current yield-expression. The send() method returns the next
    value yielded by the generator, or raises StopIteration if the
    generator exits without yielding another value.

  3. Add a new throw() method for generator-iterators, which raises
    an exception at the point where the generator was paused, and
    which returns the next value yielded by the generator, raising
    StopIteration if the generator exits without yielding another
    value. (If the generator does not catch the passed-in exception,
    or raises a different exception, then that exception propagates
    to the caller.)

  4. Add a close() method for generator-iterators, which raises
    GeneratorExit at the point where the generator was paused. If
    the generator then raises StopIteration (by exiting normally, or
    due to already being closed) or GeneratorExit (by not catching
    the exception), close() returns to its caller. If the generator
    yields a value, a RuntimeError is raised. If the generator
    raises any other exception, it is propagated to the caller.
    close() does nothing if the generator has already exited due to
    an exception or normal exit.

  5. Add support to ensure that close() is called when a generator
    iterator is garbage-collected.

  6. Allow yield to be used in try/finally blocks, since garbage
    collection or an explicit close() call would now allow the
    finally clause to execute.
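The throw() and close() behavior described in items 3 and 4 above can be seen in a small sketch (demo is just an illustrative name, not from the PEP):

```python
def demo():
    try:
        while True:
            try:
                yield 'ready'
            except ValueError:
                # an exception injected by throw() is raised at the paused yield
                print('caught ValueError')
    finally:
        # runs when close() raises GeneratorExit inside the generator
        print('cleanup')

g = demo()
first = next(g)                     # 'ready'
after_throw = g.throw(ValueError)   # caught inside, generator yields 'ready' again
g.close()                           # GeneratorExit propagates; finally prints 'cleanup'
```

Because the generator catches ValueError, throw() returns the next yielded value instead of propagating the exception to the caller, exactly as item 3 specifies.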

yield can not only pass a value out to the caller but also accept a value from the caller via the send() method.

A yield expression evaluates to None if next() is called. So basically next() equals send(None).
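This equivalence is easy to verify with a tiny generator (probe is a hypothetical name for illustration):

```python
def probe():
    got = yield 'started'   # the value of this yield expression comes from the caller
    yield got               # report back what was received

g1 = probe()
g2 = probe()
next(g1)                 # advance each generator to the first yield
next(g2)
v1 = next(g1)            # resumed by next(): the yield expression evaluates to None
v2 = g2.send(None)       # resumed by send(None): identical behavior
print(v1, v2)            # None None
```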

Another example is as below:

def corout1(n):
    for x in range(n):
        yield x

def jumping_range(up_to):
    """Generator for the sequence of integers from 0 to up_to, exclusive.
    Sending a value into the generator will shift the sequence by that amount.
    """
    index = 0
    global jump
    while index < up_to:
        jump = yield index
        #print('step is {0}'.format(jump))
        if jump is None:
            jump = 1
        index += jump

if __name__ == '__main__':
    jump = None
    iterator = jumping_range(5)
    print(next(iterator))  # 0
    print('step is {0}'.format(jump))
    print(iterator.send(2))  # 2
    print('step is {0}'.format(jump))
    print(next(iterator))  # 3
    print('step is {0}'.format(jump))
    print(iterator.send(-1))  # 2
    print('step is {0}'.format(jump))
    for x in iterator:
        print(x)  # 3, 4
    gen1 = corout1(6)
    print(next(gen1))
    print(gen1.send('sldf')) #you can send any value

Here I use global variable jump as a probe to detect the inner status of the generator.
Output:

In [1]: run test2.py
0
step is None
2
step is 2
3
step is 1
2
step is -1
3
4
0
1

Here jump = yield index assigns the value of the yield expression to jump. If next() is called, yield index evaluates to None and the generator continues to run until it hits the yield expression again. While the generator is paused, the assignment is not performed; assigning to jump is the first step executed in the next iteration.

Now, since next() equals send(None), you can even avoid next() entirely to keep things consistent. The yield expression behaves like a transceiver: it first accepts a value from the caller through the send() method, and when the yield expression is hit again, it passes the yielded value back to the caller.

This works fine as long as you fully understand the mechanism of yield in a generator.

In PEP 380, yield from was introduced to simplify pipelines of coroutines.

A coroutine is not limited to communicating only with its caller. It can also send data to another coroutine based on input from its caller. This forms a pipeline of coroutines.

def writer():
    """A coroutine that writes data *sent* to it to fd, socket, etc."""
    z = 'a'
    while True:
        w = (yield z)
        z = chr(97+w)
        print('>> ', w)
def filter(coro):
    coro.send(None)  # prime the coro
    i = 'hello'
    while True:
        try:
            x = (yield i)  # Capture the value that's sent
            i = coro.send(x)  # and pass it to the writer
        except StopIteration:
            pass

if __name__=="__main__":
    g = writer()
    f = filter(g)
    print(f.send(None))  #prime f
    print(f.send(7))    

Output:

In [16]: run test3.py
hello
>>  7
h

In order to capture exceptions, send data to a coroutine, and get results back from it, there is quite a bit of code to write.

yield from takes care of exception handling and communication between coroutines, which simplifies the code a lot.

Just like yield, yield from is a bidirectional operation. A revised version using yield from is below:

def writer():
    """A coroutine that writes data *sent* to it to fd, socket, etc."""
    z = 'a'
    while True:
        w = (yield z)
        z = chr(97+w)
        print('>> ', w)
def filter(coro):
    #coro.send(None)  # prime the coro
    i = 'hello'
    i = yield from coro

if __name__=="__main__":
    g = writer()
    f = filter(g)
    print(f.send(None))  #prime f
    print(f.send(7))

Output:

In [18]: run test4.py
a
>>  7
h

As you can see, it is much easier to organize coroutine code with the yield from keyword.

Generator-based coroutines, together with an event loop, enable asynchronous programming.

It is also possible to use return in a generator.

This is a new feature in Python 3.3 (it does not even work in 3.2). Much as return in a generator has long been equivalent to raise StopIteration(), return something in a generator is now equivalent to raise StopIteration(something). For that reason, the exception is printed as StopIteration: 3, and the value is accessible through the value attribute on the exception object. If the generator is delegated to using the (also new) yield from syntax, the value becomes the result of the yield from expression. See PEP 380 for details.

def f():
    return 1
    yield 2

def g():
    x = yield from f()
    print(x)

list(g())  # prints 1; g itself yields nothing
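The returned value can also be read directly off the StopIteration exception, a minimal sketch:

```python
def f():
    return 3
    yield  # never reached, but its presence makes f a generator function

gen = f()
try:
    next(gen)
except StopIteration as e:
    # return 3 in a generator is equivalent to raise StopIteration(3)
    result = e.value
print(result)  # 3
```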

Generator-based coroutines are somewhat confusing in terms of grammar. It is quite hard to understand the bidirectional communication mechanism of yield/yield from without a deep introduction like this article. Generators were first introduced to provide the one-at-a-time iterator concept, and yield was later revised to make generators suitable for the coroutine concept.

The async/await keywords were introduced in Python 3.5 to give coroutine programming a more meaningful grammar. await plays essentially the same role as yield from. So you can use either generator-based coroutines or async/await-defined coroutines in an async/concurrency environment.
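A minimal async/await sketch, driven by an explicitly created event loop (compute is a hypothetical name; asyncio.run did not exist yet in 3.5, so the loop is managed by hand):

```python
import asyncio

async def compute():
    # await suspends the coroutine, much as yield from suspends a generator
    await asyncio.sleep(0)
    return 42

loop = asyncio.new_event_loop()
result = loop.run_until_complete(compute())
loop.close()
print(result)  # 42
```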

Non-blocking program in Python

Async programming is often related to I/O-bound or CPU-bound tasks. An I/O-bound task means the code waits a long time for data from another process or thread, such as page content sent back from a server or file content returned by disk I/O. In a CPU-bound task, the code often waits for results from another process doing heavy computation.

So async programming naturally involves communication between different processes (threads are not efficient due to the GIL). The currently running coroutine pauses where results are needed from another process. The event loop then takes control and signals another coroutine to run.

So the coroutine has to be non-blocking, which means it needs to check the status of the outer process. If the results are ready, it runs some code to process them; otherwise it yields again to give up the running privilege. The outer process has to be able to report its running status so that the coroutine can be written in a non-blocking style.

It is possible to mimic async programming with the subprocess module. The subprocess module can spawn a process from the current process, which executes independently and can be used to mimic loading content from a remote server or reading data from a local disk.

server_subp.py simply sleeps n seconds and prints 'finished!' as its result:

import sys
import time

if __name__ == '__main__':
    n = int(sys.argv[1])
    time.sleep(n)
    print('finished!')

client_subp.py

import subprocess
import time

def nb_read(t):
    process = subprocess.Popen(['python3', 'server_subp.py', t], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)

    while process.poll() is None:
        yield None
    yield process.communicate()

task1 = nb_read('5')
task2 = nb_read('3')
fin1 = False
fin2 = False

while True:
    if not fin1:
        res1 = task1.send(None)
        if res1 is not None:
            print('task1:' + res1[0])
            fin1 = True
            task1.close()
        else:
            print('still working on task1')
    if not fin2:
        res2 = task2.send(None)
        if res2 is not None:
            print('task2:' + res2[0])
            fin2 = True
            task2.close()
        else:
            print('still working on task2')
    if fin1 and fin2:
        break
    time.sleep(1)
#process.terminate()

nb_read() is a non-blocking-style coroutine to load data. It spawns a process to read the data. process.poll() is the method to poll the status of the subprocess. If the return value is None, nb_read() yields None to notify the caller that data loading is not finished yet, and pauses there. Otherwise, nb_read() uses process.communicate() to retrieve the stdout content from the subprocess and lets the process terminate.

The while loop mimics the event loop in the asyncio library. It creates two generators from nb_read() and queries whether they have finished loading data. If so, it closes the generator. The loop keeps running until all reading tasks are finished. The loop uses send(None) to inform a generator that it can run now.

That is also why a generator-based coroutine has to be decorated with @asyncio.coroutine when used with asyncio.

Output of the client_subp.py:

still working on task1
still working on task2
still working on task1
still working on task2
still working on task1
still working on task2
still working on task1
still working on task2
still working on task1
task2:finished!

still working on task1
task1:finished!

Reference:
What's New In Python 3.3
PEP 380
In practice, what are the main uses for the new "yield from" syntax in Python 3.3? (Stack Overflow)
PEP 342
How the heck does async/await work in Python 3.5?
