RabbitMQ-SQL Server

一两个星期前,我在RabbitMQ用户论坛上看到一条消息 ,内容涉及如何设置从SQL Server向RabbitMQ发送消息。 由于我们与Derivco紧密合作, 因此我在此留下了一些建议,并说我正在撰写有关如何执行此操作的博客。 我的部分信息并不完全正确-至少到那时为止(对不起,兄弟,他很忙)。

太棒了,这是您的SQL Server 。 使用它很容易将信息放入数据库中。 使用查询从数据库检索数据同样容易。 但是获取刚刚更新或粘贴的数据已经有点困难了。 考虑一下实时事件; 进行了购买-发生这种情况时,必须立即通知有人。 也许有人会说这样的数据不应该从数据库中弹出,而应该从其他地方弹出。 当然是这种情况,但是很多时候我们根本别无选择。

我们有一个任务:从数据库外部发送事件以进行进一步处理,问题是-如何做到这一点?

SQL Server与外部通讯


在存在SQL Server的过程中,曾进行过几次尝试来组织数据库外部的通信。 SQL Server通知服务 (NS),出现在SQL Server 2000中,以及更高版本,出现在SQL Server 2005中,出现了SQL Server Service Broker (SSB)。 我与Bob Boshemen和Dan Sullivan一起在我的《 面向开发人员的SQL Server 2005初看书中对它们进行了描述。 正如我所说,NS出现在SQL Server 2000中,并在SQL Server 2005的Beta版中进行了重新设计。但是,NS已完全排除在SQL Server 2005的现成(RTM)版本中。
注意:如果您阅读本书,将会发现RTM版本中没有的许多功能。
SSB幸存下来,Microsoft在其SQL Server 2008 Feature Pack中引入了Service Broker外部激活器 (EA)。 通过SSB,可以在本地数据库之外进行交互。 从理论上讲,这听起来不错,但实际上-它既麻烦又令人困惑。 我们进行了一些测试,很快意识到它没有满足我们的要求。 此外,SSB并没有为我们提供所需的性能,因此我们不得不发明其他东西。

SQLCLR


结果,我们得出的结论是基于SQLCLR技术的。 SQLCLR是内置在SQL Server核心中的.NET平台,可用于在内核内部执行.NET代码。 由于我们执行.NET代码,因此我们能够像常规.NET应用程序一样执行几乎所有操作。
注意:我在上面写了“几乎”,因为实际上有一些限制。 在这种情况下,这些限制几乎不会影响我们要做的事情。
SQLCLR的操作原理如下:将代码编译到dll库中,然后使用SQL Server工具注册该库:

构建装配

CREATE ASSEMBLY [RabbitMQ.SqlServer] AUTHORIZATION rmq FROM 'F:\some_path\RabbitMQSqlClr4.dll' WITH PERMISSION_SET = UNSAFE; GO 

代码段1:沿绝对路径创建程序集

该代码执行以下操作:

  • CREATE ASSEMBLY创建具有给定名称的程序集(无论其名称如何)。
  • AUTHORIZATION -指示程序集的所有者。 在这种情况下,rmq是预定义的SQL Server角色。
  • FROM确定原始装配体的位置。 在FROM ,您还可以以二进制或UNC格式指定路径。 该项目的安装文件使用二进制表示形式。
  • WITH PERMISSION_SET设置权限。 在这种情况下, UNSAFE要求最严格。

注意:无论在AUTHORIZATION子句中使用什么角色或登录名,都必须使用与将程序集加载到域中时相同的名称来创建appdomain类。 建议使用不同的appdomain类名称来分隔程序集,以便在一个程序集失败时,其余程序集不会掉线。 但是,如果程序集相互依赖,则不能将它们划分为不同的类。
创建程序集后,我们在其中封装.NET方法:

 CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max) AS EXTERNAL NAME [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg]; GO 

代码段2: .NET方法包装器

该代码执行以下操作:

  • 创建一个名为rmq.pr_clr_PostRabbitMsg的T-SQL存储过程,该存储过程具有两个参数。 @EndpointID@Message
  • 代替该过程的主体,而是使用一个外部源,该外部源包括:
    • 一个名为RabbitMQ.SqlServer的程序集,即我们在上面的代码片段1中创建的集合。
    • 完整类型(命名空间和类): RabbitMQSqlClr.RabbitMQSqlServer
    • 上面的名称空间和类中的方法是: pr_clr_PostRabbitMsg

