Django Channels-现代网络的答案

在Django的世界中,Django Channels插件越来越受欢迎。 该库应将我们一直在等待的异步网络编程带给Django。 在莫斯科Python Conf 2017上的Artyom Malyshev解释了该库的第一个版本是如何做到的(现在,作者已经压缩了通道2),为什么要这样做以及完全要这样做。

首先,Zen Zen表示任何解决方案都应该是唯一的解决方案。 因此, 在Python中,每个至少三个 。 已经有很多网络异步框架:

  • 扭曲的
  • 事件事件
  • 格文特
  • 龙卷风;
  • 异步

看来,为什么要编写另一个库,以及是否完全有必要。


关于演讲者: Artyom Malyshev是一名独立的Python开发人员。 他从事分布式系统的开发,在有关Python的会议上发表演讲。 可以在Github和社交网络上以昵称PROOFIT404找到Artyom。

Django按照定义是同步的 。 如果我们谈论的是ORM,那么在属性访问期间(例如,编写post.author.username时)同步访问数据库不会花费任何费用。

另外,Django是WSGI框架。

WSGI


WSGI是用于Web服务器的同步接口。

def app (environ, callback) : status, headers = '200 OK', [] callback (status, headers) return ['Hello world!\n'] 

它的主要特征是我们有一个接受参数并立即返回值的函数。 这就是Web服务器可以向我们期望的一切。 无异步,无异味

这是在很久以前(即2003年)进行的,当时网络非常简单,用户可以在Internet上阅读各种新闻,并进入留言簿。 仅接受请求并处理它就足够了。 给出答案,然后完全忘记该用户。


但是,有一秒钟,现在不是2003年,因此用户希望从我们这里得到更多。

他们想要丰富的Web应用程序,实时内容,他们希望应用程序在台式机,笔记本电脑,其他台式机和时钟上都能正常运行。 最重要的是, 用户不想按F5 ,因为例如平板电脑没有这样的按钮。



Web浏览器自然会遇到我们-它们添加了新的协议和新功能。 如果您和我只是开发前端,那么我们只是将浏览器作为平台并使用其核心功能,因为它已准备好向我们提供它们。

但是,对于后端程序员而言,一切都发生了很大变化 。 Web套接字,HTTP2等在体系结构方面是一个巨大的痛苦,因为它们是状态长期存在的连接,需要以某种方式进行处理。


这是Django的Django渠道正试图解决的问题。 该库旨在为您提供处理连接的能力,而我们习惯使用的Django Core完全不变。

他的口语很快,可怕的英语口音的所有者安德鲁·戈德温Andrew Godwin )将他变成了一个很棒的人。 您应该通过早已被遗忘的Django South和Django Migrations之类的1.7版本了解它。 自从为Django修复迁移问题以来,他就开始修复Web套接字和HTTP2。

他是怎么做到的? 曾几何时,互联网上有这样的图片:空的正方形,箭头,“好的建筑”字样-您在这些小正方形中输入自己喜欢的技术,就会得到一个可扩展的网站。



安德鲁·戈德温(Andrew Godwin)在这些框中输入了一个服务器,该服务器位于前面,可以接受任何请求,无论它们是异步,同步,电子邮件还是其他。 它们之间是所谓的通道层,它以同步工作程序池可以访问的格式存储收到的消息。 一旦异步连接向我们发送了一些东西,我们就将其记录在Channel Layer中,然后同步工作程序可以从那里拾取它,并以与任何Django View或其他任何视图相同的方式同步处理它。 同步代码将响应发送回通道层后,异步服务器将立即给予响应,流式传输,并执行所需的任何操作。 因此,执行抽象。

这意味着有几种实现方式,在生产中,建议将Twisted用作实现Django前端的异步服务器 ,以及Redis ,这将是同步Django和异步Twisted之间的相同通信渠道。

好消息:要使用Django渠道,您根本不需要了解Twisted或Redis,这些都是实现细节。 您的DevOps会知道这一点,或者当您在凌晨三点修理下降的产品时,您将见面。

ASGI


抽象是称为ASGI的协议。 这是位于任何网络接口,服务器(无论是同步协议还是异步协议)与应用程序之间的标准接口。 它的主要概念是渠道。

