Streaming von Daten von einem REST-Service in eine MQ-Warteschlange

Hallo Habr!

In diesem Artikel werde ich eine Möglichkeit beschreiben, einen REST-Dienst zu entwickeln, mit dem Sie Dateien empfangen und im Streaming-Modus auf dem Nachrichtensystem speichern können, ohne die gesamte Datei auf der Seite des Dienstes speichern zu müssen. Es wird auch das umgekehrte Szenario beschrieben, in dem der Client als Antwort eine Datei erhält, die sich im Nachrichtensystem befindet.

Aus Gründen der Übersichtlichkeit werde ich Beispiele für den auf JEE7 entwickelten Servicecode für den IBM WebSphere Liberty Server-Anwendungsserver geben, und IBM MQ wird als Messagingsystem fungieren.
Das beschriebene Verfahren ist jedoch für andere ähnliche Plattformen geeignet, d.h. Jeder JMS-API-Anbieter kann als Messagingsystem fungieren, und jeder JEE-Server (z. B. Apache Tomcat) kann als Anwendungsserver fungieren.

Erklärung des Problems


Es musste eine Lösung implementiert werden, die es sowohl ermöglicht, große Dateien (> 100 MB) vom Client zu empfangen und auf ein anderes geografisch entferntes System zu übertragen, als auch in die entgegengesetzte Richtung - Dateien von diesem System als Antwort auf den Client zu übertragen. Angesichts des unzuverlässigen Netzwerkkanals zwischen dem Client-Netzwerk und dem Anwendungsnetzwerk wird ein Nachrichtensystem verwendet, das eine garantierte Zustellung zwischen ihnen gewährleistet.

Die Top-Level-Lösung umfasst drei Komponenten:

  1. REST-Service - dessen Aufgabe es ist, dem Client die Möglichkeit zu geben, die Datei (oder Anfrage) zu übertragen.
  2. MQ - ist für die Übertragung von Nachrichten zwischen verschiedenen Netzwerken verantwortlich.
  3. Anwendung - Eine Anwendung, die für das Speichern und Ausgeben von Dateien auf Anfrage verantwortlich ist.

Bild

In diesem Artikel beschreibe ich eine Methode zum Implementieren eines REST-Service, zu deren Aufgaben gehören:

  • Empfangen einer Datei von einem Client.
  • Übertragen Sie die empfangene Datei an MQ.
  • Übertragen einer Datei von MQ auf den Client als Antwort.

Lösungsmethode


Aufgrund der Größe der übertragenen Datei ist es nicht möglich, sie vollständig im RAM abzulegen. Darüber hinaus gibt es auch eine Einschränkung auf der MQ-Seite - die maximale Größe einer Nachricht in MQ darf 100 MB nicht überschreiten. Daher wird meine Entscheidung auf folgenden Grundsätzen beruhen:

  • Das Empfangen und Speichern einer Datei in der MQ-Warteschlange sollte im Streaming-Modus erfolgen, ohne dass die gesamte Datei im Speicher abgelegt wird.
  • In der Warteschlange wird die MQ-Datei als Satz kleiner Nachrichten abgelegt.

Die Dateizuordnung auf der Clientseite, dem REST-Service und dem MQ wird im Folgenden grafisch dargestellt:

Bild

Auf der Clientseite befindet sich die Datei vollständig im Dateisystem, im REST-Service wird nur ein Teil der Datei im RAM gespeichert, und auf der MQ-Seite wird jeder Teil der Datei als separate Nachricht abgelegt.

Entwicklung eines REST-Service


Zur Verdeutlichung der vorgeschlagenen Lösungsmethode wird ein Demo-REST-Service entwickelt, der zwei Methoden enthält:

  • Upload - Empfängt eine Datei vom Client und schreibt sie in die MQ-Warteschlange. Gibt die Nachrichtengruppen-ID (im Base64-Format) als Antwort zurück.
  • download - empfängt die Nachrichtengruppen-ID (im Base64-Format) vom Client und gibt die in der MQ-Warteschlange gespeicherte Datei zurück.

Methode zum Empfangen einer Datei von einem Client (Upload)


Die Aufgabe der Methode besteht darin, den Stream der eingehenden Datei abzurufen und ihn dann in die MQ-Warteschlange zu schreiben.

Empfangen des Streams der eingehenden Datei


Um eine Eingabedatei vom Client zu empfangen, erwartet die Methode ein Objekt mit der Schnittstelle com.ibm.websphere.jaxrs20.multipart.IMultipartBody als Eingabeparameter, mit dem eine Verknüpfung zum Stream der eingehenden Datei hergestellt werden kann

@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... } 

Diese Schnittstelle (IMultipartBody) befindet sich im JAR-Archiv com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, ist im IBM Liberty Server enthalten und befindet sich im Ordner: < WLP_INSTALLATION_PATH > / dev / api / ibm.

