memory
from datetime import datetimeimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef tick(): print('Tick! The time is: %s' % datetime.now())if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_job(tick, 'interval', seconds=3) print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
sqlalchemy
from datetime import datetime, timedeltaimport sysimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef alarm(time): print('Alarm! This alarm was scheduled at %s.' % time)if __name__ == '__main__': scheduler = BlockingScheduler() url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite' scheduler.add_jobstore('sqlalchemy', url=url) alarm_time = datetime.now() + timedelta(seconds=10) scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()]) print('To clear the alarms, delete the example.sqlite file.') print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
mongodb
from datetime import datetime, timedeltaimport sysimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef alarm(time): print('Alarm! This alarm was scheduled at %s.' % time)if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_jobstore('mongodb', collection='example_jobs') if len(sys.argv) > 1 and sys.argv[1] == '--clear': scheduler.remove_all_jobs() alarm_time = datetime.now() + timedelta(seconds=10) scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()]) print('To clear the alarms, run this example with the --clear argument.') print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass
redids
from datetime import datetime, timedeltaimport sysimport osfrom apscheduler.schedulers.blocking import BlockingSchedulerdef alarm(time): print('Alarm! This alarm was scheduled at %s.' % time)if __name__ == '__main__': scheduler = BlockingScheduler() scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times') if len(sys.argv) > 1 and sys.argv[1] == '--clear': scheduler.remove_all_jobs() alarm_time = datetime.now() + timedelta(seconds=10) scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()]) print('To clear the alarms, run this example with the --clear argument.') print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C')) try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass