Streaming de données d'un service REST vers une file d'attente MQ

Bonjour, Habr!

Dans l'article, je vais décrire un moyen de développer un service REST qui vous permet de recevoir des fichiers et de les enregistrer sur le système de messagerie en mode streaming sans avoir besoin de stocker l'intégralité du fichier sur le côté du service. Le scénario inverse sera également décrit dans lequel le client recevra en réponse un fichier situé dans le système de messagerie.

Pour plus de clarté, je vais donner des exemples du code de service développé sur JEE7 pour le serveur d'applications IBM WebSphere Liberty Server, et IBM MQ agira comme un système de messagerie.
Cependant, la méthode décrite convient à d'autres plates-formes similaires, à savoir tout fournisseur d'API JMS peut agir comme un système de messagerie et tout serveur JEE (par exemple, Apache Tomcat) peut agir comme un serveur d'applications.

Énoncé du problème


Il était nécessaire de mettre en œuvre une solution qui permettrait à la fois de recevoir des fichiers volumineux (> 100 Mo) du client et de les transférer vers un autre système géographiquement distant, et dans le sens opposé - de transférer des fichiers de ce système vers le client comme réponse. Compte tenu du canal réseau peu fiable entre le réseau client et le réseau d'application, un système de messagerie est utilisé qui garantit une livraison garantie entre eux.

La solution de premier niveau comprend trois composants:

  1. Service REST - dont la tâche est de fournir au client la possibilité de transférer le fichier (ou la demande).
  2. MQ - est responsable de la transmission des messages entre les différents réseaux.
  3. Application - une application chargée de stocker les fichiers et de les publier sur demande.

image

Dans cet article, je décris une méthode pour implémenter un service REST, dont les tâches incluent:

  • Réception d'un fichier d'un client.
  • Transférez le fichier reçu vers MQ.
  • Transfert d'un fichier de MQ au client en tant que réponse.

Méthode de solution


En raison de la grande taille du fichier transmis, il n'est pas possible de le placer complètement dans la RAM, de plus, il y a également une restriction du côté MQ - la taille maximale d'un message dans MQ ne peut pas dépasser 100 Mo. Ainsi, ma décision sera basée sur les principes suivants:

  • La réception d'un fichier et son enregistrement dans la file d'attente MQ doivent être effectués en mode streaming, sans placer l'intégralité du fichier en mémoire.
  • Dans la file d'attente, le fichier MQ sera placé comme un ensemble de petits messages.

Graphiquement, l'allocation des fichiers côté client, service REST et MQ est illustrée ci-dessous:

image

Côté client, le fichier est entièrement localisé sur le système de fichiers, dans le service REST, seule une partie du fichier est stockée en RAM et côté MQ, chaque partie du fichier est placée sous forme de message distinct.

Développement d'un service REST


Pour plus de clarté sur la méthode de solution proposée, un service REST de démonstration sera développé contenant deux méthodes:

  • upload - reçoit un fichier du client et l'écrit dans la file d'attente MQ, renvoie l'identifiant du groupe de messages (au format base64) en réponse.
  • téléchargement - reçoit l'identifiant du groupe de messages (au format base64) du client et renvoie le fichier stocké dans la file d'attente MQ.

Méthode de réception d'un fichier d'un client (téléchargement)


La tâche de la méthode consiste à obtenir le flux du fichier entrant, puis à l'écrire dans la file d'attente MQ.

Réception du flux du fichier entrant


Pour recevoir un fichier d'entrée du client, la méthode attend un objet avec l'interface com.ibm.websphere.jaxrs20.multipart.IMultipartBody comme paramètre d'entrée, ce qui permet d'obtenir un lien vers le flux du fichier entrant

@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... } 

Cette interface (IMultipartBody) se trouve dans l'archive JAR com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, est incluse avec IBM Liberty Server et se trouve dans le dossier: < WLP_INSTALLATION_PATH > / dev / api / ibm.

Remarque:

  • WLP_INSTALLATION_PATH - chemin d'accès au répertoire WebSphere Liberty Profile.
  • Il est prévu que le client transfère le fichier dans le paramètre nommé "fichier".
  • Si vous utilisez un autre serveur d'applications, vous pouvez utiliser la bibliothèque alternative d'Apache CXF.

