break.py
是一个自动 break 的程序。

  #### break.py #####
import time

# Module-level tick counter. main() advances it, and the value persists
# between calls.
count = 1


def main(tick=1, period=3, rest=3):
    """Advance the global ``count`` once per tick until a break is due.

    Each iteration sleeps ``tick`` seconds, checks whether the current
    ``count`` is a multiple of ``period``, increments ``count`` and prints it.
    When the checked value was a multiple of ``period`` the break message is
    printed, the function sleeps ``rest`` seconds and returns.

    Args:
        tick: Seconds to sleep before each check.
        period: A break is due whenever ``count`` is divisible by this value.
            Must not be zero, because it is used as a modulus.
        rest: Seconds to sleep after announcing the break.
    """
    global count
    while True:
        time.sleep(tick)
        # Test the value *before* incrementing; the printed number is the
        # value after the increment.
        due = count % period == 0
        count += 1
        if not due:
            print(" count = ", count)
            continue
        print(" count = ", count, " ## it is time to break ## ")
        time.sleep(rest)
        break

test_break.py

test_break.py 用来执行

####  test_break.py  ####
# NOTE(review): this script is itself named test_break.py, so the import below
# resolves to this very file rather than to break.py, and ``break`` is a
# keyword that a plain import statement cannot name. Confirm which module
# actually provides main() before relying on this.
import test_break
from ForeverThread import ForeverThread

# Pass the function object, not its result: ``test_break.main()`` would run
# main() right here and hand its return value (None) to the thread.
# The restart delay is ForeverThread's ``timesleepInScond`` parameter; there is
# no ``timeSleep`` keyword.
t = ForeverThread(timesleepInScond=3, target=test_break.main, args=())
if __name__ == "__main__":
    t.start()

ForeverThread.py

这是一个自动重启的线程模块 ForeverThread.py,继承 threading.Thread 并重写了 run()。

import threading
import inspect
import ctypes
import time
import logging
import traceback
# https://stackoverflow.com/questions/323972/is-there-any-way-to-kill-a-thread
def _async_raise(tid, exctype):
    """Ask the interpreter to raise ``exctype`` inside the thread ``tid``.

    Args:
        tid: Identifier of the target thread.
        exctype: Exception *class* to raise; instances are rejected.

    Raises:
        TypeError: if ``exctype`` is not a class.
        ValueError: if the interpreter reports that no thread matched ``tid``.
        SystemError: if the call reports any result other than 0 or 1; the
            request is reverted before raising.
    """
    if not inspect.isclass(exctype):
        raise TypeError("Only types can be raised (not instances)")

    set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
    target = ctypes.c_long(tid)
    outcome = set_async_exc(target, ctypes.py_object(exctype))

    if outcome == 1:
        return
    if outcome == 0:
        raise ValueError("invalid thread id")
    # "if it returns a number greater than one, you're in trouble,
    # and you should call it again with exc=NULL to revert the effect"
    set_async_exc(target, None)
    raise SystemError("PyThreadState_SetAsyncExc failed")
class ForceInturruptException(Exception):
    """Signal raised inside a thread to abandon its current run."""
class ForceTerminateException(Exception):
    """Signal raised inside a thread to make it stop for good."""
class ThreadWithExc(threading.Thread):
    '''A thread class that supports raising exception in the thread from
       another thread.
    '''
    def _get_my_tid(self):
        """Return the identifier of the thread represented by this instance.

        CAREFUL : this function is executed in the context of the caller
        thread, to get the identity of the thread represented by this
        instance.

        Raises:
            threading.ThreadError: if the thread is not alive.
            AssertionError: if the thread reports no identifier.
        """
        # is_alive() is the supported spelling; isAlive() was removed in
        # Python 3.9 and raises AttributeError there.
        if not self.is_alive():
            raise threading.ThreadError("the thread is not active")
        # do we have it cached?
        if hasattr(self, "_thread_id"):
            return self._thread_id
        # Thread.ident is the public way to get the id, so there is no need
        # to scan the private threading._active table.
        tid = self.ident
        if tid is None:
            raise AssertionError("could not determine the thread's id")
        self._thread_id = tid
        return tid
    def raiseExc(self, exctype):
        """Raises the given exception type in the context of this thread.

        If the thread is busy in a system call (time.sleep(),
        socket.accept(), ...), the exception is simply ignored.
        If you are sure that your exception should terminate the thread,
        one way to ensure that it works is:
            t = ThreadWithExc( ... )
            ...
            t.raiseExc( SomeException )
            while t.is_alive():
                time.sleep( 0.1 )
                t.raiseExc( SomeException )
        If the exception is to be caught by the thread, you need a way to
        check that your thread has caught it.
        CAREFUL : this function is executed in the context of the
        caller thread, to raise an exception in the context of the
        thread represented by this instance.

        Args:
            exctype: Exception class (not an instance) to raise.
        """
        _async_raise(self._get_my_tid(), exctype)
    def interrupt_single_run(self):
        """Raise ForceInturruptException in this thread."""
        print('interrupt_single_run')
        self.raiseExc(ForceInturruptException)
    def terminate(self):
        """Raise ForceTerminateException in this thread."""
        self.raiseExc(ForceTerminateException)
