python - How do I connect asyncio coroutines that continually produce and consume data?


I'm trying to learn how to (idiomatically) use Python 3.4's asyncio. My biggest stumbling block is how to "chain" coroutines that continually consume data, update state with it, and allow that state to be used by another coroutine.

The observable behaviour I expect from the example program is to periodically report on the sum of the numbers received from a subprocess. The reporting should happen at roughly the same rate as the Source object receives numbers from the subprocess, and blocking I/O in the reporting function should not block reading from the subprocess. If the reporting function blocks for longer than one iteration of reading from the subprocess, I don't care whether it skips ahead or reports a bunch at once; there should be about as many iterations of reporter() as there are of expect_exact() over a long enough timeframe.

#!/usr/bin/python3
import asyncio
import pexpect


class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            # Newer pexpect releases spell this keyword async_=True
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)

        p.terminate()


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source  # ???
        print("new sum is: {:d}".format(new_sum))
        # Potentially some other blocking operation
        yield from limited_throughput.write(new_sum)


def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Again, not sure what goes here...
    asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    loop.close()


if __name__ == '__main__':
    main()

This example requires pexpect to be installed from git; to try it without pexpect, replace run() with:

@asyncio.coroutine
def run(self):
    yield from self.flag.wait()

    # (requires `import random` at the top of the file)
    while self.flag.is_set():
        value = yield from asyncio.sleep(0.5, random.choice((-1, 1)))
        self.sum += value

But the real subprocess I'm interested in needs to run in a pty, which I think means the subprocess transport/protocol framework supplied in asyncio won't be sufficient here. The point is that the source of asynchronous activity is a coroutine that can be used with yield from.

Note that the reporter() function in the example is not valid code; the problem is that I don't know what should go in there. Ideally I'd keep the reporter() code separate from run(); the point of the exercise is to see how to factor more complex programs into smaller units of code using the components in asyncio.

Is there a way to structure this kind of behaviour with the asyncio module?

The locking primitives and queues in asyncio provide the mechanisms for doing this.

Conditions

asyncio.Condition() provides a way to be notified when a condition changes. Use it when it doesn't matter if you drop events.

class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        self.sum = 0

        # For consumers
        self.ready = asyncio.Condition()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            self.sum += int(p.before)
            with (yield from self.ready):
                self.ready.notify_all()  # or notify() depending on the situation

        p.terminate()

    @asyncio.coroutine
    def read(self):
        with (yield from self.ready):
            yield from self.ready.wait()
            return self.sum


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("new sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff in here

Queues

asyncio.Queue() lets you put data into a queue (either LIFO or FIFO) and have something else read it. Use it if you absolutely want to respond to every event, even if the consumer gets behind (in time). Note that if you limit the size of the queue, the producer will block if the consumer is slow enough.
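As a small, self-contained sketch of that backpressure effect (not part of the original example; producer and consumer are made-up names for illustration), a bounded queue makes put() wait until the consumer catches up:

import asyncio

@asyncio.coroutine
def producer(queue):
    for i in range(5):
        yield from queue.put(i)      # waits here while the queue is full
        print("produced", i)

@asyncio.coroutine
def consumer(queue):
    for _ in range(5):
        value = yield from queue.get()
        yield from asyncio.sleep(1)  # simulate a slow consumer
        print("consumed", value)

loop = asyncio.get_event_loop()
queue = asyncio.Queue(maxsize=1)     # bounded queue, so the producer gets throttled
loop.run_until_complete(asyncio.wait([producer(queue), consumer(queue)]))
loop.close()

With maxsize=1 the producer can never get more than one item ahead of the consumer; leave maxsize unset for an unbounded queue, as in the full example below.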

Note that this also allows you to turn sum into a local variable of run().

#!/usr/bin/python3
import asyncio
import pexpect


class Source:

    def __init__(self):
        self.flag = asyncio.Event()
        # Note: self.sum is gone!

        # For consumers
        self.output = asyncio.Queue()

    def start(self):
        self.flag.set()

    def stop(self):
        self.flag.clear()

    @asyncio.coroutine
    def run(self):
        yield from self.flag.wait()

        sum = 0

        p = pexpect.spawn(
            "python -c "
            "'import random, time\n"
            "while True: print(random.choice((-1, 1))); time.sleep(0.5)'")

        while self.flag.is_set():
            yield from p.expect_exact('\n', async=True)
            sum += int(p.before)
            yield from self.output.put(sum)

        p.terminate()

    @asyncio.coroutine
    def read(self):
        return (yield from self.output.get())


@asyncio.coroutine
def reporter(source):
    while True:
        # Something like:
        new_sum = yield from source.read()
        print("new sum is: {:d}".format(new_sum))
        # Other potentially blocking stuff here
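For completeness, here is a minimal driver for this version, mirroring main() from the question (this part is not in the original answer, and cancelling the reporter task at the end is just one possible shutdown policy):

def main():
    loop = asyncio.get_event_loop()

    source = Source()
    loop.call_later(1, source.start)
    loop.call_later(11, source.stop)

    # Run the consumer as a background task alongside the producer
    reporter_task = asyncio.async(reporter(source))

    loop.run_until_complete(source.run())
    reporter_task.cancel()   # reporter() loops forever, so stop it explicitly
    loop.close()


if __name__ == '__main__':
    main()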

Note that Python 3.4.4 will add task_done() and join() methods to Queue, which allow you to gracefully finish processing when you know the consumer is done (where applicable).
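A sketch of how that could look once 3.4.4 is available (hypothetical: run_then_drain() is a made-up helper, and it reaches into source.output directly rather than through a method on Source):

@asyncio.coroutine
def reporter(source):
    while True:
        new_sum = yield from source.read()
        print("new sum is: {:d}".format(new_sum))
        source.output.task_done()       # mark this item as fully processed

@asyncio.coroutine
def run_then_drain(source):
    yield from source.run()             # producer stops once stop() is called
    yield from source.output.join()     # wait until reporter() has processed
                                        # everything that was queued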

