将PHP后端传输到Redis流总线并选择独立于框架的库



前言


我的网站是我的爱好,旨在存储有趣的主页和个人网站。 这个话题在我编程生涯的开始就引起了我的兴趣,那时我很高兴找到优秀的专业人士,他们会为自己,自己的爱好和项目做文章。 到目前为止,发现它们的习惯仍然存在:在几乎每个商业网站上,而不是非常网站上,我都继续在页脚中寻找作者链接。

想法的实施


第一个版本只是我个人网站上的html页面,我将带有签名的链接放在ul列表中。 键入了20页一段时间后,我开始认为它不是很有效,因此决定尝试使该过程自动化。 在stackoverflow上,我注意到许多人在其配置文件中指示了站点,因此我写了一个php解析器,从第一个(SO上的地址到今天这样的地址:/ users / 1)开始浏览了这些配置文件,提取了链接从所需的标签,并堆叠在SQLite中。

这可以称为第二个版本:SQLite板中成千上万个URL的集合,该集合替换了html中的静态列表。 我在此列表上进行了简单搜索。 因为 只有网址,然后搜索仅针对它们。

在很长一段时间后,在这个阶段,我放弃了该项目并返回了该项目。 在这个阶段,我的工作经验已经超过三年,我觉得自己可以做的更认真。 另外,人们迫切希望自己掌握相对较新的技术。

现代版本


该项目已部署在docker中,数据库已转移到mongoDb,并且相对较新地添加了萝卜,起初只是用于缓存。 作为基础,使用了一个PHP微框架。

问题


通过控制台命令添加新站点,该命令同步执行以下操作:

  • 通过URL下载内容
  • 标记HTTPS是否可用
  • 保留网站的本质
  • 源HTML和标头保存到索引历史记录
  • 解析内容,检索标题和描述
  • 将数据保存到单独的集合中。

仅存储站点并将其显示在列表中就足够了:



但是,自动对所有内容进行索引,分类和排名,使所有内容保持最新状态的想法较弱地适合于此范例。 即使只是添加Web方法来添加页面,也需要代码复制和阻止,以避免潜在的DDoS。

通常,当然,所有操作都可以同步完成,在Web方法中,只需保存URL以确保可怕的守护程序执行列表中URL的所有任务。 但是,即使在这里,“转向”一词也是一样。 而且,如果实现了队列,那么可以划分并至少异步执行所有任务。

解决方案


介绍队列并为所有任务创建事件驱动的处理系统。 而且很长一段时间以来,我都想尝试Redis Streams。

在PHP中使用Redis流


因为 我没有Symfony,Laravel,Yii这三大巨头的框架,我想找到一个独立的图书馆。 但是,事实证明(在第一次检查时),不可能找到各个严肃的图书馆。 与队列相关的所有内容都是5年前3次提交的预测,或者是与框架相关的。

我听说过Symfony是一些有用组件的提供者,并且我已经使用了一些。 同样从Laravel,也可以使用某些东西,例如它们的ORM,而无需框架本身。

symfony / Messenger


第一位候选人立即显得很理想,毫无疑问,我安装了它。 但是,很难在Symfony之外搜索使用示例。 如何从具有通用名称,不说任何名称,发送消息的总线甚至是Redis的类的堆中收集数据?



官方站点上的文档非常详细,但是仅针对Symfony使用其最喜欢的YML和其他非交响乐手的魔术方法对初始化进行了描述。 我对安装过程没有兴趣,尤其是在新年假期期间。 但是我不得不这么做了很长时间。

在截止日期紧迫的情况下,尝试弄清楚如何使用Symfony源实例化系统也不是最琐碎的任务:



反复研究并尝试用手做某事,我得出的结论是我正在拐杖,并决定尝试其他方法。

照亮/排队


事实证明,该库与Laravel基础结构和许多其他依赖关系紧密相关,因此我并没有花费很多时间:安装,查看,看到依赖关系并将其删除。

yiisoft / yii2-queue


好吧,这里是从名称开始立即假定的,再次与Yii2紧密绑定。 我不得不使用这个库,而且还不错,但是我不认为它完全取决于Yii2。

其余的


我在github上发现的所有其他内容都是不可靠的过时且废弃的投影,没有星星,叉子和大量的提交。

返回symfony / Messenger,技术细节


