Snažím se pravidelně query API. Můj kód struktura je následující:
async def QueryData():
print(datetime.datetime.now())
async def main():
await TestApiConnection()
scheduler = AsyncIOScheduler(timezone="Europe/Berlin")
scheduler.add_job(QueryData, 'cron', minute='0-59')
scheduler.start()
try:
asyncio.get_event_loop().run_forever()
except (KeyboardInterrupt, SystemExit):
pass
scheduler.shutdown(wait=False)
if __name__ == "__main__":
asyncio.run(main())
Následující chyba je vyvolána, když jsem spustit tento:
v run_forever prosadit samostatně._self_reading_future není AssertionError
Když jsem zase hlavní funkce na synchronní funkce, QueryData práce začíná tím, že výsledek TestApiConnection není očekávané.
async def QueryData():
print(datetime.datetime.now())
def main():
TestApiConnection()
...
if __name__ == "__main__":
main()
Tak jak mám začít práci od asynchronní hlavní metoda? Nebo bych měl restrukturalizovat kód?