Stream enregistrer un fichier dans MQ


La méthode reçoit le flux de fichiers d'entrée, le nom de la file d'attente MQ où le fichier doit être écrit et l'identifiant du groupe de messages qui sera utilisé pour lier les messages. L'identifiant de groupe est généré côté service, par exemple, avec l'utilitaire org.apache.commons.lang3.RandomStringUtils:

 String groupId = RandomStringUtils.randomAscii(24); 

L'algorithme d'enregistrement du fichier d'entrée dans MQ comprend les étapes suivantes:

  1. Initialisation des objets de connexion MQ.
  2. Lecture cyclique d'une partie du fichier entrant jusqu'à ce que le fichier soit entièrement lu:
    1. Un bloc de données de fichier est enregistré en tant que message séparé dans MQ.
    2. Chaque message du fichier possède son propre numéro de série (propriété "JMSXGroupSeq").
    3. Tous les messages du fichier ont la même valeur de groupe (propriété "JMSXGroupID").
    4. Le dernier message a un signe indiquant que ce message est final (propriété "JMS_IBM_Last_Msg_In_Group").
    5. La constante SEGMENT_SIZE contient la taille de la portion. Par exemple, 1 Mo.

 public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } } 

Méthode d'envoi d'un fichier au client (téléchargement)


Le procédé obtient l'identifiant du groupe de messages au format base64, par lequel il lit les messages de la file d'attente MQ et l'envoie comme réponse en mode streaming.

Obtention de l'ID du groupe de messages


Le procédé reçoit l'identifiant de groupe de messages en tant que paramètre d'entrée.

 @PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... } 

Streaming d'une réponse à un client


Pour transférer un fichier vers le client, stocké sous la forme d'un ensemble de messages séparés dans MQ, en mode streaming, créez une classe avec l'interface javax.ws.rs.core.StreamingOutput:

 public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { new MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } } 

Dans la classe, nous implémentons la méthode d'écriture, qui reçoit une référence d'entrée au flux sortant dans lequel les messages de MQ seront écrits. J'ai également ajouté le nom de la file d'attente et l'identifiant du groupe dont les messages seront lus à la classe.

Un objet de cette classe sera passé en paramètre pour créer une réponse au client:

 @GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); } 

Lecture en continu d'un fichier à partir de MQ


L'algorithme de lecture des messages de MQ vers le flux sortant comprend les étapes suivantes:

  1. Initialisation des objets de connexion MQ.
  2. Lecture cyclique des messages de MQ jusqu'à la lecture d'un message avec le signe de la terminaison dans le groupe (propriété "JMS_IBM_Last_Msg_In_Group"):
    1. Avant que chaque message ne soit lu dans la file d'attente, un filtre (messageSelector) est défini, dans lequel l'identifiant du groupe de messages et le numéro de série du message dans le groupe sont définis.
    2. Le contenu du message lu est écrit dans le flux sortant.


 public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } } 

Appel de service REST


Pour tester le service, j'utiliserai l'outil curl.

Téléchargement de fichier


 curl -X PUT -F file=@<__> http://localhost:9080/Demo/rest/service/upload 

La réponse sera une chaîne base64 contenant l'identifiant du groupe de messages, que nous indiquerons dans la prochaine méthode pour obtenir le fichier.

Recevoir un fichier


 curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64____> -o <_____> 

Conclusion


L'article a examiné l'approche de développement d'un service REST qui vous permet de diffuser et de recevoir des données volumineuses dans la file d'attente du système de messagerie, ainsi que de les lire dans la file d'attente pour les renvoyer en réponse. Cette méthode réduit l'utilisation des ressources et augmente ainsi le débit de la solution.

Matériel supplémentaire


Plus d'informations sur l'interface IMultipartBody utilisée pour recevoir le flux de fichiers entrant est un lien .

Apache CXF est une bibliothèque alternative pour recevoir des fichiers en mode streaming dans les services REST.

L'interface StreamingOutput pour diffuser une réponse REST à un client est un lien .

Source: https://habr.com/ru/post/fr424941/


All Articles