python - How do I connect asyncio.coroutines that continually produce and consume data? -
i trying learn how (idiomatically) use python 3.4's asyncio
. biggest stumbling block how "chain" coroutines continually consume data, update state it, , allow state used coroutine.
the observable behaviour expect example program periodically report on sum of numbers received subprocess. reporting should happen @ same rate source
object recieves numbers subprocess. io blocking in reporting function should not block reading subprocess. if reporting function blocks longer iteration of reading subprocess, don't care if skips ahead or reports bunch @ once; there should many iterations of reporter()
there of expect_exact()
on long enough timeframe.
#!/usr/bin/python3 import asyncio import pexpect class source: def __init__(self): self.flag = asyncio.event() self.sum = 0 def start(self): self.flag.set() def stop(self): self.flag.clear() @asyncio.coroutine def run(self): yield self.flag.wait() p = pexpect.spawn( "python -c " "'import random, time\n" "while true: print(random.choice((-1, 1))); time.sleep(0.5)'") while self.flag.is_set(): yield p.expect_exact('\n', async=true) self.sum += int(p.before) p.terminate() @asyncio.coroutine def reporter(source): while true: # like: new_sum = yield source # ??? print("new sum is: {:d}".format(new_sum)) # potentially other blocking operation yield limited_throughput.write(new_sum) def main(): loop = asyncio.get_event_loop() source = source() loop.call_later(1, source.start) loop.call_later(11, source.stop) # again, not sure goes here... asyncio.async(reporter(source)) loop.run_until_complete(source.run()) loop.close() if __name__ == '__main__': main()
this example requires pexpect
installed git; replace run()
with:
@asyncio.coroutine def run(self): yield self.flag.wait() while self.flag.is_set(): value = yield asyncio.sleep(0.5, random.choice((-1, 1))) self.sum += value
but real subprocess i'm interested in needs run in pty
, think means supplied subprocess transport/protocol framework in asyncio
won't sufficient this. point source of asynchronous activity coroutine can used yield from
.
note reporter()
function in example not valid code; problem don't know should go in there. ideally i'd keep reporter()
code separate run()
; point of exersise see how factor out more complex programs smaller units of code using components in asyncio
.
is there way structure kind of behaviour asyncio
module?
the locking primitives , queues in asyncio
provide mechanisms doing this.
conditions
the asyncio.condition()
provides way notified of condition. use when doesn't matter if drop events.
class source: def __init__(self): self.flag = asyncio.event() self.sum = 0 # consumers self.ready = asyncio.condition() def start(self): self.flag.set() def stop(self): self.flag.clear() @asyncio.coroutine def run(self): yield self.flag.wait() p = pexpect.spawn( "python -c " "'import random, time\n" "while true: print(random.choice((-1, 1))); time.sleep(0.5)'") while self.flag.is_set(): yield p.expect_exact('\n', async=true) self.sum += int(p.before) (yield self.ready): self.ready.notify_all() # or notify() depending on situation p.terminate() @asyncio.coroutine def read(self): (yield self.ready): yield self.ready.wait() return self.sum @asyncio.coroutine def reporter(source): while true: # like: new_sum = yield source.read() print("new sum is: {:d}".format(new_sum)) # other potentially blocking stuff in here
queues
the asyncio.queue()
lets put data in queue (either lifo or fifo) , have else read it. use if absolutely want respond every event, if consumer gets behind (in time). note if limit size of queue, producer block if consumer slow enough.
note allows convert sum
local variable too.
#!/usr/bin/python3 import asyncio import pexpect class source: def __init__(self): self.flag = asyncio.event() # note: self.sum removed! # consumers self.output = asyncio.queue() def start(self): self.flag.set() def stop(self): self.flag.clear() @asyncio.coroutine def run(self): yield self.flag.wait() sum = 0 p = pexpect.spawn( "python -c " "'import random, time\n" "while true: print(random.choice((-1, 1))); time.sleep(0.5)'") while self.flag.is_set(): yield p.expect_exact('\n', async=true) sum += int(p.before) yield self.output.put(sum) p.terminate() @asyncio.coroutine def read(self): return (yield self.output.get()) @asyncio.coroutine def reporter(source): while true: # like: new_sum = yield source.read() print("new sum is: {:d}".format(new_sum)) # other potentially blocking stuff here
note python 3.4.4 add task_done()
, join()
methods queue
, allow gracefully finish processing when know consumer finished (where applicable).
Comments
Post a Comment