@Pythonetc compilation, juillet 2019


Ceci est la douzième collection de conseils et de programmation Python de mon flux @pythonetc.

Collections précédentes


Vous ne pouvez pas modifier les variables de fermeture avec une simple affectation. Python considère l'affectation comme une définition à l'intérieur d'un corps de fonction et ne fait pas du tout de fermeture.

Fonctionne bien, affiche 2 :

 def make_closure(x):    def closure():        print(x)    return closure make_closure(2)() 

Et ce code renvoie une UnboundLocalError: local variable 'x' referenced before assignment :

 def make_closure(x):    def closure():        print(x)        x *= 2        print(x)    return closure make_closure(2)() 


Pour faire fonctionner le code, utilisez nonlocal . Cela indique explicitement à l'interpréteur de ne pas considérer l'affectation comme une définition:

 def make_closure(x):    def closure():        nonlocal x        print(x)        x *= 2        print(x)    return closure make_closure(2)() 


Parfois, lors d'une itération, vous devez savoir quel élément est traité, en premier ou en dernier. Cela peut être facilement déterminé à l'aide d'un indicateur explicite:

 def sparse_list(iterable, num_of_zeros=1):    result = []    zeros = [0 for _ in range(num_of_zeros)]    first = True    for x in iterable:        if not first:            result += zeros        result.append(x)        first = False    return result assert sparse_list([1, 2, 3], 2) == [    1,    0, 0,    2,    0, 0,    3, ] 

Bien sûr, vous pouvez gérer le premier élément en dehors de la boucle. Cela semble plus propre, mais conduit à une duplication partielle du code. De plus, il ne sera pas si facile de le faire lorsque vous travaillez avec un résumé iterable :

 def sparse_list(iterable, num_of_zeros=1):    result = []    zeros = [0 for _ in range(num_of_zeros)]    iterator = iter(iterable)    try:       result.append(next(iterator))    except StopIteration:        return []    for x in iterator:       result += zeros       result.append(x)    return result 

Vous pouvez également utiliser enumerate et effectuer une vérification i == 0 (cela ne fonctionne que pour déterminer le premier élément, pas le dernier), cependant, la meilleure solution serait un générateur qui retourne les first et last drapeaux avec l'élément iterable :

 def first_last_iter(iterable):    iterator = iter(iterable)    first = True    last = False    while not last:    if first:        try:            current = next(iterator)            except StopIteration:                return    else:        current = next_one    try:        next_one = next(iterator)    except StopIteration:        last = True   yield (first, last, current)    first = False 

Maintenant, la fonction d'origine pourrait ressembler à ceci:

 def sparse_list(iterable, num_of_zeros=1):    result = []    zeros = [0 for _ in range(num_of_zeros)]    for first, last, x in first_last_iter(iterable):        if not first:            result += zeros        result.append(x)    return result 


Si vous devez mesurer le temps écoulé entre deux événements, utilisez time.monotonic() au lieu de time.time() . time.monotonic() ne change jamais dans la petite direction, même lors de la mise à jour de l'horloge système:

 from contextlib import contextmanager import time @contextmanager def timeit():    start = time.monotonic()    yield    print(time.monotonic() - start) def main():    with timeit():        time.sleep(2) main() 


Les gestionnaires de contexte imbriqués ne savent généralement pas qu'ils sont imbriqués. Vous pouvez leur en parler en créant des gestionnaires internes à l'aide d'un gestionnaire externe:

 from contextlib import AbstractContextManager import time class TimeItContextManager(AbstractContextManager):    def __init__(self, name, parent=None):    super().__init__()    self._name = name    self._parent = parent    self._start = None    self._substracted = 0    def __enter__(self):    self._start = time.monotonic()    return self          def __exit__(self, exc_type, exc_value, traceback):    delta = time.monotonic() - self._start    if self._parent is not None:           self._parent.substract(delta)    print(self._name, 'total', delta)    print(self._name, 'outer', delta - self._substracted)    return False    def child(self, name):    return type(self)(name, parent=self)    def substract(self, n):    self._substracted += n timeit = TimeItContextManager def main():    with timeit('large') as large_t:    with large_t.child('medium') as medium_t:        with medium_t.child('small-1'):            time.sleep(1)        with medium_t.child('small-2'):            time.sleep(1)        time.sleep(1)    time.sleep(1) main() 


Lorsque vous devez transmettre des informations sur une chaîne d'appels, la première chose qui vous vient à l'esprit est de transmettre des données sous forme d'arguments de fonction.

Dans certains cas, il peut être beaucoup plus pratique de modifier toutes les fonctions de la chaîne pour transférer une nouvelle donnée. Au lieu de cela, vous pouvez spécifier un contexte qui sera utilisé par toutes les fonctions de la chaîne. Comment faire

La solution la plus simple consiste à utiliser une variable globale. En Python, vous pouvez également utiliser des modules et des classes comme gardes du contexte car, à proprement parler, ce sont aussi des variables globales. Vous le faites probablement déjà régulièrement, par exemple pour la journalisation.

Si votre application est multithread, les variables globales ordinaires ne fonctionneront pas pour vous, car elles ne sont pas thread-safe. À chaque instant, vous pouvez avoir plusieurs chaînes d'appels et chacune d'elles a besoin de son propre contexte. Le module de threading vous aidera, il fournit un objet threading.local() , qui est thread-safe. Vous pouvez y stocker des données avec un accès simple aux attributs: threading.local().symbol = '@' .

Cependant, les deux approches décrites ne sont pas sécurisées pour la concurrence, c'est-à-dire qu'elles ne conviennent pas à la chaîne d'appels Coroutine, dans laquelle le système appelle non seulement des fonctions, mais attend également qu'elles soient exécutées. Lorsqu'une coroutine s'exécute en await , un flux d'événements peut déclencher une autre coroutine à partir d'une chaîne différente. Cela ne fonctionnera pas:

 import asyncio import sys global_symbol = '.' async def indication(timeout):    while True:       print(global_symbol, end='')       sys.stdout.flush()       await asyncio.sleep(timeout) async def sleep(t, indication_t, symbol='.'):    loop = asyncio.get_event_loop()    global global_symbol    global_symbol = symbol    task = loop.create_task(          indication(indication_t)    )    await asyncio.sleep(t)    task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(    sleep(1, 0.1, '0'),    sleep(1, 0.1, 'a'),    sleep(1, 0.1, 'b'),    sleep(1, 0.1, 'c'), )) 

Vous pouvez résoudre ce problème en forçant le cycle à définir et à restaurer le contexte chaque fois que vous basculez entre les coroutines. Vous pouvez implémenter ce comportement en utilisant le module contextvars , qui est disponible depuis Python 3.7.

 import asyncio import sys import contextvars global_symbol = contextvars.ContextVar('symbol') async def indication(timeout):    while True:       print(global_symbol.get(), end='')        sys.stdout.flush()       await asyncio.sleep(timeout) async def sleep(t, indication_t, symbol='.'):    loop = asyncio.get_event_loop()    global_symbol.set(symbol)    task = loop.create_task(indication(indication_t))    await asyncio.sleep(t)    task.cancel() loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.gather(    sleep(1, 0.1, '0'),    sleep(1, 0.1, 'a'),    sleep(1, 0.1, 'b'),    sleep(1, 0.1, 'c'), )) 

Source: https://habr.com/ru/post/fr462311/


All Articles