频道


通道是具有生存期的消息的先进先出队列。 这些消息可以零次发送一次,也只能由一个消费者接收。

消费者


在Consumer,您只是在编写代码。

 def ws_message (message) : message.reply_channel.send ( { 'text': message.content ['text'], } ) 

接受消息的功能可能会发送多个响应,或者可能根本不发送响应。 与视图非常相似,唯一的区别是没有返回函数,因此我们可以讨论从该函数返回的答案。

我们将此功能添加到路由中,例如,将其挂起以在Web套接字上接收消息。

 from channels.routing import route from myapp.consumers import ws_message channel_routing = [ route ('websocket.receive' ws_message), } 

就像规定数据库一样,我们在Django设置中编写该代码。

 CHANNEL_LAYERS = { 'default': { 'BACKEND': 'asgiref.inmemory', 'ROUTING': 'myproject.routing', }, } 

一个项目可以具有多个通道层,就像可以有多个数据库一样。 如果有人使用过,这东西与db路由器非常相似。

接下来,我们定义ASGI应用程序。 它可以同步Twisted的启动方式和同步的工作程序的启动方式-他们都需要此应用程序。

 import os from channels.asgi import get_channel_layer os.environ.setdefault( 'DJANGO_SETTINGS_MODULE', 'myproject.settings', ) channel_layer = get_channel_layer() 

然后,部署代码:运行gunicorn,按照惯例,与视图同步发送HTTP请求。 我们启动异步服务器,该服务器将在同步Django的最前面,并启动处理消息的工作人员。

 $ gunicorn myproject.wsgi $ daphne myproject.asgi:channel_layer $ django-admin runworker 

回复频道


如我们所见,消息具有类似回复通道的概念。 为什么需要这个?

单向通道,分别是WebSocket接收,WebSocket连接,WebSocket断开-这是系统中用于接收消息的公共通道。 回复通道是严格与用户连接相关的通道。 因此,消息具有输入和输出通道。 这对可让您识别此消息的来源。


团体


一组是渠道的集合。 如果我们向某组发送一条消息,那么它将自动发送到该组的所有通道。 这很方便,因为没人喜欢编写循环。 另外,组的实现通常是使用Channel层的本机功能完成的,因此它比一次仅发送一条消息要快。

 from channels import Group def ws_connect (message): Group ('chat').add (message.reply_channel) def ws_disconnect (message): Group ('chat').discard(message.reply_channel) def ws_message (message): Group ('chat'). Send ({ 'text': message.content ['text'], }) 

组也以相同的方式添加到路由中。

 from channels.routing import route from myapp.consumers import * channel_routing = [ route ('websocket.connect' , ws_connect), route ('websocket.disconnect' , ws_disconnect), route ('websocket.receive' , ws_message), ] 

将该频道添加到组中后,答复将发送给连接到我们站点的所有用户,而不仅仅是对我们自己的回声答复。

普通消费者


我爱Django的目的是声明性的。 同样,有声明性的使用者。

基本使用者是一个基本的使用者,它只能映射您在某种方法上定义的通道并调用它。

 from channels.generic import BaseConsumer class MyComsumer (BaseConsumer) : method_mapping = { 'channel.name.here': 'method_name', } def method_name (self, message, **kwargs) : pass 

有大量具有故意增强的行为的预定义使用者,例如WebSocket使用者,它预先确定它将处理WebSocket连接,WebSocket接收和WebSocket断开连接。 您可以立即指出要在哪个组中添加回复频道,一旦您使用self.send,他就会知道是将其发送给一个组还是发送给一个用户。

 from channels.generic import WebsocketConsumer class MyConsumer (WebsocketConsumer) : def connection_groups (self) : return ['chat'] def connect (self, message) : pass def receive (self, text=None, bytes=None) : self.send (text=text, bytes=bytes) 

还有一个带有JSON的WebSocket使用者选项,即不是文本,不是字节,但是已经解析的JSON可以接收,这很方便。

在路由中,它通过route_class以相同的方式添加。 Myapp在route_class中获取,该路径由使用者确定,所有通道均从此处获取,并且myapp中指定的所有通道均已路由。 用这种方式少写。