class ForeverThread(ThreadWithExc):
    ''' This ForeverThread keeps thread alive with entry `single_run` '''
    def __init__(self, timesleepInScond=1, *args, **kwargs):
        """Create the thread.

        Args:
            timesleepInScond: Seconds to wait between two consecutive calls
                of ``single_run``. Defaults to 1 so that callers passing only
                ``target=...`` (or nothing at all) can construct the thread.
            *args, **kwargs: Forwarded unchanged to the parent constructor.
        """
        super().__init__(*args, **kwargs)
        self.timesleepInScond = timesleepInScond
        self.logger = logging.getLogger('ForeverThread')
        # Set up front so the attribute can be read before the first run.
        self._last_run_time = time.time()
    def single_run(self):
        """Run one iteration: call the target, or fail if there is none.

        Raises:
            NotImplementedError: if no target was given and the method was
                not overridden.
        """
        if self._target:
            self._target(*self._args, **self._kwargs)
        else:
            raise NotImplementedError('Implement me to run or set target')
    def run(self):
        """Call ``single_run`` repeatedly until a terminate request arrives.

        ForceInturruptException and any other ``Exception`` are logged and
        the loop continues after the configured pause.
        ForceTerminateException ends the loop immediately.
        """
        while True:
            try:
                self._last_run_time = time.time()
                self.single_run()
            except ForceInturruptException:
                self.logger.warning('Interrupted by others, run again')
            except ForceTerminateException:
                self.logger.critical('Terminated by others, exit')
                # Leave right away; pausing first would only delay the exit.
                return
            except Exception:
                # Exception rather than a bare except, so SystemExit and
                # KeyboardInterrupt are not swallowed and retried forever.
                self.logger.error(traceback.format_exc())
            time.sleep(self.timesleepInScond)
class TimeoutDaemon(ForeverThread):
    """Watchdog thread that interrupts registered threads whose current run
    has lasted longer than their configured timeout.

    NOTE(review): despite the name, the thread is not created with
    ``daemon=True``, so it keeps the process alive - confirm this is intended.
    """
    def __init__(self):
        """Create the watchdog and start it immediately."""
        # ForeverThread needs the pause between runs; single_run already
        # sleeps, so no additional pause is requested here.
        super().__init__(0)
        self.threads_timeout = {}
        self.lock = threading.Lock()
        self.start()
    def set_thread_single_run_timeout(self, t, timeout):
        """Register ``t`` so that a run longer than ``timeout`` seconds is
        interrupted.

        Raises:
            ValueError: if ``t`` is not a ForeverThread instance.
        """
        if not isinstance(t, ForeverThread):
            raise ValueError('t is not subclass of ForeverThread!')
        with self.lock:
            self.threads_timeout[t] = timeout
    def single_run(self):
        """Check every registered thread once and interrupt the overdue ones."""
        time.sleep(0.1)
        now = time.time()
        with self.lock:
            for t, timeout in self.threads_timeout.items():
                # A thread that is not running cannot be interrupted.
                if not t.is_alive():
                    continue
                # The timestamp may be missing if the thread has not begun
                # its first run yet.
                started = getattr(t, '_last_run_time', None)
                if started is None or now - started <= timeout:
                    continue
                t.interrupt_single_run()
                # Restart the clock: otherwise the same run is interrupted
                # again on every tick, and an exception delivered while the
                # worker is between runs is not caught by its run loop.
                t._last_run_time = now
if __name__ == '__main__':
    # Earlier experiments, kept for reference:
    # class MyThread(ForeverThread):
    #     def __init__(self):
    #         super().__init__()
    #         self.cnt = 0
    #     def single_run(self):
    #         self.cnt += 1
    #         print(self.cnt)
    #         time.sleep(1)
    #         if self.cnt > 3:
    #             raise Exception()
    # t = MyThread()
    # t.start()
    # def xx():
    #     print(time.time())
    #     time.sleep(0.5)
    #     raise Exception()
    # t = ForeverThread(target=xx)
    # t.start()
    # t.join()
    # time.sleep(2)
    # t.interrupt_single_run()
    # time.sleep(2)
    # t.terminate()
    # t.join()
    # print('over')

    # Demo: a worker whose run lasts about 3 seconds, watched with a 4-second
    # timeout.
    def xx2():
        print('xx2:', time.time())
        for _ in range(30):
            time.sleep(0.1)
    # The pause between runs is a constructor parameter of ForeverThread and
    # has to be supplied; it is passed by keyword so it cannot be mistaken
    # for one of the thread arguments.
    t = ForeverThread(timesleepInScond=1, target=xx2)
    t.start()
    daemon = TimeoutDaemon()
    daemon.set_thread_single_run_timeout(t, 4)
    # Nothing in this demo terminates the worker, so this blocks until the
    # process is stopped from outside.
    t.join()

精彩函数

传入一个字典,
用其中的函数.

# Print one line per target machine: its display name followed by its tags.
for raw in target_machines:
    # Wrap the plain dict so the named accessors can be used on it.
    m = MachineInfo(raw)
    print(m.getDisplayName(), ' '.join(m.getTags()))
# Report the total, then wait the 5 seconds announced in the message.
print(f'Num machines: {len(target_machines)}. Auto pushing started in 5 seconds...')
time.sleep(5)
class MachineInfo(dict):
    """A dict describing one machine, with named accessors for its fields.

    Every accessor is a plain key lookup on the mapping, so a missing key
    raises KeyError.
    """

    def _field(self, key):
        """Return the value stored under ``key``."""
        return self[key]

    def getMachineId(self):
        """Return the value of the 'Id' entry."""
        return self._field('Id')

    def getDisplayName(self):
        """Return the value of the 'DisplayName' entry."""
        return self._field('DisplayName')

    def getPendingCommandsCount(self):
        """Return the value of the 'PendingCommandsCount' entry."""
        return self._field('PendingCommandsCount')

    def getTags(self):
        """Return the value of the 'Tags' entry."""
        return self._field('Tags')

说明参数类型

def __init__(self, client: AiClothClient, machineId: str,
                 check_version_key: str, target_version_value: str, deb_name: str, dry_run : bool):