Hinweis:

  • WLP_INSTALLATION_PATH - Pfad zum WebSphere Liberty Profile-Verzeichnis.
  • Es wird erwartet, dass der Client die Datei im Parameter "Datei" überträgt.
  • Wenn Sie einen anderen Anwendungsserver verwenden, können Sie die alternative Bibliothek von Apache CXF verwenden.

Stream Speichern einer Datei in MQ


Die Methode empfängt den Eingabedateistream, den Namen der MQ-Warteschlange, in die die Datei geschrieben werden soll, und die Kennung der Nachrichtengruppe, die zum Binden von Nachrichten verwendet wird. Die Gruppenkennung wird auf der Serviceseite beispielsweise mit dem Dienstprogramm org.apache.commons.lang3.RandomStringUtils generiert:

 String groupId = RandomStringUtils.randomAscii(24); 

Der Algorithmus zum Speichern der Eingabedatei in MQ besteht aus den folgenden Schritten:

  1. Initialisierung von MQ-Verbindungsobjekten.
  2. Zyklisches Lesen eines Teils der eingehenden Datei, bis die Datei vollständig gelesen ist:
    1. Ein Teil der Dateidaten wird als separate Nachricht in MQ aufgezeichnet.
    2. Jede Nachricht in der Datei hat eine eigene Seriennummer (Eigenschaft "JMSXGroupSeq").
    3. Alle Nachrichten in der Datei haben denselben Gruppenwert (Eigenschaft "JMSXGroupID").
    4. Die letzte Nachricht hat ein Zeichen, das angibt, dass diese Nachricht endgültig ist (Eigenschaft "JMS_IBM_Last_Msg_In_Group").
    5. Die Konstante SEGMENT_SIZE enthält die Portionsgröße. Zum Beispiel 1 MB.

 public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } } 

Methode zum Senden einer Datei an den Client (Download)


Die Methode erhält die Kennung einer Gruppe von Nachrichten im Base64-Format, mit der sie Nachrichten aus der MQ-Warteschlange liest und im Streaming-Modus als Antwort sendet.

Abrufen der Nachrichtengruppen-ID


Die Methode empfängt die Nachrichtengruppen-ID als Eingabeparameter.

 @PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... } 

Streaming einer Antwort an einen Client


Erstellen Sie eine Klasse mit der Schnittstelle javax.ws.rs.core.StreamingOutput, um eine Datei auf den Client zu übertragen, die als separate Nachrichten in MQ im Streaming-Modus gespeichert ist:

 public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { new MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } } 

In der Klasse implementieren wir die Schreibmethode, die eine Eingabereferenz auf den ausgehenden Stream empfängt, in den Nachrichten von MQ geschrieben werden. Ich habe auch den Warteschlangennamen und die Kennung der Gruppe hinzugefügt, deren Nachrichten der Klasse vorgelesen werden.

Ein Objekt dieser Klasse wird als Parameter übergeben, um eine Antwort an den Client zu erstellen:

 @GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); } 

Stream liest eine Datei aus MQ


Der Algorithmus zum Lesen von Nachrichten von MQ in den ausgehenden Stream besteht aus den folgenden Schritten:

  1. Initialisierung von MQ-Verbindungsobjekten.
  2. Zyklisches Lesen von Nachrichten aus MQ, bis eine Nachricht mit dem Vorzeichen der Beendigung in der Gruppe gelesen wird (Eigenschaft "JMS_IBM_Last_Msg_In_Group"):
    1. Bevor jede Nachricht aus der Warteschlange gelesen wird, wird ein Filter (messageSelector) festgelegt, in dem die Nachrichtengruppen-ID und die Nachrichtenseriennummer in der Gruppe festgelegt werden.
    2. Der Inhalt der gelesenen Nachricht wird in den ausgehenden Stream geschrieben.


 public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } } 

REST-Serviceabruf


Um den Service zu testen, verwende ich das Curl-Tool.

Datei hochladen


 curl -X PUT -F file=@<__> http://localhost:9080/Demo/rest/service/upload 

Die Antwort ist eine base64-Zeichenfolge, die die Kennung der Nachrichtengruppe enthält, die wir in der nächsten Methode zum Abrufen der Datei angeben.

Eine Datei empfangen


 curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64____> -o <_____> 

Fazit


In dem Artikel wurde der Ansatz zur Entwicklung eines REST-Dienstes untersucht, mit dem Sie große Datenmengen in die Warteschlange des Nachrichtensystems streamen und empfangen sowie aus der Warteschlange lesen und als Antwort zurückgeben können. Diese Methode reduziert den Ressourcenverbrauch und erhöht dadurch den Durchsatz der Lösung.

Zusätzliche Materialien


Weitere Informationen zur IMultipartBody-Schnittstelle, die zum Empfangen des eingehenden Dateistreams verwendet wird, sind ein Link .

Eine alternative Bibliothek zum Empfangen von Dateien im Streaming-Modus in REST-Diensten ist Apache CXF .

Die StreamingOutput-Schnittstelle zum Streaming einer REST-Antwort an einen Client ist eine Verknüpfung .

Source: https://habr.com/ru/post/de424941/


All Articles