路由选择


让我们详细讨论路由及其为我们提供的功能。

首先,这些是过滤器。

 // app.js S = new WebSocket ('ws://localhost:8000/chat/') # routing.py route('websocket.connect', ws_connect, path=r'^/chat/$') 

这可以是Web套接字连接URI或http请求方法提供给我们的路径。 它可以是渠道中的任何消息字段,例如,对于电子邮件:文本,正文,复本等等。 route的关键字参数数目是任意的。

路由允许您创建嵌套路由。 如果几个消费者是由某些共同特征决定的,则将他们分组并立即将每个人添加到路由中将很方便。

 from channels import route, include blog_routes = [ route ( 'websocket.connect', blog, path = r'^/stream/') , ] routing = [ include (blog_routes, path= r'^/blog' ), ] 

多路复用


如果我们打开多个Web套接字,每个Web套接字都有一个不同的URI,那么我们可以在其上悬挂多个处理程序。 但是说实话,打开多个连接只是为了在后端做一些漂亮的事情,这看起来并不像一种工程方法。

因此,可以在一个Web套接字上调用多个处理程序。 我们定义了一个WebsocketDemultiplexer,它在单个Web套接字内基于流的概念进行操作。 通过此流,它将把您的消息重定向到另一个频道。

 from channels import WebsocketDemultiplexer class Demultiplexer (WebsocketDemultiplexer) : mapping = { 'intval': 'binding.intval', } 

在路由中,以与任何其他声明性使用者route_class相同的方式添加多路复用器。

 from channels import route_class, route from .consumers import Demultiplexer, ws_message channel_routing = [ route_class (Demultiplexer, path='^/binding/') , route ('binding.intval', ws_message ) , ] 

将流参数添加到消息中,以便多路复用器可以弄清楚将给定消息放置在何处。 有效负载参数包含多路复用器处理它后进入通道的所有内容。

需要特别注意的是,在通道层中,消息将获得两次 :多路复用器之前和之后。 因此,一旦开始使用多路复用器,就会自动将延迟添加到请求中。

 { "stream" : "intval", "payload" : { … } } 

届会


每个通道都有自己的会话。 这是非常方便的事情,例如,在处理程序调用之间存储状态。 您可以按回复渠道对它们进行分组,因为这是属于用户的标识符。 该会话与常规http会话存储在同一引擎中。 出于明显的原因,不支持签名的cookie,它们根本不在Web套接字中。

 from channels.sessions import channel_session @channel_session def ws_connect(message) : room=message.content ['path'] message.channel_session ['room'] = room Croup ('chat-%s' % room).add ( message.reply_channel ) 

在连接期间,您可以获取一个http会话,并在您的使用者中使用它。 作为协商过程的一部分,建立Web套接字连接时,会将cookie发送给用户。 因此,因此,您可以获得用户会话,获得以前在Django中使用的用户对象,就像在使用view一样。

 from channels.sessions import http_session_user @http_session_user def ws_connect(message) : message.http_session ['room'] = room if message.user.username : … 

留言顺序


渠道可以解决一个非常重要的问题。 如果我们建立与Web套接字的连接并立即发送,则会导致以下事实:两个事件-WebSocket connect和WebSocket receive-在时间上非常接近。 这些Web套接字的使用者很可能会并行运行。 调试它会很有趣。

Django通道允许您输入两种类型的锁:

  1. 。 使用会话机制,我们保证在处理消费者以接收消息之前,我们不会在Web套接字上处理任何消息。 建立连接后,顺序是任意的,可以并行执行。
  2. -一次只有一个特定用户的使用者正在运行。 这是同步的开销,因为它使用了慢速会话引擎。 尽管如此,还是有这样的机会。

 from channels.generic import WebsocketConsumer class MyConsumer(WebsocketConsumer) : http_user = True slight_ordering = True strict_ordering = False def connection_groups (self, **kwargs) : return ['chat'] 

为了编写此内容,我们在http会话,通道会话中看到了与前面相同的装饰器。 在声明性使用者中,您可以简单地编写属性,一旦编写属性,它将自动应用于该使用者的所有方法。

资料绑定


一次,Meteor以数据绑定而闻名。

