Timeout on a function call

  •  211
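  • Teifion  · Tech Community  · 16 years ago

    How do I call the function, or what do I wrap it in, so that if it takes longer than 5 seconds the script cancels it and does something else?

    13 replies  |  until 8 years ago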
        1
  •  281
  •   sweden    4 years ago

    If you are running on UNIX, you can use the signal module:

    In [1]: import signal
    
    # Register a handler for the timeout
    In [2]: def handler(signum, frame):
       ...:     print("Forever is over!")
       ...:     raise Exception("end of time")
       ...: 
    
    # This function *may* run for an indeterminate time...
    In [3]: def loop_forever():
       ...:     import time
       ...:     while 1:
       ...:         print("sec")
       ...:         time.sleep(1)
       ...:         
       ...:         
    
    # Register the signal function handler
    In [4]: signal.signal(signal.SIGALRM, handler)
    Out[4]: 0
    
    # Define a timeout for your function
    In [5]: signal.alarm(10)
    Out[5]: 0
    
    In [6]: try:
       ...:     loop_forever()
       ...: except Exception as exc:
       ...:     print(exc)
       ....: 
    sec
    sec
    sec
    sec
    sec
    sec
    sec
    sec
    Forever is over!
    end of time
    
    # Cancel the timer if the function returned before timeout
    # (ok, mine won't but yours maybe will :)
    In [7]: signal.alarm(0)
    Out[7]: 0
    

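    Ten seconds after the call to signal.alarm(10), the handler is called. This raises an exception that you can intercept from regular Python code.

    Note that because we raise an exception when the timeout occurs, it may end up being caught and ignored inside the function, for example in a function like this: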

    def loop_forever():
        while 1:
            print('sec')
            try:
                time.sleep(10)
            except:
                continue
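
    One way to make the timeout harder to swallow, sketched here on the assumption of a Unix system and code running in the main thread, is to derive the timeout exception from BaseException so that handlers written as `except Exception:` let it through. The names AlarmTimeout, stubborn_loop and run_limited are hypothetical and not part of the answer above, and a bare `except:` would still catch the exception. The sketch also uses signal.setitimer so that fractional timeouts work, and it restores the previous handler afterwards.

```python
import signal
import time


class AlarmTimeout(BaseException):
    """Hypothetical timeout type; not caught by `except Exception`."""


def _on_alarm(signum, frame):
    raise AlarmTimeout()


def stubborn_loop():
    # Swallows every Exception, but AlarmTimeout is a BaseException.
    while True:
        try:
            time.sleep(0.05)
        except Exception:
            continue


def run_limited(func, seconds):
    """Call func(); report whether it finished within `seconds`."""
    old_handler = signal.signal(signal.SIGALRM, _on_alarm)
    signal.setitimer(signal.ITIMER_REAL, seconds)
    try:
        func()
        return "finished"
    except AlarmTimeout:
        return "timed out"
    finally:
        # Cancel the timer and put the previous handler back.
        signal.setitimer(signal.ITIMER_REAL, 0)
        signal.signal(signal.SIGALRM, old_handler)
```

    Calling run_limited(stubborn_loop, 0.3) should report a timeout even though the loop ignores ordinary exceptions.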
    
        2
  •  192
  •   Emil    4 years ago

    You can use multiprocessing.Process.

    Code

    import multiprocessing
    import time
    
    # bar
    def bar():
        for i in range(100):
            print("Tick")
            time.sleep(1)
    
    if __name__ == '__main__':
        # Start bar as a process
        p = multiprocessing.Process(target=bar)
        p.start()
    
        # Wait for 10 seconds or until process finishes
        p.join(10)
    
        # If the process is still alive
        if p.is_alive():
            print("running... let's kill it...")
    
            # Terminate - may not work if process is stuck for good
            p.terminate()
            # OR Kill - will work for sure, no chance for process to finish nicely however
            # p.kill()
    
            p.join()
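
    The code above stops the process but does not return anything from it. A minimal sketch of getting a result back through a queue follows. The helper names _worker and run_with_timeout are hypothetical, and the sketch assumes a POSIX system where the "fork" start method is available, so the target does not have to be picklable. If the function raises inside the child, this sketch simply waits out the timeout and returns the default.

```python
import multiprocessing as mp
import queue
import time


def _worker(result_queue, func, args):
    # Runs in the child process and ships the result back to the parent.
    result_queue.put(func(*args))


def run_with_timeout(func, args=(), timeout=5.0, default=None):
    """Run func(*args) in a child process; return `default` on timeout."""
    ctx = mp.get_context("fork")  # assumption: POSIX only
    result_queue = ctx.Queue()
    proc = ctx.Process(target=_worker, args=(result_queue, func, args))
    proc.start()
    try:
        return result_queue.get(timeout=timeout)
    except queue.Empty:
        return default
    finally:
        # Stop the child if it is still running, then reap it.
        if proc.is_alive():
            proc.terminate()
        proc.join()
```

    For example, run_with_timeout(time.sleep, (5,), timeout=0.5, default="timed out") gives up after about half a second and terminates the child.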
    
        3
  •  94
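  •   Community paulsm4    7 years ago

    How do I call the function, or what do I wrap it in, so that if it takes longer than 5 seconds the script cancels it?

    I posted a gist that solves this problem with a decorator and a threading.Timer.

    Imports and setup for compatibility

    It was tested with Python 2 and Python 3. It should also work under Unix/Linux and Windows.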

    from __future__ import print_function
    import sys
    import threading
    from time import sleep
    try:
        import thread
    except ImportError:
        import _thread as thread
    

    try:
        range, _print = xrange, print
        def print(*args, **kwargs): 
            flush = kwargs.pop('flush', False)
            _print(*args, **kwargs)
            if flush:
                kwargs.get('file', sys.stdout).flush()            
    except NameError:
        pass
    

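    Now we have imported our functionality from the standard library.

    The exit_after decorator

    Next we need a function to terminate main() from the child thread: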

    def quit_function(fn_name):
        # print to stderr, unbuffered in Python 2.
        print('{0} took too long'.format(fn_name), file=sys.stderr)
        sys.stderr.flush() # Python 3 stderr is likely buffered.
        thread.interrupt_main() # raises KeyboardInterrupt
    

    And here is the decorator itself:

    def exit_after(s):
        '''
        use as decorator to exit process if 
        function takes longer than s seconds
        '''
        def outer(fn):
            def inner(*args, **kwargs):
                timer = threading.Timer(s, quit_function, args=[fn.__name__])
                timer.start()
                try:
                    result = fn(*args, **kwargs)
                finally:
                    timer.cancel()
                return result
            return inner
        return outer
    

    The usage below directly answers your question about exiting after 5 seconds:

    @exit_after(5)
    def countdown(n):
        print('countdown started', flush=True)
        for i in range(n, -1, -1):
            print(i, end=', ', flush=True)
            sleep(1)
        print('countdown finished')
    

    Demo:

    >>> countdown(3)
    countdown started
    3, 2, 1, 0, countdown finished
    >>> countdown(10)
    countdown started
    10, 9, 8, 7, 6, countdown took too long
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 11, in inner
      File "<stdin>", line 6, in countdown
    KeyboardInterrupt
    

    KeyboardInterrupt does not always stop a sleeping thread

    Note that on Python 2 on Windows, sleep is not always interrupted by a KeyboardInterrupt, for example:

    @exit_after(1)
    def sleep10():
        sleep(10)
        print('slept 10 seconds')
    
    >>> sleep10()
    sleep10 took too long         # Note that it hangs here about 9 more seconds
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "<stdin>", line 11, in inner
      File "<stdin>", line 3, in sleep10
    KeyboardInterrupt
    

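    Nor is it likely to interrupt code running in extensions unless that code explicitly checks PyErr_CheckSignals(); see "Cython, Python and KeyboardInterrupt ignored".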

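    In any case, I would avoid having a thread sleep for more than a second: that is an eon in processor time.

    And do something else?

    To catch it and do something else, you can catch the KeyboardInterrupt.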

    >>> try:
    ...     countdown(10)
    ... except KeyboardInterrupt:
    ...     print('do something else')
    ... 
    countdown started
    10, 9, 8, 7, 6, countdown took too long
    do something else
    
        4
  •  60
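  •   Alex    12 years ago

    I have a different proposal, which is a pure function (with the same API as the threading suggestion) and seems to work fine (based on suggestions in this thread):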

    def timeout(func, args=(), kwargs={}, timeout_duration=1, default=None):
        import signal
    
        class TimeoutError(Exception):
            pass
    
        def handler(signum, frame):
            raise TimeoutError()
    
        # set the timeout handler
        signal.signal(signal.SIGALRM, handler) 
        signal.alarm(timeout_duration)
        try:
            result = func(*args, **kwargs)
        except TimeoutError as exc:
            result = default
        finally:
            signal.alarm(0)
    
        return result
    
        5
  •  36
  •   Matt Tardiff Rich    8 years ago

    import multiprocessing.pool
    import functools
    
    def timeout(max_timeout):
        """Timeout decorator, parameter in seconds."""
        def timeout_decorator(item):
            """Wrap the original function."""
            @functools.wraps(item)
            def func_wrapper(*args, **kwargs):
                """Closure for function."""
                pool = multiprocessing.pool.ThreadPool(processes=1)
                async_result = pool.apply_async(item, args, kwargs)
                # raises a TimeoutError if execution exceeds max_timeout
                return async_result.get(max_timeout)
            return func_wrapper
        return timeout_decorator
    

    Then timing out a test, or any function you like, is as simple as this:

    @timeout(5.0)  # if execution takes longer than 5 seconds, raise a TimeoutError
    def test_base_regression(self):
        ...
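
    One caveat with a thread-pool timeout is that the caller stops waiting, but the worker thread itself is not cancelled. The following sketch, with the hypothetical names slow, outcome and still_ran, illustrates this using the same multiprocessing.pool.ThreadPool API: the call times out after 0.1 seconds, yet the function still runs to completion in the background.

```python
import multiprocessing
import multiprocessing.pool
import threading
import time

done = threading.Event()


def slow():
    time.sleep(0.5)
    done.set()  # proves the function kept running after the timeout
    return "late"


pool = multiprocessing.pool.ThreadPool(processes=1)
try:
    async_result = pool.apply_async(slow)
    try:
        async_result.get(0.1)
        outcome = "finished"
    except multiprocessing.TimeoutError:
        outcome = "timed out"
    # Wait for the background thread to show it was never interrupted.
    still_ran = done.wait(2)
finally:
    pool.close()
    pool.join()
```

    If the work must really stop at the deadline, a process-based or cooperative approach is needed instead.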
    
        6
  •  31
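  •   egeland    10 years ago

    The stopit package.

    I like the @stopit.threading_timeoutable decorator, which adds a timeout parameter to the decorated function and does what you expect: it stops the function.

    Check it out on PyPI: https://pypi.python.org/pypi/stopit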

        7
  •  19
  •   Brian    7 years ago

    There are many suggestions, but none of them uses concurrent.futures, which I think is the most readable way to handle this.

    from concurrent.futures import ProcessPoolExecutor
    
    # Warning: this does not terminate function if timeout
    def timeout_five(fnc, *args, **kwargs):
        with ProcessPoolExecutor() as p:
            f = p.submit(fnc, *args, **kwargs)
            return f.result(timeout=5)
    

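    It is native to Python 3.2+ and available as a backport for 2.7 (installable with pip).

    Switching between threads and processes is as simple as replacing ProcessPoolExecutor with ThreadPoolExecutor.

    If you want to terminate the process on timeout, I suggest looking into Pebble.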
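
    One detail worth knowing: leaving an executor's `with` block calls shutdown(wait=True), so even after result(timeout=...) raises, the caller still blocks until the submitted function finishes. The sketch below, with the hypothetical helper name call_with_timeout, uses a ThreadPoolExecutor and shuts it down with wait=False so the caller regains control at the deadline. The worker thread is still not killed; it just runs on in the background.

```python
import concurrent.futures
import time


def call_with_timeout(func, timeout, *args, **kwargs):
    """Return func(*args, **kwargs) or raise TimeoutError after `timeout` s."""
    executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
    future = executor.submit(func, *args, **kwargs)
    try:
        return future.result(timeout=timeout)
    finally:
        # wait=False: do not block on a task that is still running.
        executor.shutdown(wait=False)


start = time.monotonic()
try:
    call_with_timeout(time.sleep, 0.2, 1.0)
    timed_out = False
except concurrent.futures.TimeoutError:
    timed_out = True
elapsed = time.monotonic() - start
```

    Here elapsed should be close to the 0.2 second timeout rather than the full second that the sleep takes.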

        8
  •  17
  •   Gil    6 years ago

    The timeout-decorator project on PyPI is very good, easy to use and reliable (https://pypi.org/project/timeout-decorator/).

    Installation:

    pip install timeout-decorator
    

    Usage:

    import time
    import timeout_decorator
    
    @timeout_decorator.timeout(5)
    def mytest():
        print("Start")
        for i in range(1,10):
            time.sleep(1)
            print("%d seconds have passed" % i)
    
    if __name__ == '__main__':
        mytest()
    
        9
  •  15
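  •   Inaimathi    4 years ago

    I am the author of wrapt_timeout_decorator.

    At first glance, most of the solutions presented here work fine under Linux, because we have fork() and signals, but things look a bit different on Windows. And when it comes to subthreads on Linux, you can no longer use signals.

    To spawn a process under Windows, the target needs to be picklable, and many decorated functions or class methods are not.

    So you need a better pickler, such as dill and multiprocess (rather than pickle and multiprocessing), which is why you cannot use ProcessPoolExecutor (or only with limited functionality).

    Should the function time out after 0.5 + 0.2 seconds (so that the method is allowed to run for 0.2 seconds)?

    Also, nested decorators can be nasty, and you cannot use signals in a subthread. If you want to create a truly universal, cross-platform decorator, all of this needs to be taken into account (and tested).

    @Alexis Eggermont: unfortunately I do not have enough points to comment, so maybe someone else can notify you. I think I solved your import issue.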
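
    The point about signals and subthreads can be checked directly: in CPython, signal.signal() may only be called from the main thread, and raises ValueError elsewhere. A small sketch (the names errors and try_install_handler are hypothetical):

```python
import signal
import threading

errors = []


def try_install_handler():
    # Installing a handler outside the main thread is rejected.
    try:
        signal.signal(signal.SIGTERM, signal.SIG_DFL)
    except ValueError as exc:
        errors.append(exc)


worker = threading.Thread(target=try_install_handler)
worker.start()
worker.join()
```

    After this runs, errors holds the ValueError, which is why SIGALRM-based decorators cannot be used on functions called from worker threads.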

        10
  •  14
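  •   Enginer    3 years ago

    Building on and extending the answer by @piro, you can build a context manager. This allows very readable code, and it disables the alarm signal after a successful run (by calling signal.alarm(0)).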

    from contextlib import contextmanager
    import signal
    import time
    
    @contextmanager
    def timeout(duration):
        def timeout_handler(signum, frame):
            raise Exception(f'block timedout after {duration} seconds')
        signal.signal(signal.SIGALRM, timeout_handler)
        signal.alarm(duration)
        try:
            yield
        finally:
            # always cancel the pending alarm, even if the block raised
            signal.alarm(0)
    
    def sleeper(duration):
        time.sleep(duration)
        print('finished')
    

    Example usage:

    In [19]: with timeout(2):
        ...:     sleeper(1)
        ...:     
    finished
    
    In [20]: with timeout(2):
        ...:     sleeper(3)
        ...:         
    ---------------------------------------------------------------------------
    Exception                                 Traceback (most recent call last)
    <ipython-input-20-66c78858116f> in <module>()
          1 with timeout(2):
    ----> 2     sleeper(3)
          3 
    
    <ipython-input-7-a75b966bf7ac> in sleeper(t)
          1 def sleeper(t):
    ----> 2     time.sleep(t)
          3     print('finished')
          4 
    
    <ipython-input-18-533b9e684466> in timeout_handler(signum, frame)
          2 def timeout(duration):
          3     def timeout_handler(signum, frame):
    ----> 4         raise Exception(f'block timedout after {duration} seconds')
          5     signal.signal(signal.SIGALRM, timeout_handler)
          6     signal.alarm(duration)
    
    Exception: block timedout after 2 seconds
    
        11
  •  8
  •   as - if    6 years ago

    timeout-decorator does not work on Windows, because Windows does not support signal well.

    If you use timeout-decorator on a Windows system, you will get the following:

    AttributeError: module 'signal' has no attribute 'SIGALRM'
    

    Using use_signals=False is an option, but it did not work for me.

    The author, @bitranox, created the following package:

    pip install https://github.com/bitranox/wrapt-timeout-decorator/archive/master.zip
    

    Code sample:

    import time
    from wrapt_timeout_decorator import *
    
    @timeout(5)
    def mytest(message):
        print(message)
        for i in range(1,10):
            time.sleep(1)
            print('{} seconds have passed'.format(i))
    
    def main():
        mytest('starting')
    
    
    if __name__ == '__main__':
        main()
    

    TimeoutError: Function mytest timed out after 5 seconds
    
        12
  •  5
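  •   Alexander McFarlane    4 years ago

    • Raises TimeoutError, using exceptions to signal a timeout (this can easily be modified)
    • Cross-platform: Windows & Mac OS X

    For a full explanation and an extension to parallel maps, see https://flipdazed.github.io/blog/quant%20dev/parallel-functions-with-timeouts

    Minimal example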

    >>> @killer_call(timeout=4)
    ... def bar(x):
    ...        import time
    ...        time.sleep(x)
    ...        return x
    >>> bar(10)
    Traceback (most recent call last):
      ...
    __main__.TimeoutError: function 'bar' timed out after 4s
    

    and, as expected:

    >>> bar(2)
    2
    

    import multiprocessing as mp
    import multiprocessing.queues as mpq
    import functools
    import dill
    
    from typing import Tuple, Callable, Dict, Optional, Iterable, List, Any
    
    class TimeoutError(Exception):
    
        def __init__(self, func: Callable, timeout: int):
            self.t = timeout
            self.fname = func.__name__
    
        def __str__(self):
            return f"function '{self.fname}' timed out after {self.t}s"
    
    
    def _lemmiwinks(func: Callable, args: Tuple, kwargs: Dict[str, Any], q: mp.Queue):
        """lemmiwinks crawls into the unknown"""
        q.put(dill.loads(func)(*args, **kwargs))
    
    
    def killer_call(func: Callable = None, timeout: int = 10) -> Callable:
        """
        Single function call with a timeout
    
        Args:
            func: the function
            timeout: The timeout in seconds
        """
    
        if not isinstance(timeout, int):
            raise ValueError(f'timeout needs to be an int. Got: {timeout}')
    
        if func is None:
            return functools.partial(killer_call, timeout=timeout)
    
        @functools.wraps(func)
        def _inners(*args, **kwargs) -> Any:
            q_worker = mp.Queue()
            proc = mp.Process(target=_lemmiwinks, args=(dill.dumps(func), args, kwargs, q_worker))
            proc.start()
            try:
                return q_worker.get(timeout=timeout)
            except mpq.Empty:
                raise TimeoutError(func, timeout)
            finally:
                try:
                    proc.terminate()
                except:
                    pass
        return _inners
    
    if __name__ == '__main__':
        @killer_call(timeout=4)
        def bar(x):
            import time
            time.sleep(x)
            return x
    
        print(bar(2))
        bar(10)
    

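    Notes

    Because of the way dill works, imports need to go inside the function.

    This also means these functions may not be compatible with doctest if the target function contains imports. You will get an error about __import__ not being found.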

        13
  •  4
  •   A R    11 years ago

    We can use signals to do the same thing. I think the example below will be useful to you. It is very simple compared with threads.

    import signal
    
    class MyException(Exception):
        pass
    
    def timeout(signum, frame):
        raise MyException
    
    # this is an infinite loop, never ending under normal circumstances
    def main():
        print('Starting Main ', end='')
        while 1:
            print('in main ', end='')
    
    # SIGALRM is only usable on a unix platform
    signal.signal(signal.SIGALRM, timeout)
    
    # change 5 to however many seconds you need
    signal.alarm(5)
    
    try:
        main()
    except MyException:
        print("whoops")
    
        14
  •  3
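  •   raphaelauv    4 years ago

    Another solution, using asyncio:

    If you want to cancel the background task, and not just time out in the running main code, then you need explicit communication from the main thread asking the task's code to cancel, such as a threading.Event().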

    import asyncio
    import functools
    import multiprocessing
    from concurrent.futures.thread import ThreadPoolExecutor
    
    
    class SingletonTimeOut:
        pool = None
    
        @classmethod
        def run(cls, to_run: functools.partial, timeout: float):
            pool = cls.get_pool()
            loop = cls.get_loop()
            try:
                task = loop.run_in_executor(pool, to_run)
                return loop.run_until_complete(asyncio.wait_for(task, timeout=timeout))
            except asyncio.TimeoutError as e:
                error_type = type(e).__name__ #TODO
                raise e
    
        @classmethod
        def get_pool(cls):
            if cls.pool is None:
                cls.pool = ThreadPoolExecutor(multiprocessing.cpu_count())
            return cls.pool
    
        @classmethod
        def get_loop(cls):
            try:
                return asyncio.get_event_loop()
            except RuntimeError:
                asyncio.set_event_loop(asyncio.new_event_loop())
                # print("NEW LOOP" + str(threading.current_thread().ident))
                return asyncio.get_event_loop()
    
    # ---------------
    
    TIME_OUT = float('0.2')  # seconds
    
    def toto(input_items,nb_predictions):
        return 1
    
    to_run = functools.partial(toto,
                               input_items=1,
                               nb_predictions="a")
    
    results = SingletonTimeOut.run(to_run, TIME_OUT)
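
    When the work is already written as a coroutine, no executor is needed: asyncio.wait_for cancels the awaited coroutine once the timeout expires. A minimal sketch, with the hypothetical coroutines slow_job and run_limited:

```python
import asyncio


async def slow_job(delay):
    await asyncio.sleep(delay)
    return "done"


async def run_limited(delay, timeout):
    try:
        # wait_for cancels slow_job if it exceeds the timeout.
        return await asyncio.wait_for(slow_job(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


fast = asyncio.run(run_limited(0.01, 1.0))
slow = asyncio.run(run_limited(1.0, 0.1))
```

    This only helps for code that yields to the event loop; a blocking function run through run_in_executor keeps running in its thread after the timeout, as noted above.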
    
    
        15
  •  2
  •   Hal Canary    8 years ago
    #!/usr/bin/python2
    import sys, subprocess, threading
    proc = subprocess.Popen(sys.argv[2:])
    timer = threading.Timer(float(sys.argv[1]), proc.terminate)
    timer.start()
    proc.wait()
    timer.cancel()
    exit(proc.returncode)
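
    On Python 3.5 and later, the same effect for an external command is available from the standard library, since subprocess.run accepts a timeout and kills the child before raising TimeoutExpired. A sketch, where the helper run_command and the two example commands are hypothetical:

```python
import subprocess
import sys


def run_command(cmd, timeout):
    """Return the exit code of cmd, or None if it ran longer than `timeout`."""
    try:
        completed = subprocess.run(cmd, timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run has already killed and reaped the child here.
        return None
    return completed.returncode


# Two child commands built on the current interpreter, for illustration.
sleeper = [sys.executable, "-c", "import time; time.sleep(5)"]
quick = [sys.executable, "-c", "raise SystemExit(3)"]
```

    run_command(quick, 10) returns the child's exit status, while run_command(sleeper, 0.5) gives up after half a second.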
    
        16
  •  2
  •   jose.marcos.rf    3 years ago

    In case it is useful, building on @piro's answer, I made a function decorator:

    import time
    import signal
    from functools import wraps
    
    
    def timeout(timeout_secs: int):
        def wrapper(func):
            @wraps(func)
            def time_limited(*args, **kwargs):
                # Register a handler for the timeout
                def handler(signum, frame):
                    raise Exception(f"Timeout for function '{func.__name__}'")
    
                # Register the signal function handler
                signal.signal(signal.SIGALRM, handler)
    
                # Define a timeout for your function
                signal.alarm(timeout_secs)
    
                result = None
                try:
                    result = func(*args, **kwargs)
                except Exception as exc:
                    raise exc
                finally:
                    # disable the signal alarm
                    signal.alarm(0)
    
                return result
    
            return time_limited
    
        return wrapper
    

    Using the decorator on a function with a 20 seconds timeout would look something like this:

        @timeout(20)
        def my_slow_or_never_ending_function(name):
            while True:
                time.sleep(1)
                print(f"Yet another second passed {name}...")
    
        try:
            results = my_slow_or_never_ending_function("Yooo!")
        except Exception as e:
            print(f"ERROR: {e}")
    
        17
  •  1
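  •   James    9 years ago

    I needed nestable timed interrupts (which SIGALRM cannot do) that are not blocked by time.sleep (which the thread-based approach cannot do). I ended up copying and lightly modifying code from here: http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/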

    #!/usr/bin/python
    
    # lightly modified version of http://code.activestate.com/recipes/577600-queue-for-managing-multiple-sigalrm-alarms-concurr/
    
    
    """alarm.py: Permits multiple SIGALRM events to be queued.
    
    Uses a `heapq` to store the objects to be called when an alarm signal is
    raised, so that the next alarm is always at the top of the heap.
    """
    
    import heapq
    import signal
    from time import time
    
    __version__ = '$Revision: 2539 $'.split()[1]
    
    alarmlist = []
    
    __new_alarm = lambda t, f, a, k: (t + time(), f, a, k)
    __next_alarm = lambda: int(round(alarmlist[0][0] - time())) if alarmlist else None
    __set_alarm = lambda: signal.alarm(max(__next_alarm(), 1))
    
    
    class TimeoutError(Exception):
        def __init__(self, message, id_=None):
            self.message = message
            self.id_ = id_
    
    
    class Timeout:
        ''' id_ allows for nested timeouts. '''
        def __init__(self, id_=None, seconds=1, error_message='Timeout'):
            self.seconds = seconds
            self.error_message = error_message
            self.id_ = id_
        def handle_timeout(self):
            raise TimeoutError(self.error_message, self.id_)
        def __enter__(self):
            self.this_alarm = alarm(self.seconds, self.handle_timeout)
        def __exit__(self, type, value, traceback):
            try:
                cancel(self.this_alarm) 
            except ValueError:
                pass
    
    
    def __clear_alarm():
        """Clear an existing alarm.
    
        If the alarm signal was set to a callable other than our own, queue the
        previous alarm settings.
        """
        oldsec = signal.alarm(0)
        oldfunc = signal.signal(signal.SIGALRM, __alarm_handler)
        if oldsec > 0 and oldfunc != __alarm_handler:
            heapq.heappush(alarmlist, (__new_alarm(oldsec, oldfunc, [], {})))
    
    
    def __alarm_handler(*zargs):
        """Handle an alarm by calling any due heap entries and resetting the alarm.
    
        Note that multiple heap entries might get called, especially if calling an
        entry takes a lot of time.
        """
        try:
            nextt = __next_alarm()
            while nextt is not None and nextt <= 0:
                (tm, func, args, keys) = heapq.heappop(alarmlist)
                func(*args, **keys)
                nextt = __next_alarm()
        finally:
            if alarmlist: __set_alarm()
    
    
    def alarm(sec, func, *args, **keys):
        """Set an alarm.
    
        When the alarm is raised in `sec` seconds, the handler will call `func`,
        passing `args` and `keys`. Return the heap entry (which is just a big
        tuple), so that it can be cancelled by calling `cancel()`.
        """
        __clear_alarm()
        try:
            newalarm = __new_alarm(sec, func, args, keys)
            heapq.heappush(alarmlist, newalarm)
            return newalarm
        finally:
            __set_alarm()
    
    
    def cancel(alarm):
        """Cancel an alarm by passing the heap entry returned by `alarm()`.
    
        It is an error to try to cancel an alarm which has already occurred.
        """
        __clear_alarm()
        try:
            alarmlist.remove(alarm)
            heapq.heapify(alarmlist)
        finally:
            if alarmlist: __set_alarm()
    

    And a usage example:

    import alarm
    from time import sleep
    
    try:
        with alarm.Timeout(id_='a', seconds=5):
            try:
                with alarm.Timeout(id_='b', seconds=2):
                    sleep(3)
            except alarm.TimeoutError as e:
                print('raised', e.id_)
            sleep(30)
    except alarm.TimeoutError as e:
        print('raised', e.id_)
    else:
        print('nope.')
    
        18
  •  1
  •   Dozy Sun    3 years ago

    import time
    
    from timeout_timer import timeout, TimeoutInterrupt
    
    class TimeoutInterruptNested(TimeoutInterrupt):
        pass
    
    def test_timeout_nested_loop_both_timeout(timer="thread"):
        cnt = 0
        try:
            with timeout(5, timer=timer):
                try:
                    with timeout(2, timer=timer, exception=TimeoutInterruptNested):
                        time.sleep(2)
                except TimeoutInterruptNested:
                    cnt += 1
                time.sleep(10)
        except TimeoutInterrupt:
            cnt += 1
        assert cnt == 2
    

    See more: https://github.com/dozysun/timeout-timer

        19
  •  0
  •   diemacht    12 years ago

    Here is a slight improvement on the thread-based solution given earlier.

    It supports exceptions:

    def runFunctionCatchExceptions(func, *args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception as message:
            return ["exception", message]
    
        return ["RESULT", result]
    
    
    def runFunctionWithTimeout(func, args=(), kwargs={}, timeout_duration=10, default=None):
        import threading
        class InterruptableThread(threading.Thread):
            def __init__(self):
                threading.Thread.__init__(self)
                self.result = default
            def run(self):
                self.result = runFunctionCatchExceptions(func, *args, **kwargs)
        it = InterruptableThread()
        it.start()
        it.join(timeout_duration)
        if it.is_alive():
            return default
    
        if it.result[0] == "exception":
            raise it.result[1]
    
        return it.result[1]
    

    result = runFunctionWithTimeout(remote_calculate, (myarg,), timeout_duration=5)
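
    A thread-based timeout like this one returns control to the caller, but the thread itself keeps running. If the function can be modified, it can cooperate by checking a threading.Event. A minimal sketch, with the hypothetical names polite_worker and run_cooperatively:

```python
import threading
import time


def polite_worker(stop_event, ticks):
    # Does small units of work and checks the stop flag between them.
    while not stop_event.is_set():
        ticks.append(time.monotonic())
        stop_event.wait(0.05)


def run_cooperatively(timeout):
    """Run the worker for at most `timeout` seconds, then ask it to stop."""
    stop_event = threading.Event()
    ticks = []
    thread = threading.Thread(target=polite_worker, args=(stop_event, ticks))
    thread.start()
    thread.join(timeout)
    timed_out = thread.is_alive()
    if timed_out:
        stop_event.set()  # request cancellation
        thread.join()     # the worker exits at its next check
    return timed_out, len(ticks), thread.is_alive()
```

    This only works when the long-running code reaches a check point regularly; a single blocking call cannot be interrupted this way.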
    
        20
  •  0
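  •   Troels    4 years ago

    Here is a POSIX version that combines many of the previous answers to provide the following features:

    1. Handles subprocesses that block execution.
    2. A strict requirement to terminate on time.

    Here is the code and some test cases: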

    import threading
    import signal
    import os
    import time
    
    class TerminateExecution(Exception):
        """
        Exception to indicate that execution has exceeded the preset running time.
        """
    
    
    def quit_function(pid):
        # Killing all subprocesses
        os.setpgrp()
        os.killpg(0, signal.SIGTERM)
    
        # Killing the main thread
        os.kill(pid, signal.SIGTERM)
    
    
    def handle_term(signum, frame):
        raise TerminateExecution()
    
    
    def invoke_with_timeout(timeout, fn, *args, **kwargs):
        # Setting a sigterm handler and initiating a timer
        old_handler = signal.signal(signal.SIGTERM, handle_term)
        timer = threading.Timer(timeout, quit_function, args=[os.getpid()])
        terminate = False
    
        # Executing the function
        timer.start()
        try:
            result = fn(*args, **kwargs)
        except TerminateExecution:
            terminate = True
        finally:
            # Restoring original handler and cancel timer
            signal.signal(signal.SIGTERM, old_handler)
            timer.cancel()
    
        if terminate:
            raise BaseException("xxx")
    
        return result
    
    ### Test cases
    def countdown(n):
        print('countdown started', flush=True)
        for i in range(n, -1, -1):
            print(i, end=', ', flush=True)
            time.sleep(1)
        print('countdown finished')
        return 1337
    
    
    def really_long_function():
        time.sleep(10)
    
    
    def really_long_function2():
        os.system("sleep 787")
    
    
    # Checking that we can run a function as expected.
    assert invoke_with_timeout(3, countdown, 1) == 1337
    
    # Testing various scenarios
    t1 = time.time()
    try:
        print(invoke_with_timeout(1, countdown, 3))
        assert(False)
    except BaseException:
        assert(time.time() - t1 < 1.1)
        print("All good", time.time() - t1)
    
    t1 = time.time()
    try:
        print(invoke_with_timeout(1, really_long_function2))
        assert(False)
    except BaseException:
        assert(time.time() - t1 < 1.1)
        print("All good", time.time() - t1)
    
    
    t1 = time.time()
    try:
        print(invoke_with_timeout(1, really_long_function))
        assert(False)
    except BaseException:
        assert(time.time() - t1 < 1.1)
        print("All good", time.time() - t1)
    
    # Checking that classes are referenced and not
    # copied (as would be the case with multiprocessing)
    
    
    class X:
        def __init__(self):
            self.value = 0
    
        def set(self, v):
            self.value = v
    
    
    x = X()
    invoke_with_timeout(2, x.set, 9)
    assert x.value == 9