这是我的@pythonetc feed中的Python技巧和编程的第12个集合。
←
以前的收藏您不能通过简单的赋值来更改闭包变量。 Python将赋值视为函数体内的定义,根本不做闭包。
工作正常,显示
2
:
def make_closure(x): def closure(): print(x) return closure make_closure(2)()
并且此代码引发
UnboundLocalError: local variable 'x' referenced before assignment
:
def make_closure(x): def closure(): print(x) x *= 2 print(x) return closure make_closure(2)()
要使代码正常工作,请使用
nonlocal
。 这明确告诉解释器不要将分配视为定义:
def make_closure(x): def closure(): nonlocal x print(x) x *= 2 print(x) return closure make_closure(2)()
有时在迭代过程中,您需要找出要处理的元素,第一个还是最后一个。 可以使用显式标志轻松确定:
def sparse_list(iterable, num_of_zeros=1): result = [] zeros = [0 for _ in range(num_of_zeros)] first = True for x in iterable: if not first: result += zeros result.append(x) first = False return result assert sparse_list([1, 2, 3], 2) == [ 1, 0, 0, 2, 0, 0, 3, ]
当然,您可以处理循环外的第一个元素。 这看起来更干净,但是会导致部分代码重复。 另外,在使用抽象
iterable
时,要做到这一点并非易事:
def sparse_list(iterable, num_of_zeros=1): result = [] zeros = [0 for _ in range(num_of_zeros)] iterator = iter(iterable) try: result.append(next(iterator)) except StopIteration: return [] for x in iterator: result += zeros result.append(x) return result
您还可以使用
enumerate
并执行
i == 0
检查(它仅用于确定第一个元素,而不是最后一个元素),但是,最好的解决方案是使用返回
iterable
元素的
first
和
last
标志的生成器:
def first_last_iter(iterable): iterator = iter(iterable) first = True last = False while not last: if first: try: current = next(iterator) except StopIteration: return else: current = next_one try: next_one = next(iterator) except StopIteration: last = True yield (first, last, current) first = False
现在原始功能可能如下所示:
def sparse_list(iterable, num_of_zeros=1): result = [] zeros = [0 for _ in range(num_of_zeros)] for first, last, x in first_last_iter(iterable): if not first: result += zeros result.append(x) return result
如果您需要测量两个事件之间经过的时间,请使用
time.monotonic()
代替
time.time()
。 即使更新系统时钟,
time.monotonic()
也不会在较小的方向上变化:
from contextlib import contextmanager import time @contextmanager def timeit(): start = time.monotonic() yield print(time.monotonic() - start) def main(): with timeit(): time.sleep(2) main()
嵌套的上下文管理器通常不知道它们是嵌套的。 您可以通过使用外部经理创建内部经理来告诉他们:
from contextlib import AbstractContextManager import time class TimeItContextManager(AbstractContextManager): def __init__(self, name, parent=None): super().__init__() self._name = name self._parent = parent self._start = None self._substracted = 0 def __enter__(self): self._start = time.monotonic() return self def __exit__(self, exc_type, exc_value, traceback): delta = time.monotonic() - self._start if self._parent is not None: self._parent.substract(delta) print(self._name, 'total', delta) print(self._name, 'outer', delta - self._substracted) return False def child(self, name): return type(self)(name, parent=self) def substract(self, n): self._substracted += n timeit = TimeItContextManager def main(): with timeit('large') as large_t: with large_t.child('medium') as medium_t: with medium_t.child('small-1'): time.sleep(1) with medium_t.child('small-2'): time.sleep(1) time.sleep(1) time.sleep(1) main()
当您需要在调用链上传递信息时,首先想到的是以功能参数的形式传递数据。
在某些情况下,修改链中的所有功能以传输新数据可能会更加方便。 而是可以指定一个上下文,该上下文将被链中的所有函数使用。 怎么做?
最简单的解决方案是使用全局变量。 在Python中,您也可以将模块和类用作上下文保持器,因为严格来说,它们也是全局变量。 您可能已经定期执行此操作,例如用于日记。
如果您的应用程序是多线程的,那么普通的全局变量将对您不起作用,因为它们不是线程安全的。 在每个时间点,您可以有多个呼叫链,每个呼叫链都需要自己的上下文。
threading
模块将为您提供帮助,它提供了
threading.local()
对象,该对象是线程安全的。 您可以通过简单地访问以下属性来将数据存储在其中:
threading.local().symbol = '@'
。
但是,这两种描述的方法都不是并发安全的,也就是说,它们都不适合Coroutine调用链,在Coroutine调用链中,系统不仅调用函数,而且还希望它们被执行。 当协程执行
await
,事件流可能会触发来自不同链的另一个协程。 这将不起作用:
import asyncio import sys global_symbol = '.' async def indication(timeout): while True: print(global_symbol, end='') sys.stdout.flush() await asyncio.sleep(timeout) async def sleep(t, indication_t, symbol='.'): loop = asyncio.get_event_loop() global global_symbol global_symbol = symbol task = loop.create_task( indication(indication_t) ) await asyncio.sleep(t) task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( sleep(1, 0.1, '0'), sleep(1, 0.1, 'a'), sleep(1, 0.1, 'b'), sleep(1, 0.1, 'c'), ))
您可以通过在每次协程之间切换时强制循环设置和还原上下文来解决此问题。 您可以使用
contextvars
模块实现此行为,该模块自Python 3.7起可用。
import asyncio import sys import contextvars global_symbol = contextvars.ContextVar('symbol') async def indication(timeout): while True: print(global_symbol.get(), end='') sys.stdout.flush() await asyncio.sleep(timeout) async def sleep(t, indication_t, symbol='.'): loop = asyncio.get_event_loop() global_symbol.set(symbol) task = loop.create_task(indication(indication_t)) await asyncio.sleep(t) task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather( sleep(1, 0.1, '0'), sleep(1, 0.1, 'a'), sleep(1, 0.1, 'b'), sleep(1, 0.1, 'c'), ))