rmq.pr_clr_PostRabbitMsg pr_clr_PostRabbitMsg将调用pr_clr_PostRabbitMsg方法。
注意:创建过程时,程序集名称不区分大小写,这与类型和方法的全名不同。 创建的过程的名称不必与方法的名称匹配。 但是,参数的最终数据类型必须匹配。
就像我之前说的,我们在Derivco需要将数据发送到SQL Server之外,因此我们使用SQLCLR和RabbitMQ (RMQ)。

Rabbitmq


RMQ是一个开源消息代理,它实现高级消息队列协议(AMQP)并用Erlang编写。

由于RMQ是消息代理,因此需要AMQP客户端库才能连接到它。 该应用程序引用客户端库,并在其帮助下打开连接并发送消息-例如,通过ADO.NET调用SQL Server。 但是与ADO.NET不同的是,每次访问数据库时,该连接很可能会打开,而在整个应用程序期间,该连接仍保持打开状态。

因此,为了能够与RabbitMQ从数据库进行交互,我们需要RabbitMQ的应用程序和.NET客户端库。
注意:在本文的下一部分中,将找到RabbitMQ代码片段,但没有详细说明它们的作用。 如果您不熟悉RabbitMQ,那么建议您阅读各种RabbitMQ教程,以了解代码的用途。 Hello World C#教程是一个好的开始。 教科书和代码示例之间的区别之一是示例中未声明交换器。 它们应该是预定义的。

RabbitMQ.SqlServer


RabbitMQ.SqlServer是使用.NET客户端库用于RabbitMQ的程序集,并提供了将消息从数据库发送到一个或多个RabbitMQ端点(VHost和交换器)的功能。 可以从我在GitHub上的RabbitMQ-SqlServer存储库下载/分叉代码。 它包含程序集源代码和安装文件(即,您不必自己编译它们)。
注意:这只是一个示例,显示SQL Server如何与RabbitMQ进行交互。 这不是成品,甚至不是一部分。 如果此代码使您不知所措-不要怪我,因为这只是一个例子。

功能性


当程序集被加载时,或其显式调用被初始化时,或在包装程序被调用时被间接调用时,该程序集将连接字符串加载到其安装所在的本地数据库中,以及与其连接的RabbitMQ端点:

连接方式

 internal bool InternalConnect() { try { connFactory = new ConnectionFactory(); connFactory.Uri = connString; connFactory.AutomaticRecoveryEnabled = true; connFactory.TopologyRecoveryEnabled = true; RabbitConn = connFactory.CreateConnection(); for (int x = 0; x < channels; x++) { var ch = RabbitConn.CreateModel(); rabbitChannels.Push(ch); } return true; } catch(Exception ex) { return false; } } 

代码段3:连接到端点

同时,到端点的部分连接也会在该连接上创建IModel,并在发送(添加到队列)消息时使用它们:

讯息发送

 internal bool Post(string exchange, byte[] msg, string topic) { IModel value = null; int channelTryCount = 0; try { while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100) { channelTryCount += 1; Thread.Sleep(50); } if (channelTryCount == 100) { var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}."; throw new ApplicationException(errMsg); } value.BasicPublish(exchange, topic, false, null, msg); rabbitChannels.Push(value); return true; } catch (Exception ex) { if (value != null) { _rabbitChannels.Push(value); } throw; } } 

Post方法是从pr_clr_PostRabbitMsg(int endPointId, string msgToPost)方法pr_clr_PostRabbitMsg(int endPointId, string msgToPost)调用的,该方法是使用代码片段2中的CREATE PROCEDURE子句作为过程呈现的:

邮寄方式

 public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost) { try { if(endPointId == 0) { throw new ApplicationException("EndpointId cannot be 0"); } if (!isInitialised) { pr_clr_InitialiseRabbitMq(); } var msg = Encoding.UTF8.GetBytes(msgToPost); if (endPointId == -1) { foreach (var rep in remoteEndpoints) { var exch = rep.Value.Exchange; var topic = rep.Value.RoutingKey; foreach (var pub in rabbitPublishers.Values) { pub.Post(exch, msg, topic); } } } else { RabbitPublisher pub; if (rabbitPublishers.TryGetValue(endPointId, out pub)) { pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey); } else { throw new ApplicationException($"EndpointId: {endPointId}, does not exist"); } } } catch { throw; } } 

代码段5:将方法表示为过程

当执行该方法时,假定调用方发送了必须将消息发送到的端点的标识符,实际上是消息本身。 如果将值-1作为端点的标识符传递,则我们遍历所有点,并向每个点发送一条消息。 消息以字符串的形式出现,我们可以使用Encoding.UTF8.GetBytes从中获取字节。 在生产环境中,应将Encoding.UTF8.GetBytes调用替换为序列化。