我不得不处理这个库,花了更多时间后,我才能够做到。 事实证明,一切都非常简洁明了。 为了实例化总线,我做了一个小工厂,因为 我有几个轮胎,不同的操作员。



只需几个步骤:

  • 创建应该可以调用的消息处理程序
  • 将它们包装在HandlerDescriptor中(库中的类)
  • 我们将这些“描述符”包装在HandlersLocator实例中。
  • 将HandlersLocator添加到MessageBus实例
  • 在我的例子中,我们将RedisTransport类的实例传递给SendersLocator一组SenderInterface,它们以明显的方式进行配置
  • 将SendersLocator添加到MessageBus实例

MessageBus具有方法--dispatch(),该方法在HandlersLocator中搜索适当的处理程序,并使用相应的“ SenderInterface”将消息传递给它们,以通过总线发送(Redis流)。

在容器配置(在本例中为php-di)中,可以按以下方式配置整个服务器集:

CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) { return new RedisTransport( $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET), $c->get(CONTAINER_SERIALIZER)) ; }, CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) { return new RedisTransport( $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG), $c->get(CONTAINER_SERIALIZER)) ; }, CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) { return new RedisReceiver( $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET), $c->get(CONTAINER_SERIALIZER) ); }, CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) { return new RedisReceiver( $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG), $c->get(CONTAINER_SERIALIZER) ); }, CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) { $sendersLocator = new SendersLocator([ \App\Messages\SecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET], \App\Messages\DaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG], ], $c); $middleware[] = new SendMessageMiddleware($sendersLocator); return new MessageBus($middleware); }, CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) { $host = 'bu-02-redis'; $port = 6379; $dsn = "redis://$host:$port"; $options = [ 'stream' => 'secret', 'group' => 'default', 'consumer' => 'default', ]; return Connection::fromDsn($dsn, $options); }, CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) { $host = 'bu-02-redis'; $port = 6379; $dsn = "redis://$host:$port"; $options = [ 'stream' => 'log', 'group' => 'default', 'consumer' => 'default', ]; return Connection::fromDsn($dsn, $options); }, 

可以看出,在SendersLocator中,我们为两个不同的消息分配了一个不同的“传输”,每个消息与对应的流都有自己的连接。

我做了一个单独的演示项目,演示了使用这种总线相互通信的三个恶魔的应用程序: https : //github.com/backend-university/products/tree/master/products/02-redis-streams-bus

但我将向您展示如何安排消费者:

 use App\Messages\DaemonLogMessage; use Symfony\Component\Messenger\Handler\HandlerDescriptor; use Symfony\Component\Messenger\Handler\HandlersLocator; use Symfony\Component\Messenger\MessageBus; use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware; use Symfony\Component\Messenger\Middleware\SendMessageMiddleware; use Symfony\Component\Messenger\Transport\Sender\SendersLocator; require_once __DIR__ . '/../vendor/autoload.php'; /** @var \Psr\Container\ContainerInterface $container */ $container = require_once('config/container.php'); $handlers = [ DaemonLogMessage::class => [ new HandlerDescriptor( function (DaemonLogMessage $m) { \error_log('DaemonLogHandler: message handled: / ' . $m->getMessage()); }, ['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG] ) ], ]; $middleware = []; $middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers)); $sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container); $middleware[] = new SendMessageMiddleware($sendersLocator); $bus = new MessageBus($middleware); $receivers = [ CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG), ]; $w = new \Symfony\Component\Messenger\Worker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER)); $w->run(); 

在应用程序中使用此基础架构


在后端实现了总线后,我从旧的同步团队中选择了各个步骤,并创建了单独的处理程序,每个处理程序都从事自己的业务。

用于将新站点添加到数据库的管道如下:



之后,对我来说,添加新功能变得更加容易,例如,提取和解析Rss。 因为 由于此过程还需要源内容,因此rss链接的处理程序-提取程序以及WebsiteIndexHistoryPersistor会订阅消息“ Content / HtmlContent”,对其进行处理,然后在其管道上进一步传递所需的消息。



最后,结果证明了几个恶魔,每个恶魔仅保持与必要资源的连接。 例如, 搜寻器守护程序包含所有需要访问Internet以获得内容的处理程序,而持久性守护程序则保持与数据库的连接。

现在,不再需要从数据库中进行选择,而是将持久性插入后所需的id仅仅通过总线传递给所有感兴趣的处理程序。

Source: https://habr.com/ru/post/zh-CN483584/


All Articles