mongodb - APScheduler: run an async function in Tornado (Python)
I am trying to develop a small app that gathers weather data from an API. I have used APScheduler to execute the function every x minutes. I use the Python Tornado framework.
The error I am getting is:
    INFO     Job "getweather (trigger: interval[0:01:00], next run at: 2015-03-28 11:40:58 CET)" executed successfully
    ERROR    Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x0335c978>, <tornado.concurrent.Future object at 0x03374430>)
    Traceback (most recent call last):
      File "C:\Python34\lib\site-packages\tornado\ioloop.py", line 568, in _run_callback
        ret = callback()
      File "C:\Python34\lib\site-packages\tornado\stack_context.py", line 275, in null_wrapper
        return fn(*args, **kwargs)
    greenlet.error: cannot switch to a different thread
I think it comes from the coroutine getweather(), because if I remove the async features from it, it works.
I am using Motor to read the needed coordinates, pass them to the API, and store the weather data in MongoDB.
    import os.path, logging
    import tornado.escape
    import tornado.web
    import tornado.ioloop
    from tornado.httpclient import AsyncHTTPClient
    from tornado import gen
    from tornado.options import define, options
    from apscheduler.schedulers.tornado import TornadoScheduler
    import motor

    client = motor.MotorClient()
    db = client['apitest']

    console_log = logging.getLogger(__name__)

    define("port", default=8888, help="run on the given port", type=int)
    define("debug", default=False, help="run in debug mode")


    class mainrequest(tornado.web.RequestHandler):
        def get(self):
            self.write("hello")


    scheduler = TornadoScheduler()


    class scheduledtasks(object):
        def get(self):
            print("this is the scheduler")


    def addjobs():
        # Run getweather once a minute.
        scheduler.add_job(getweather, 'interval', minutes=1)


    def startscheduler():
        scheduler.start()


    def stopscheduler():
        # APScheduler 3.x schedulers are stopped with shutdown(); there is no stop().
        scheduler.shutdown()


    class weather(tornado.web.RequestHandler):
        def get(self):
            self.write("this is the weather robot!")
            # Manual trigger: start the coroutine without waiting for it.
            getweather()


    @gen.coroutine
    def getweather():
        '''
        Getting city weather from the forecast.io API
        '''
        console_log.debug('start: weather robot')
        cursor = findcities()
        while (yield cursor.fetch_next):
            city = cursor.next_object()
            lat = str(city["lat"])
            lon = str(city["lon"])
            http_client = AsyncHTTPClient()
            response = yield http_client.fetch("https://api.forecast.io/forecast/3925d0668cf520768ca855951f1097cd/%s,%s" % (lat, lon))
            if response.error:
                print("error:", response.error)
                # store cities with errors in order to save them in the log file
            else:
                json = tornado.escape.json_decode(response.body)
                temperature = json["currently"]["temperature"]
                summary = json["currently"]["summary"]
                db.cities.update({'_id': city["_id"]}, {'$set': {'temperature': temperature, 'summary': summary}})
        console_log.debug('end: weather robot')
        return


    def findcities():
        '''
        cities = [{
            "_id" : ObjectId("55165d07258058ee8dca2172"),
            "name" : "london",
            "country" : "united kingdom",
            "lat" : 51.507351,
            "lon" : -0.127758
        },
        {
            "_id" : ObjectId("55165d07258058ee8dca2173"),
            "name" : "barcelona",
            "country" : "spain",
            "lat" : 41.385064,
            "lon" : 2.173403
        }]
        '''
        cities = db.cities.find().sort([('_id', -1)])
        return cities


    def main():
        logging.basicConfig(level=logging.DEBUG, format='%(levelname)-8s %(message)s')
        app = tornado.web.Application(
            [
                (r'/robots/weather', weather),
                (r'/', mainrequest)
            ],
            cookie_secret="__TODO:_GENERATE_YOUR_OWN_RANDOM_VALUE_HERE__",
            login_url="/auth/login",
            template_path=os.path.join(os.path.dirname(__file__), "templates"),
            static_path=os.path.join(os.path.dirname(__file__), "static"),
            xsrf_cookies=True,
            debug=options.debug,
        )
        app.listen(options.port)
        addjobs()
        startscheduler()
        tornado.ioloop.IOLoop.instance().start()


    if __name__ == "__main__":
        main()
Any idea what I am doing wrong? As far as I can see in the APScheduler code, TornadoScheduler() runs in the Tornado IOLoop... (https://bitbucket.org/agronholm/apscheduler/src/a34075b0037dba46735bae67f598ec6133003ef1/apscheduler/schedulers/tornado.py?at=master)
Oh! I forgot to say that the idea is to be able to execute the task via APScheduler, manually, or both.
Many thanks!
By default, TornadoScheduler runs scheduled tasks in a thread pool. Your specific task, however, uses the IOLoop and expects to be run in the same thread. To fix this, you can use the add_callback() method of the Tornado IOLoop to schedule the task to be run in the IOLoop's thread as soon as possible.
Like so:

    from tornado.ioloop import IOLoop

    def your_scheduled_task():
        # Called in a worker thread; hands the real task over to the IOLoop's thread.
        IOLoop.instance().add_callback(your_real_task_function)

Or better:

    scheduler.add_job(IOLoop.instance().add_callback, 'interval', minutes=1, args=[getweather])
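The thread hand-off described above can be tried without installing Tornado or APScheduler. The following is only a sketch of the same idea using the standard library's asyncio, not the code from the question: `loop.call_soon_threadsafe` stands in for `IOLoop.add_callback`, a `ThreadPoolExecutor` stands in for the scheduler's thread pool, and `get_weather`, `schedule_on_loop`, `results` and `worker_ids` are hypothetical names made up for the illustration. (Tornado 5 and later run their IOLoop on top of asyncio, so the analogy is fairly close.)

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

results = []     # (label, thread id) pairs recorded by the coroutine
worker_ids = []  # thread ids of the pool threads that "fired" a job


async def get_weather(label):
    # Hypothetical stand-in for the real coroutine: it only records
    # which thread it ended up running on.
    await asyncio.sleep(0)
    results.append((label, threading.get_ident()))


def schedule_on_loop(loop, label):
    # Runs in a worker thread, like a job fired from a scheduler's pool.
    # Instead of running the coroutine here, it asks the loop to create
    # the task in the loop's own thread.
    worker_ids.append(threading.get_ident())
    loop.call_soon_threadsafe(loop.create_task, get_weather(label))


async def main():
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=1) as pool:
        # Simulates the scheduler triggering the job from its thread pool.
        await loop.run_in_executor(pool, schedule_on_loop, loop, "scheduled")
    # Manual invocation from the loop thread, e.g. from a request handler.
    await get_weather("manual")
    # Give the handed-over task time to finish before the loop closes.
    await asyncio.sleep(0.05)
    return threading.get_ident()


loop_thread_id = asyncio.run(main())
print(results, worker_ids, loop_thread_id)
```

Both the scheduled and the manual call end up executing on the event loop's thread, while the job itself was fired from a different (worker) thread. That is the property the coroutine in the question needs, since Motor refuses to switch greenlets across threads.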