memory

  # In-memory example: a BlockingScheduler with no extra job store configured
  # runs ``tick`` on a three-second interval until the process is interrupted.
  from datetime import datetime
  import os

  from apscheduler.schedulers.blocking import BlockingScheduler


  def tick():
      """Print the current local time; this is the job the scheduler runs."""
      print('Tick! The time is: %s' % datetime.now())


  if __name__ == '__main__':
      scheduler = BlockingScheduler()
      scheduler.add_job(tick, 'interval', seconds=3)
      # The interrupt key combination differs on Windows (os.name == 'nt').
      print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

      try:
          scheduler.start()
      except (KeyboardInterrupt, SystemExit):
          # Interrupting the process is the intended way to stop this example,
          # so exit quietly instead of showing a traceback.
          pass

sqlalchemy

  # SQLAlchemy job store example: every run schedules one alarm ten seconds
  # ahead and stores it through the 'sqlalchemy' job store. The database URL
  # may be given as the first command-line argument; otherwise a local SQLite
  # file (example.sqlite) is used.
  from datetime import datetime, timedelta
  import sys
  import os

  from apscheduler.schedulers.blocking import BlockingScheduler


  def alarm(time):
      """Report that an alarm fired.

      Args:
          time: The moment the alarm was scheduled (passed in through the
              job's ``args`` when the job is added below).
      """
      print('Alarm! This alarm was scheduled at %s.' % time)


  if __name__ == '__main__':
      scheduler = BlockingScheduler()
      url = sys.argv[1] if len(sys.argv) > 1 else 'sqlite:///example.sqlite'
      scheduler.add_jobstore('sqlalchemy', url=url)

      alarm_time = datetime.now() + timedelta(seconds=10)
      scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
      print('To clear the alarms, delete the example.sqlite file.')
      # The interrupt key combination differs on Windows (os.name == 'nt').
      print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

      try:
          scheduler.start()
      except (KeyboardInterrupt, SystemExit):
          # Interrupting the process is the intended way to stop this example,
          # so exit quietly instead of showing a traceback.
          pass

mongodb

  # MongoDB job store example: every run schedules one alarm ten seconds ahead
  # and stores it through the 'mongodb' job store in the 'example_jobs'
  # collection. No connection arguments are passed, so the store's own
  # defaults apply. Run with --clear to remove the stored alarms first.
  from datetime import datetime, timedelta
  import sys
  import os

  from apscheduler.schedulers.blocking import BlockingScheduler


  def alarm(time):
      """Report that an alarm fired.

      Args:
          time: The moment the alarm was scheduled (passed in through the
              job's ``args`` when the job is added below).
      """
      print('Alarm! This alarm was scheduled at %s.' % time)


  if __name__ == '__main__':
      scheduler = BlockingScheduler()
      scheduler.add_jobstore('mongodb', collection='example_jobs')
      # Only the removal is conditional; a fresh alarm is added on every run.
      if len(sys.argv) > 1 and sys.argv[1] == '--clear':
          scheduler.remove_all_jobs()

      alarm_time = datetime.now() + timedelta(seconds=10)
      scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
      print('To clear the alarms, run this example with the --clear argument.')
      # The interrupt key combination differs on Windows (os.name == 'nt').
      print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

      try:
          scheduler.start()
      except (KeyboardInterrupt, SystemExit):
          # Interrupting the process is the intended way to stop this example,
          # so exit quietly instead of showing a traceback.
          pass

redis

  # Redis job store example: every run schedules one alarm ten seconds ahead
  # and stores it through the 'redis' job store under the keys 'example.jobs'
  # and 'example.run_times'. No connection arguments are passed, so the
  # store's own defaults apply. Run with --clear to remove the stored alarms
  # first.
  from datetime import datetime, timedelta
  import sys
  import os

  from apscheduler.schedulers.blocking import BlockingScheduler


  def alarm(time):
      """Report that an alarm fired.

      Args:
          time: The moment the alarm was scheduled (passed in through the
              job's ``args`` when the job is added below).
      """
      print('Alarm! This alarm was scheduled at %s.' % time)


  if __name__ == '__main__':
      scheduler = BlockingScheduler()
      scheduler.add_jobstore('redis', jobs_key='example.jobs', run_times_key='example.run_times')
      # Only the removal is conditional; a fresh alarm is added on every run.
      if len(sys.argv) > 1 and sys.argv[1] == '--clear':
          scheduler.remove_all_jobs()

      alarm_time = datetime.now() + timedelta(seconds=10)
      scheduler.add_job(alarm, 'date', run_date=alarm_time, args=[datetime.now()])
      print('To clear the alarms, run this example with the --clear argument.')
      # The interrupt key combination differs on Windows (os.name == 'nt').
      print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))

      try:
          scheduler.start()
      except (KeyboardInterrupt, SystemExit):
          # Interrupting the process is the intended way to stop this example,
          # so exit quietly instead of showing a traceback.
          pass