安装方式


要安装和运行该示例,您需要src\SQL文件夹中的所有文件。 要安装,请按照下列步骤操作:

  • 运行脚本01.create_database_and_role.sql 。 他将创建:
    • RabbitMQTest测试数据库,将在其中创建程序集。
    • rmq角色被分配为程序集所有者
    • 方案,也称为rmq 。 在此图中,创建了各种数据库对象。

  • 运行02.create_database_objects.sql文件。 他将创建:

    • rmq.tb_RabbitSetting表,该表会将连接字符串存储到本地数据库。
    • rmq.tb_RabbitEndpoint表,将在其中存储一个或多个RabbitMQ端点。

  • 03.create_localhost_connstring.sql文件中03.create_localhost_connstring.sql@connString变量的值更改为步骤1中创建的RabbitMQTest数据库的正确连接字符串,然后运行脚本。

在继续之前,您必须具有RabbitMQ代理和VHost的运行实例(默认情况下,VHost表示为/)。 通常,我们有几个用于隔离的VHost。 该主机还需要一个交换器,在示例中,我们使用amq.topic 。 准备好RabbitMQ代理后,编辑rmq.pr_UpsertRabbitEndpoint过程rmq.pr_UpsertRabbitEndpoint ,该rmq.pr_UpsertRabbitEndpoint位于04.upsert_rabbit_endpoint.sql文件中:

端点RabbitMQ

 EXEC rmq.pr_UpsertRabbitEndpoint @Alias = 'rabbitEp1', @ServerName = 'RabbitServer', @Port = 5672, @VHost = 'testHost', @LoginName = 'rabbitAdmin', @LoginPassword = 'some_secret_password', @Exchange = 'amq.topic', @RoutingKey = '#', @ConnectionChannels = 5, @IsEnabled = 1 

代码段6:在RabbitMQ中创建端点

此时,该部署程序集了。 对于SQL Server 2014之前的版本(2005、2008、2008R2、2012)和2014及更高版本,SQL Server的部署选项有所不同。 区别在于受支持的CLR版本。 在SQL Server 2014之前,.NET平台在CLR版本2中运行,在SQL Server 2014及更高版本中,使用版本4。

SQL Server 2005年-2012年


让我们从运行在CLR 2上的SQL Server版本开始,因为它们具有自己的特征。 我们需要部署创建的程序集,同时部署RabbitMQ客户端.NET库( RabbitMQ.Client )。 从我们的程序集中,我们将引用RabbitMQ客户端库。 因为 由于我们计划使用CLR 2,因此我们的程序集和RabbitMQ.Client应该基于.NET 3.5进行编译。 有问题。

RabbitMQ.Client库的所有最新版本都是为CLR 4环境编译的,因此不能在我们的程序集中使用。 .NET 3.4.3上编译了CLR 2客户端库的最新版本。 但是,即使我们尝试部署此程序集,也会收到错误消息:


图1:缺少System.ServiceModel程序集

此版本的RabbitMQ.Client引用的组件不属于SQL Server CLR。 这是WCF程序集,这是我上面提到的SQLCLR的局限性之一:此特定程序集用于不允许在SQL Server中执行的任务类型。 RabbitMQ.Client最新版本不具有这些依赖关系,因此可以使用它们,而CLR 4的烦人要求除外。我该怎么办?

如您所知,RabbitMQ是开源的,但是我们是开发人员,对吗? ;)让我们重新编译! 在RabbitMQ.Client的最新发行版(即版本<3.5.0)之前的版本中RabbitMQ.Client我删除了指向System.ServiceModel的链接并重新编译。 我必须使用System.ServiceModel功能更改几行代码,但这只是次要更改。

在此示例中,我没有使用客户端版本3.4.3,而是使用了稳定的版本3.6.6,并使用.NET 3.5(CLR 2)重新编译。 它几乎可以正常工作:),只是RabbitMQ.Client更高版本使用Task '并且最初不是.NET 3.5的一部分。

幸运的是,.NET 3.5有一个包含TaskSystem.Threading.dll版本。 我下载了它,设置了链接,一切顺利! 这里的主要技巧是应将System.Threading.dll与程序集一起安装。
注意: RabbitMQ.Client 源代码(我从中编译了.NET 3.5的版本)位于GitHub RabbitMQ Client 3.6.6 .NET 3.5上的存储库中。 .NET 3.5的dll二进制文件以及System.Threading.dll也位于存储库(RabbitMQ-SqlServer)lib\NET3.5
要安装必要的程序集( System.ThreadingRabbitMQ.ClientRabbitMQ.SqlServer ),请按以下顺序从src\sql目录运行安装脚本:

  1. 05.51.System.Threading.sql2k5-12.sql -System.Threading
  2. 05.52.RabbitMQ.Client.sql2k5-12.sql -RabbitMQ.Client
  3. 05.53.RabbitMQ.SqlServer.sql2k5-12.sql -RabbitMQ.SqlServer

