Vor ein oder zwei Wochen sah ich im
RabbitMQ-Benutzerforum eine
Nachricht darüber, wie das Senden von Nachrichten von SQL Server an RabbitMQ eingerichtet wird. Da wir eng mit
Derivco zusammenarbeiten , habe ich dort einige Vorschläge hinterlassen und auch gesagt, dass ich einen Blog darüber schreibe, wie das geht. Ein Teil meiner Nachricht war nicht ganz wahr - zumindest bis zu diesem Moment (sorry, Bro, er war sehr beschäftigt).
Tolle Sache, das ist dein
SQL Server . Die Verwendung ist sehr einfach, um Informationen in eine Datenbank zu stellen. Das Abrufen von Daten aus einer Datenbank mithilfe einer Abfrage ist ebenso einfach. Das Abrufen der gerade aktualisierten oder eingefügten Daten ist jedoch bereits etwas schwieriger. Denken Sie an Echtzeitereignisse. Ein Kauf wird getätigt - jemand muss sofort darüber informiert werden, sobald dies geschehen ist. Vielleicht wird jemand sagen, dass solche Daten nicht aus der Datenbank, sondern von einem anderen Ort entfernt werden sollten. Das ist natürlich der Fall, aber oft haben wir einfach keine Wahl.
Wir hatten eine Aufgabe: Ereignisse aus der Datenbank zur weiteren Verarbeitung nach draußen senden, und die Frage war: Wie geht das?
SQL Server und externe Kommunikation
Während der Existenz von SQL Server gab es mehrere Versuche, die Kommunikation außerhalb der Datenbank zu organisieren.
SQL Server Notification Services (NS), die in SQL Server 2000 und später in SQL Server 2005
angezeigt wurden , wurden in
SQL Server Service Broker (SSB) angezeigt. Ich habe sie in meinem Buch
Ein erster Blick auf SQL Server 2005 für Entwickler zusammen mit Bob Boshemen und Dan Sullivan beschrieben. NS erschien wie gesagt in SQL Server 2000 und wurde in der Beta-Version von SQL Server 2005 neu gestaltet. NS wurde jedoch
vollständig von der verkaufsfertigen Version (RTM) von SQL Server 2005 ausgeschlossen.
Hinweis: Wenn Sie das Buch lesen, finden Sie dort eine Reihe von Funktionen, die nicht in der RTM-Version enthalten waren.
SSB überlebte und Microsoft führte
Service Broker External Activator (EA) in sein SQL Server 2008 Feature Pack ein. Es ermöglicht über den SSB die Interaktion außerhalb der lokalen Datenbank. Theoretisch klingt es gut, aber in der Praxis ist es umständlich und verwirrend. Wir haben einige Tests durchgeführt und schnell festgestellt, dass es nicht das tut, was wir brauchen. Außerdem hat uns SSB nicht die Leistung geliefert, die wir brauchten, also mussten wir etwas anderes erfinden.
SQLCLR
Das Ergebnis war die SQLCLR-Technologie. SQLCLR ist eine .NET-Plattform, die in den SQL Server-Kern integriert ist und zum Ausführen von .NET-Code im Kernel verwendet werden kann. Da wir .NET-Code ausführen, können wir fast alles wie in einer normalen .NET-Anwendung ausführen.
Hinweis: Ich habe oben „fast“ geschrieben, da es tatsächlich einige Einschränkungen gibt. In diesem Zusammenhang haben diese Einschränkungen fast keine Auswirkungen auf das, was wir tun werden.
Das Funktionsprinzip von SQLCLR lautet wie folgt: Der Code wird in eine DLL-Bibliothek kompiliert und anschließend mit den SQL Server-Tools registriert:
Baugruppe erstellen
CREATE ASSEMBLY [RabbitMQ.SqlServer] AUTHORIZATION rmq FROM 'F:\some_path\RabbitMQSqlClr4.dll' WITH PERMISSION_SET = UNSAFE; GO
Code-Snippet 1: Erstellen einer Assembly entlang eines absoluten Pfads
Der Code führt die folgenden Aktionen aus:
CREATE ASSEMBLY
- Erstellt eine Assembly mit dem angegebenen Namen (unabhängig davon, wie sie aussehen soll).AUTHORIZATION
- Zeigt den Eigentümer der Baugruppe an. In diesem Fall ist rmq eine vordefinierte SQL Server-Rolle.FROM
- Bestimmt, wo sich die ursprüngliche Baugruppe befindet. In der FROM
können Sie den Pfad auch in Binär- oder UNC-Formaten angeben. Die Installationsdateien für dieses Projekt verwenden eine binäre Darstellung.WITH PERMISSION_SET
- Legt Berechtigungen fest. UNSAFE
ist weniger streng und in diesem Fall notwendig.
Hinweis: Unabhängig von der in der AUTHORIZATION
Klausel verwendeten Rolle oder Anmeldung muss die Appdomain-Klasse mit demselben Namen erstellt werden wie beim Laden der Assembly in die Domäne. Es wird empfohlen, Assemblys mit unterschiedlichen Namen von Appdomain-Klassen zu trennen, damit der Rest nicht herunterfällt, wenn eine Assembly fehlschlägt. Wenn Assemblys jedoch voneinander abhängig sind, können sie nicht in verschiedene Klassen unterteilt werden.
Wenn die Assembly erstellt wird, erstellen wir darin Wrapper für .NET-Methoden:
CREATE PROCEDURE rmq.pr_clr_PostRabbitMsg @EndpointID int, @Message nvarchar(max) AS EXTERNAL NAME [RabbitMQ.SqlServer].[RabbitMQSqlClr.RabbitMQSqlServer].[pr_clr_PostRabbitMsg]; GO
Code-Snippet 2: .NET Method Wrapper
Der Code führt die folgenden Aktionen aus:
- Erstellt eine gespeicherte T-SQL-Prozedur mit dem Namen
rmq.pr_clr_PostRabbitMsg
, die zwei Parameter rmq.pr_clr_PostRabbitMsg
. @EndpointID
und @Message
. - Anstelle des Hauptteils der Prozedur wird eine externe Quelle verwendet, die besteht aus:
- Zusammenbauen namens
RabbitMQ.SqlServer
, t. E. Das Gerät , dass wir oben im Code - Schnipsel 1 erstellt haben . - Vollständiger Typ (Namespace und Klasse):
RabbitMQSqlClr.RabbitMQSqlServer
- Die Methode aus dem obigen Namespace und der Klasse lautet:
pr_clr_PostRabbitMsg
.
Wenn
rmq.pr_clr_PostRabbitMsg
, wird die Methode
pr_clr_PostRabbitMsg
aufgerufen.
Hinweis: Beim Erstellen einer Prozedur wird beim Namen der Assembly im Gegensatz zum vollständigen Namen des Typs und der Methode nicht zwischen Groß- und Kleinschreibung unterschieden. Es ist nicht erforderlich, dass der Name der zu erstellenden Prozedur mit dem Namen der Methode übereinstimmt. Die endgültigen Datentypen für die Parameter müssen jedoch übereinstimmen.
Wie ich bereits sagte, haben wir Derivco ein Bedarf Daten außerhalb von SQL Server zu senden, damit wir die SQLCLR und verwenden
RabbitMQ (RMQ).
Rabbitmq
RMQ ist ein Open Source Message Broker, der das Advanced Message Queuing Protocol (AMQP) implementiert und in Erlang geschrieben ist.
Da RMQ ein Nachrichtenbroker ist, müssen AMQP-Clientbibliotheken eine Verbindung herstellen. Die Anwendung verweist auf Clientbibliotheken und öffnet mit ihrer Hilfe eine Verbindung und sendet Nachrichten - beispielsweise, wenn über ADO.NET ein Aufruf an SQL Server erfolgt. Im Gegensatz zu ADO.NET, bei dem die Verbindung höchstwahrscheinlich bei jedem Zugriff auf die Datenbank geöffnet wird, bleibt die Verbindung hier für den gesamten Zeitraum der Anwendung geöffnet.
Um mit RabbitMQ aus der Datenbank interagieren zu können, benötigen wir die Anwendung und die .NET-Clientbibliothek für RabbitMQ.
Hinweis: Im folgenden Teil dieses Artikels wird RabbitMQ Codefragmente auftreten, aber ohne detaillierte Erklärung , was sie tun. Wenn Sie mit der Arbeit mit RabbitMQ noch nicht vertraut sind, empfehlen wir Ihnen, sich die verschiedenen RabbitMQ-Tutorials anzusehen, um den Zweck des Codes zu verstehen. Das Hello World C # -Tutorial ist ein guter Anfang. Einer der Unterschiede zwischen Lehrbüchern und Codebeispielen besteht darin, dass in den Beispielen keine Austauscher deklariert sind. Sie sollen vordefiniert sein.
RabbitMQ.SqlServer
RabbitMQ.SqlServer ist eine Assembly, die die .NET-Clientbibliothek für RabbitMQ verwendet und die Möglichkeit bietet, Nachrichten aus der Datenbank an einen oder mehrere RabbitMQ-Endpunkte (VHosts und Austauscher) zu senden. Der Code kann von meinem Repository
RabbitMQ-SqlServer auf GitHub heruntergeladen / gegabelt werden. Es enthält Assembly-Quellen und Installationsdateien (d. H. Sie müssen diese nicht selbst kompilieren).
Hinweis: Dies ist nur ein Beispiel, um zu zeigen, wie SQL Server mit RabbitMQ interagieren kann. Dies ist KEIN fertiges Produkt oder sogar ein Teil davon. Wenn dieser Code Ihr Gehirn bricht - beschuldigen Sie mich nicht, denn dies ist nur ein Beispiel.
Funktionalität
Wenn die Assembly geladen wird oder wenn ihre Initialisierung explizit oder indirekt aufgerufen wird, lädt die Assembly zum Zeitpunkt des Aufrufs der Wrapper-Prozedur die Verbindungszeichenfolge in die lokale Datenbank, in der sie installiert wurde, sowie die RabbitMQ-Endpunkte, mit denen sie verbunden ist:
Verbindung
internal bool InternalConnect() { try { connFactory = new ConnectionFactory(); connFactory.Uri = connString; connFactory.AutomaticRecoveryEnabled = true; connFactory.TopologyRecoveryEnabled = true; RabbitConn = connFactory.CreateConnection(); for (int x = 0; x < channels; x++) { var ch = RabbitConn.CreateModel(); rabbitChannels.Push(ch); } return true; } catch(Exception ex) { return false; } }
Code-Snippet 3: Stellen Sie eine Verbindung zum Endpunkt her
Gleichzeitig erstellt ein Teil der Verbindung zum Endpunkt auch IModels für die Verbindung, die beim Senden (Hinzufügen zur Warteschlange) von Nachrichten verwendet werden:
Nachrichten senden
internal bool Post(string exchange, byte[] msg, string topic) { IModel value = null; int channelTryCount = 0; try { while ((!rabbitChannels.TryPop(out value)) && channelTryCount < 100) { channelTryCount += 1; Thread.Sleep(50); } if (channelTryCount == 100) { var errMsg = $"Channel pool blocked when trying to post message to Exchange: {exchange}."; throw new ApplicationException(errMsg); } value.BasicPublish(exchange, topic, false, null, msg); rabbitChannels.Push(value); return true; } catch (Exception ex) { if (value != null) { _rabbitChannels.Push(value); } throw; } }
Die
Post
Methode wird von der
pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
Methode
pr_clr_PostRabbitMsg(int endPointId, string msgToPost)
, die als Prozedur unter Verwendung der
CREATE PROCEDURE
Klausel in Codefragment 2 dargestellt wurde:
Methode nach dem Aufruf
public static void pr_clr_PostRabbitMsg(int endPointId, string msgToPost) { try { if(endPointId == 0) { throw new ApplicationException("EndpointId cannot be 0"); } if (!isInitialised) { pr_clr_InitialiseRabbitMq(); } var msg = Encoding.UTF8.GetBytes(msgToPost); if (endPointId == -1) { foreach (var rep in remoteEndpoints) { var exch = rep.Value.Exchange; var topic = rep.Value.RoutingKey; foreach (var pub in rabbitPublishers.Values) { pub.Post(exch, msg, topic); } } } else { RabbitPublisher pub; if (rabbitPublishers.TryGetValue(endPointId, out pub)) { pub.Post(remoteEndpoints[endPointId].Exchange, msg, remoteEndpoints[endPointId].RoutingKey); } else { throw new ApplicationException($"EndpointId: {endPointId}, does not exist"); } } } catch { throw; } }
Code-Snippet 5: Darstellung einer Methode als Prozedur
Wenn die Methode ausgeführt wird, wird angenommen, dass der Aufrufer die Kennung des Endpunkts sendet, an den die Nachricht gesendet werden muss, und tatsächlich die Nachricht selbst. Wenn der Wert -1 als Kennung des Endpunkts übergeben wird, durchlaufen wir alle Punkte und senden jedem eine Nachricht. Die Nachricht kommt in Form einer Zeichenfolge, aus der wir mithilfe von
Encoding.UTF8.GetBytes
Bytes
Encoding.UTF8.GetBytes
. In einer Produktionsumgebung sollte der Aufruf
Encoding.UTF8.GetBytes
durch Serialisierung ersetzt werden.
Installation
Zum Installieren und Ausführen des Beispiels benötigen Sie alle Dateien im Ordner
src\SQL
. Gehen Sie zur Installation folgendermaßen vor:
- Führen Sie das Skript
01.create_database_and_role.sql
. Er wird schaffen:
RabbitMQTest
Testdatenbank, in der die Assembly erstellt wird.rmq
Rolle, die als Assembly-Eigentümer zugewiesen werden soll- Schema, das auch als
rmq
. In diesem Diagramm werden verschiedene Datenbankobjekte erstellt.
- Führen Sie die Datei
02.create_database_objects.sql
. Er wird schaffen:
- die Tabelle
rmq.tb_RabbitSetting
, in der die Verbindungszeichenfolge zur lokalen Datenbank gespeichert wird. - Die Tabelle
rmq.tb_RabbitEndpoint
, in der ein oder mehrere RabbitMQ
Endpunkte gespeichert werden.
- Die Datei
03.create_localhost_connstring.sql
den Wert der Variablen ändern @connString
die richtige Verbindungszeichenfolge der Datenbank RabbitMQTest
, erstellt in Schritt 1 und das Skript ausführen.
Bevor Sie fortfahren können, müssen Sie über eine laufende Instanz des RabbitMQ-Brokers und von VHost verfügen (standardmäßig wird VHost als / dargestellt). In der Regel haben wir mehrere VHost, nur zur Isolation. Dieser Host benötigt auch einen Austauscher. In dem Beispiel verwenden wir
amq.topic
. Wenn Sie den RabbitMQ-Broker bereit haben, bearbeiten Sie die Prozedurparameter
rmq.pr_UpsertRabbitEndpoint
, die sich in der Datei
04.upsert_rabbit_endpoint.sql
:
Endpoint RabbitMQ
EXEC rmq.pr_UpsertRabbitEndpoint @Alias = 'rabbitEp1', @ServerName = 'RabbitServer', @Port = 5672, @VHost = 'testHost', @LoginName = 'rabbitAdmin', @LoginPassword = 'some_secret_password', @Exchange = 'amq.topic', @RoutingKey = '#', @ConnectionChannels = 5, @IsEnabled = 1
Code-Snippet 6: Erstellen eines Endpunkts in RabbitMQ
Zu diesem Zeitpunkt ist es Zeit, Assemblys bereitzustellen. Es gibt Unterschiede bei den Bereitstellungsoptionen für Versionen von SQL Server vor SQL Server 2014 (2005, 2008, 2008R2, 2012) und für 2014 und höher. Der Unterschied liegt in der unterstützten Version der CLR. Vor SQL Server 2014 wurde die .NET-Plattform in der CLR-Version 2 ausgeführt, und in SQL Server 2014 und höher wurde Version 4 verwendet.
SQL Server 2005 - 2012
Beginnen wir mit Versionen von SQL Server, die auf CLR 2 ausgeführt werden, da sie ihre eigenen Merkmale haben. Wir müssen die erstellte Assembly bereitstellen und gleichzeitig die .NET-Bibliothek des RabbitMQ-Clients (
RabbitMQ.Client
) bereitstellen. In unserer Assembly beziehen wir uns auf die RabbitMQ-Clientbibliothek. Weil Da wir CLR 2 verwenden wollten, sollten unsere Assembly und
RabbitMQ.Client
auf Basis von .NET 3.5 kompiliert werden. Es gibt Probleme.
Alle neuesten Versionen der
RabbitMQ.Client
Bibliothek wurden für die CLR 4-Umgebung kompiliert, sodass sie in unserer Assembly nicht verwendet werden können. Die neueste Version der Clientbibliotheken für CLR 2 ist in .NET 3.4.3 kompiliert. Aber selbst wenn wir versuchen, diese Assembly bereitzustellen, wird eine Fehlermeldung angezeigt:
Abbildung 1: Fehlende System.ServiceModel-AssemblyDiese Version von
RabbitMQ.Client
bezieht sich auf eine Assembly, die nicht Teil der SQL Server-CLR ist. Dies ist eine WCF-Assembly, und dies ist eine der oben erwähnten Einschränkungen in SQLCLR: Diese bestimmte Assembly ist für Aufgabentypen vorgesehen, die in SQL Server nicht ausgeführt werden dürfen. Neuere Versionen von
RabbitMQ.Client
haben diese Abhängigkeiten nicht, sodass sie problemlos verwendet werden können, mit Ausnahme der lästigen Anforderungen der CLR 4. Was soll ich tun?
Wie Sie wissen, ist RabbitMQ Open Source, aber wir sind Entwickler, oder? ;) Also lasst uns neu kompilieren! In der Version vor den neuesten Versionen (d. H. Version <3.5.0) von
RabbitMQ.Client
ich die Links zu
System.ServiceModel
gelöscht und neu kompiliert. Ich musste einige Codezeilen mithilfe der
System.ServiceModel
Funktionalität
System.ServiceModel
, dies waren jedoch geringfügige Änderungen.
In diesem Beispiel habe ich keine Client-Version 3.4.3 verwendet, aber ich habe die
stabile Version 3.6.6 verwendet und mit .NET 3.5 (CLR 2) neu kompiliert. Es funktionierte fast :), mit der Ausnahme , dass die späteren Versionen
RabbitMQ.Client
verwenden Sie
Task
‚und dass zunächst nicht Teil von .NET 3.5.
Glücklicherweise gibt es eine Version von
System.Threading.dll
für .NET 3.5, die
Task
. Ich habe es heruntergeladen, die Links eingerichtet und alles ging! Hier ist der Haupttrick, dass
System.Threading.dll
mit der Assembly installiert werden sollte.
Hinweis: Die Quelle von RabbitMQ.Client
, aus der ich eine Version von .NET 3.5 kompiliert habe, befindet sich in meinem Repository auf GitHub RabbitMQ Client 3.6.6 .NET 3.5 . Die DLL-Binärdatei befindet sich zusammen mit System.Threading.dll
für .NET 3.5 auch im lib\NET3.5
Repositorys (RabbitMQ-SqlServer) .
System.Threading
zum Installieren der erforderlichen Assemblys (
System.Threading
,
RabbitMQ.Client
und
RabbitMQ.SqlServer
) die Installationsskripts aus dem Verzeichnis
src\sql
in der folgenden Reihenfolge aus:
05.51.System.Threading.sql2k5-12.sql
- System.Threading05.52.RabbitMQ.Client.sql2k5-12.sql
- RabbitMQ.Client05.53.RabbitMQ.SqlServer.sql2k5-12.sql
- RabbitMQ.SqlServer
SQL Server 2014+
In SQL Server 2014 und höher wird die Assembly unter .NET 4.XX kompiliert (mein Beispiel ist 4.5.2), und Sie können auf jede der neuesten Versionen von
RabbitMQ.Client
verweisen, die mit
NuGet abgerufen werden können. In meinem Beispiel verwende ich 4.1.1.
RabbitMQ.Client
, der sich ebenfalls im
lib\NET4
Repositorys befindet (RabbitMQ-SqlServer) .
Führen Sie zur Installation die Skripte aus dem Verzeichnis
src\sql
in der folgenden Reihenfolge aus:
05.141.RabbitMQ.Client.sql2k14+.sql
- RabbitMQ.Client05.142.RabbitMQ.SqlServer.sql2k14+.sql
- RabbitMQ.SqlServer
SQL Method Wrapper
Führen Sie das Skript
06.create_sqlclr_procedures.sql
aus, um Prozeduren zu erstellen, die in unserer Assembly (3.5 oder 4)
06.create_sqlclr_procedures.sql
. Er wird T-SQL-Prozeduren für drei .NET-Methoden erstellen:
rmq.pr_clr_InitialiseRabbitMq
ruft pr_clr_InitialiseRabbitMq
. Wird zum Laden und Initialisieren der RabbitMQ.SqlServer-Assembly verwendet.rmq.pr_clr_ReloadRabbitEndpoints
verursacht pr_clr_ReloadRabbitEndpoints
. Lädt verschiedene RabbitMQ-Endpunkte.rmq.pr_clr_PostRabbitMsg
ruft pr_clr_PostRabbitMsg
. Wird zum Senden von Nachrichten an RabbitMQ verwendet.
Das Skript erstellt auch eine einfache T-SQL-Prozedur -
rmq.pr_PostRabbitMsg
, die für
rmq.pr_clr_PostRabbitMsg
. Dies ist eine Wrapper-Prozedur, die weiß, was mit Daten zu tun ist, Ausnahmen behandelt usw. In einer Produktionsumgebung gibt es mehrere ähnliche Verfahren, mit denen verschiedene Arten von Nachrichten verarbeitet werden. Lesen Sie weiter unten mehr darüber.
Verwenden Sie
Aus alledem geht hervor, dass wir zum Senden von Nachrichten an RabbitMQ
rmq.pr_PostRabbitMsg
oder
rmq.pr_clr_PostRabbitMsg
und in den Parametern die Kennung des Endpunkts und die Nachricht selbst als Zeichenfolge übergeben. Das alles ist natürlich cool, aber ich würde gerne sehen, wie es in der Realität funktionieren wird.
In Produktionsumgebungen erfassen wir in gespeicherten Prozeduren, die die an RabbitMQ zu sendenden Daten verarbeiten, die zu sendenden Daten und rufen im Verbindungsblock eine Prozedur wie
rmq.pr_PostRabbitMsg
. Das Folgende ist ein sehr vereinfachtes Beispiel für ein solches Verfahren:
Datenverarbeitungsverfahren
ALTER PROCEDURE dbo.pr_SomeProcessingStuff @id int AS BEGIN SET NOCOUNT ON; BEGIN TRY
Im
Codefragment 7 sehen wir, wie die erforderlichen Daten in der Prozedur erfasst und verarbeitet und nach der Verarbeitung gesendet werden. Führen Sie zur Verwendung dieser Prozedur das Skript
07.create_processing_procedure.sql
aus dem Verzeichnis
src\SQL
.
Lassen Sie uns alles laufen
An diesem Punkt sollten Sie bereit sein, einige Nachrichten zu senden.
rmq.tb_RabbitEndpoint
Sie vor dem Testen sicher, dass Warteschlangen in RabbitMQ an den Endpunktaustauscher in
rmq.tb_RabbitEndpoint
.
Um zu beginnen, müssen Sie Folgendes tun:
Öffnen Sie die Datei
99.test_send_message.sql
.
Ausführen
EXEC rmq.pr_clr_InitialiseRabbitMq;
um die Assembly zu initialisieren und die RabbitMQ-Endpunkte zu laden. Dies ist kein erforderlicher Schritt. Es wird jedoch empfohlen, die Assembly nach dem Erstellen oder Ändern vorab zu laden.
Ausführen
EXEC dbo.pr_SomeProcessingStuff @id = 101
(Sie können eine beliebige andere Kennung verwenden).
Wenn alles fehlerfrei funktioniert hat, sollte eine Meldung in der RabbitMQ-Warteschlange angezeigt werden! Sie haben also SQLCLR verwendet, um eine Nachricht an RabbitMQ zu senden.
Glückwunsch!