def run(self): """Starts or resumes the generator, running until it reaches a yield point that is not ready. """ if self.running or self.finished: return try: self.running = True while True: future = self.future
# 当前 future 没有完成时直接返回,等待 IOLoop 在 future 完成后回调再执行 if not future.done(): return
# Tornado 4.0 之前使用 YieldPoint 驱动,Callback 与 Wait/WaitAll # 协调时,Callback 的回调结果需要 Runner 作为中转站,通过 # Runner.register_callback(key) 登记 Callback ,再通过 # YieldPoint.result_callback(key) 取回“设置(回调)方法”, # 外部通过“设置(回调)方法”把结果保存到 Runner.results 字典中。 # Wait/WaitAll 通过 get_result(key) 取回 结果。 # YieldFuture 的实现也采用了相同的实现方式。 # Tornado 4.0 之后使用 Future 代替 YieldPoint,这些已经过时。 # 与 Yield 相关的代码都是为了向后兼容。 if self.pending_callbacks and not self.had_exception: # If we ran cleanly without waiting on all callbacks # raise an error (really more of a warning). If we # had an exception then some callbacks may have been # orphaned, so skip the check in that case. raise LeakedCallbackError( "finished without waiting for callbacks %r" % self.pending_callbacks) self.result_future.set_result(getattr(e, 'value', None)) self.result_future = None self._deactivate_stack_context() return except Exception: self.finished = True self.future = _null_future self.result_future.set_exc_info(sys.exc_info()) self.result_future = None self._deactivate_stack_context() return
# 继续处理 yield 表达式结果 if not self.handle_yield(yielded): return finally: self.running = False
def handle_yield(self, yielded): # 为了保持向后兼容,需要对多个 YieldPonit 和 Future 的混合集合做处理。 # 对于全是 Future 的集合类型使用新的 multi_future 函数进行封装处理; # 不全是的使用 Multi 类进行封装,对于 Future 提供了 YieldFuture 适配器类。 # 详细的实现细节见 YieldFuture、Multi的实现代码。 # 若需要 run() 循环立即处理该 YieldPoint(被启动)/Future(已经完成) 则返 # 回 True,否则返回 False。 if isinstance(yielded, list): if all(is_future(f) for f in yielded): yielded = multi_future(yielded) else: yielded = Multi(yielded) elif isinstance(yielded, dict): if all(is_future(f) for f in yielded.values()): yielded = multi_future(yielded) else: yielded = Multi(yielded)
# 针对第一个 YieldPoint 使用一个 ExceptionStackContext 上下文来处理 # StackContexts 中没有处理的异常,将未处理的异常记录到 result_future 中。 # 对于 Future 对象则没有必要, Future 提供了方法来记录异常和异常堆栈信息, # 在 Future 完成后通过其 result() 方法获取结果(在 run 方法的调用)时会 # 再次抛出异常,这时可捕获记录到 result_future 中。 if isinstance(yielded, YieldPoint): self.future = TracebackFuture() def start_yield_point(): try: yielded.start(self) # 如果 yielded 已经完成,则将其结果赋值给 self.future,等待 run 循环处理; # 若未就绪,则需要通过 Runner.set_result(key, value) 来进行赋值操作。 if yielded.is_ready(): self.future.set_result( yielded.get_result()) else: self.yield_point = yielded except Exception: self.future = TracebackFuture() self.future.set_exc_info(sys.exc_info()) if self.stack_context_deactivate is None: # Start a stack context if this is the first # YieldPoint we've seen. with stack_context.ExceptionStackContext( self.handle_exception) as deactivate: self.stack_context_deactivate = deactivate def cb(): start_yield_point() self.run() # 第 1 个 yielded 交由 IOLoop来启动 self.io_loop.add_callback(cb) return False else: # 启动 YieldPoint,需要返回 True,在 run 循环中继续处理 start_yield_point() elif is_future(yielded): self.future = yielded # self.future 完成后继续 self.run() # moment = Future() 是一个特殊的对象,主要用在需要长时间执行的 coroutine 中, # 通过 “yield gen.moment” 中断当前 coroutine ,将控制权交给 IOLoop 去轮询。 # 等效于当前 coroutine 临时放弃时间片,给了其他 callback 机会运行。 if not self.future.done() or self.future is moment: self.io_loop.add_future( self.future, lambda f: self.run()) return False else: self.future = TracebackFuture() self.future.set_exception(BadYieldError( "yielded unknown object %r" % (yielded,))) return True
def __init__(self, gen, result_future, first_yielded): self.gen = genreturn_futurereturn_future self.result_future = result_future self.future = _null_future self.yield_point = None self.pending_callbacks = None self.results = None self.running = False self.finished = False self.had_exception = False self.io_loop = IOLoop.current() # For efficiency, we do not create a stack context until we # reach a YieldPoint (stack contexts are required for the historical # semantics of YieldPoints, but not for Futures). When we have # done so, this field will be set and must be called at the end # of the coroutine. self.stack_context_deactivate = None if self.handle_yield(first_yielded): self.run()
def _argument_adapter(callback): """Returns a function that when invoked runs ``callback`` with one arg.
If the function returned by this function is called with exactly one argument, that argument is passed to ``callback``. Otherwise the args tuple and kwargs dict are wrapped in an `Arguments` object. """ def wrapper(*args, **kwargs): if kwargs or len(args) > 1: callback(Arguments(args, kwargs)) elif args: callback(args[0]) else: callback(None) return wrapper