SQL Server 2014以上版本


在SQL Server 2014及更高版本中,程序集在.NET 4.XX下编译(我的示例在4.5.2上),并且您可以引用RabbitMQ.Client任何最新版本,可以使用NuGet获得该版本。 在我的示例中,我使用的是4.1.1。 RabbitMQ.Client ,它也位于存储库(RabbitMQ-SqlServer)lib\NET4

要安装,请按以下顺序从src\sql目录运行脚本:

  1. 05.141.RabbitMQ.Client.sql2k14+.sql -RabbitMQ.Client
  2. 05.142.RabbitMQ.SqlServer.sql2k14+.sql -RabbitMQ.SqlServer

SQL方法包装


要创建将在我们的程序06.create_sqlclr_procedures.sql使用的过程(3.5或4),请运行脚本06.create_sqlclr_procedures.sql 。 他将为三种.NET方法创建T-SQL过程:

  • rmq.pr_clr_InitialiseRabbitMq调用pr_clr_InitialiseRabbitMq 。 用于加载和初始化RabbitMQ.SqlServer程序集。
  • rmq.pr_clr_ReloadRabbitEndpoints调用pr_clr_ReloadRabbitEndpoints 。 加载各种RabbitMQ端点。
  • rmq.pr_clr_PostRabbitMsg调用pr_clr_PostRabbitMsg 。 用于向RabbitMQ发送消息。

该脚本还创建了一个简单的T-SQL过程rmq.pr_PostRabbitMsg ,该过程适用于rmq.pr_clr_PostRabbitMsg 。 这是一个包装程序,它知道如何处理数据,处理异常等。 在生产环境中,我们有几种类似的过程可处理各种类型的消息。 在下面阅读有关此内容的更多信息。

使用方法


从以上所有内容可以明显看出,要将消息发送到RabbitMQ,我们调用rmq.pr_PostRabbitMsgrmq.pr_clr_PostRabbitMsg ,并以rmq.pr_clr_PostRabbitMsg传递参数端点的标识符和消息本身。 当然,所有这些都很酷,但是我想看看它如何在现实中发挥作用。

在生产环境中,我们要做的是在存储过程中处理应该发送到RabbitMQ的数据,我们收集要发送的数据,并在连接块中调用rmq.pr_PostRabbitMsg类的过程。 以下是此过程的非常简化的示例:

数据处理程序

 ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int AS BEGIN SET NOCOUNT ON; BEGIN TRY --     DECLARE @endPointId int; --    DECLARE @msg nvarchar(max) = '{' --        SET @msg = @msg + '"Id":' + CAST(@id AS varchar(10)) + ',' --  -  SET @msg = @msg + '"FName":"Hello",'; SET @msg = @msg + '"LName":"World"'; SET @msg = @msg + '}'; -- -  --     -,  -  SELECT @endPointId = 1; --    --     EXEC rmq.pr_PostRabbitMsg @Message = @msg, @EndpointID = @endPointId; END TRY BEGIN CATCH DECLARE @errMsg nvarchar(max); DECLARE @errLine int; SELECT @errMsg = ERROR_MESSAGE(), @errLine = ERROR_LINE(); RAISERROR('Error: %s at line: %d', 16, -1, @errMsg, @errLine); END CATCH END 

代码片段7中,我们看到了如何在过程中捕获和处理必要的数据并在处理后发送它们。 要使用此过程,请从src\SQL目录运行07.create_processing_procedure.sql脚本。

让我们全部运行


此时,您应该准备发送一些消息。 在测试之前,请确保rmq.tb_RabbitEndpoint的端点交换器上的RabbitMQ中有队列。

因此,要开始,您需要执行以下操作:
打开99.test_send_message.sql文件。


 EXEC rmq.pr_clr_InitialiseRabbitMq; 

初始化程序集并加载RabbitMQ端点。 这不是必需的,但是建议您在创建或修改装配后预加载装配。



 EXEC dbo.pr_SomeProcessingStuff @id = 101 

(您可以使用任何其他喜欢的标识符)。

如果一切正常而没有错误,那么RabbitMQ队列中将出现一条消息! 因此,您使用了SQLCLR将消息发送到RabbitMQ。

恭喜你!

Source: https://habr.com/ru/post/zh-CN419457/


All Articles