我们打开两个浏览器,进入同一页面,然后在其中一个中单击滚动条。 同时,在此页面上的第二个浏览器中,滚动条更改其值。 太酷了

 class IntegerValueBinding (WebsocketBinding) : model = IntegerValue stream = intval' fields= ['name', 'value'] def group_names (self, instance, action ) : return ['intval-updates'] def has_permission (self, user, action, pk) : return True 

Django现在也做同样的事情。

这是使用Django Signals提供的钩子实现的。 如果为模型定义了绑定,则将为每个事件通知该模型实例的组中的所有连接。 我们创建了一个模型,更改了模型,将其删除-所有这些都会是警告。 通知发生在指示的字段上:该字段的值已更改-正在通过Web套接字发送有效负载。 这很方便。

重要的是要理解,如果在我们的示例中不断单击滚动条,则消息将不断出现,并且模型将被保存。 这将一直工作到一定的负载,然后一切都靠在基座上。

Redis层


让我们再谈谈最受欢迎的制作频道层-Redis的排列方式。

安排得井井有条:

  • 与工人级别的同步连接一起工作;
  • 对Twisted非常友好,不会在特别需要的地方(即在您的前端服务器上)放慢速度;
  • MSGPACK用于在Redis内部序列化消息,从而减少了每条消息的占用空间;
  • 您可以将负载分配给多个Redis实例,然后会使用一致的哈希算法自动对其进行混洗。 因此,单点故障消失了。

频道只是Redis的ID列表。 ID是特定消息的值。 这样做是为了使您可以分别控制每个消息和通道的寿命。 原则上,这是合乎逻辑的。

 >> SET "b6dc0dfce" " \x81\xa4text\xachello" >> RPUSH "websocket.send!sGOpfny" "b6dc0dfce" >> EXPIRE "b6dc0dfce" "60" >> EXPIRE "websocket.send!sGOpfny" "61" 

组通过排序集实现。 分发到组是在Lua脚本中执行的-这非常快。

 >> type group:chat zset >> ZRANGE group:chat 0 1 WITHSCORES 1) "websocket.send!sGOpfny" 2) "1476199781.8159261" 

问题


让我们看看这种方法有什么问题。

回调地狱


第一个问题是新发明的回调地狱。 非常重要的一点是要理解,您将遇到的渠道中的大多数问题都是风格上的:争执引起了消费者的期待,这是他所料想不到的。 他们来自哪里,将他们放在Redis上-所有这都是一项可疑的调查任务。 调试分布式系统通常是出于坚强。 AsyncIO解决了这个问题。

芹菜


在互联网上,他们写道Django Channels可以代替Celery。

我对你有个坏消息-不,不是这样。

在频道中:

  • 不重试,您不能延迟处理程序的执行;
  • 没有画布-只是回调。 Celery还提供了组,链,这是我最喜欢的和弦,在并行执行组之后,它会引起另一个具有同步的回调。 所有这些都不在渠道中。
  • 没有消息到达时间的设置,某些没有此设置的系统根本无法设计。

我将未来视为官方支持,以最小的成本和最小的努力一起使用频道和芹菜。 但是Django Channels不能代替Celery。

适用于现代网络的Django


Django Channels是现代网络的Django。 这是我们都习惯使用的Django:同步,声明式,有很多电池。 Django Channels仅需加一个电池。 您始终需要了解在哪里使用它以及是否值得这样做。 如果项目中不需要Django,则那里也不需要通道。 它们仅在证明Django合理的项目中有用。

莫斯科Python Conf ++

面向Python开发人员的专业会议再上一个新台阶-2018 年10月22日至23日,我们将聚集俄罗斯600位最优秀的Python程序员,发表最有趣的报告,当然还要在Ontiko团队的支持下为莫斯科Python社区的最佳传统提供一个交流环境。

我们邀请专家进行报告。 计划委员会已经开始运作,并且接受申请直到9月7日。

对于参与者,正在进行在线头脑风暴计划。 您可以立即将丢失的主题添加到本文档中,或者立即添加您感兴趣的演讲者。 实际上,该文档将一直进行更新,您可以随时跟随程序的形成。

Source: https://habr.com/ru/post/zh-CN418445/


All Articles