Streaming de dados de um serviço REST para uma fila do MQ

Olá Habr!

Neste artigo, descreverei uma maneira de desenvolver um serviço REST que permita receber arquivos e salvá-los no sistema de mensagens no modo de streaming, sem a necessidade de armazenar o arquivo inteiro no lado do serviço. O cenário inverso também será descrito no qual o cliente receberá como resposta um arquivo localizado no sistema de mensagens.

Para maior clareza, darei exemplos do código de serviço desenvolvido no JEE7 para o servidor de aplicativos IBM WebSphere Liberty Server e o IBM MQ atuará como um sistema de mensagens.
No entanto, o método descrito é adequado para outras plataformas semelhantes, isto é, qualquer provedor de API JMS pode atuar como um sistema de mensagens e qualquer servidor JEE (por exemplo, Apache Tomcat) pode atuar como um servidor de aplicativos.

Declaração do problema


Havia uma necessidade de implementar uma solução que permitisse receber arquivos grandes (> 100 Mb) do cliente e transferi-los para outro sistema geograficamente remoto, e na direção oposta - transferir arquivos desse sistema para o cliente como resposta. Em vista do canal de rede não confiável entre a rede do cliente e a rede do aplicativo, é usado um sistema de mensagens que garante a entrega garantida entre eles.

A solução de nível superior inclui três componentes:

  1. Serviço REST - cuja tarefa é fornecer ao cliente a oportunidade de transferir o arquivo (ou solicitação).
  2. MQ - é responsável pela transmissão de mensagens entre diferentes redes.
  3. Aplicativo - um aplicativo responsável por armazenar arquivos e emiti-los mediante solicitação.

imagem

Neste artigo, descrevo um método para implementar um serviço REST, cujas tarefas incluem:

  • Recebendo um arquivo de um cliente.
  • Transfira o arquivo recebido para o MQ.
  • Transferindo um arquivo do MQ para o cliente como resposta.

Método de solução


Devido ao grande tamanho do arquivo transmitido, não é possível colocá-lo completamente na RAM; além disso, também há uma restrição no lado do MQ - o tamanho máximo de uma mensagem no MQ não pode exceder 100 Mb. Assim, minha decisão será baseada nos seguintes princípios:

  • Receber um arquivo e salvá-lo na fila do MQ deve ser realizado no modo de streaming, sem colocar o arquivo inteiro na memória.
  • Na fila, o arquivo MQ será colocado como um conjunto de pequenas mensagens.

Graficamente, a alocação de arquivos no lado do cliente, serviço REST e MQ é mostrada abaixo:

imagem

No lado do cliente, o arquivo está completamente localizado no sistema de arquivos, no serviço REST, apenas uma parte do arquivo é armazenada na RAM e, no lado do MQ, cada parte do arquivo é colocada como uma mensagem separada.

Desenvolvimento de um serviço REST


Para maior clareza do método de solução proposto, será desenvolvido um serviço REST de demonstração contendo dois métodos:

  • upload - recebe um arquivo do cliente e o grava na fila do MQ, retorna o identificador do grupo de mensagens (no formato base64) como resposta.
  • download - recebe o identificador do grupo de mensagens (no formato base64) do cliente e retorna o arquivo armazenado na fila do MQ.

Método para receber um arquivo de um cliente (upload)


A tarefa do método é obter o fluxo do arquivo recebido e, em seguida, gravá-lo na fila do MQ.

Recebendo o fluxo do arquivo recebido


Para receber um arquivo de entrada do cliente, o método espera um objeto com a interface com.ibm.websphere.jaxrs20.multipart.IMultipartBody como um parâmetro de entrada, que fornece a capacidade de obter um link para o fluxo do arquivo recebido

@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... } 

Essa interface (IMultipartBody) está localizada no archive JAR com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, está incluída no IBM Liberty Server e está localizada na pasta: < WLP_INSTALLATION_PATH > / dev / api / ibm.

Nota:

  • WLP_INSTALLATION_PATH - caminho para o diretório do WebSphere Liberty Profile.
  • Espera-se que o cliente transfira o arquivo no parâmetro denominado "arquivo".
  • Se você estiver usando um servidor de aplicativos diferente, poderá usar a biblioteca alternativa do Apache CXF.

Stream salvando um arquivo no MQ


O método recebe o fluxo do arquivo de entrada, o nome da fila do MQ em que o arquivo deve ser gravado e o identificador do grupo de mensagens que será usado para ligar as mensagens. O identificador do grupo é gerado no lado do serviço, por exemplo, com o utilitário org.apache.commons.lang3.RandomStringUtils:

 String groupId = RandomStringUtils.randomAscii(24); 

O algoritmo para salvar o arquivo de entrada no MQ consiste nas seguintes etapas:

  1. Inicialização de objetos de conexão do MQ.
  2. Leitura cíclica de uma parte do arquivo recebido até que o arquivo seja totalmente lido:
    1. Uma parte dos dados do arquivo é registrada como uma mensagem separada no MQ.
    2. Cada mensagem no arquivo possui seu próprio número de série (propriedade "JMSXGroupSeq").
    3. Todas as mensagens no arquivo têm o mesmo valor de grupo (propriedade "JMSXGroupID").
    4. A última mensagem possui um sinal indicando que esta mensagem é final (propriedade "JMS_IBM_Last_Msg_In_Group").
    5. A constante SEGMENT_SIZE contém o tamanho da porção. Por exemplo, 1Mb.

 public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } } 

Método para enviar um arquivo para o cliente (download)


O método obtém o identificador de um grupo de mensagens no formato base64, pelo qual ele lê mensagens da fila do MQ e envia como resposta no modo de streaming.

Obtendo o ID do Grupo de Mensagens


O método recebe o identificador do grupo de mensagens como um parâmetro de entrada.

 @PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... } 

Streaming de uma resposta para um cliente


Para transferir um arquivo para o cliente, armazenado como um conjunto de mensagens separadas no MQ, no modo de streaming, crie uma classe com a interface javax.ws.rs.core.StreamingOutput:

 public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { new MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } } 

Na classe, implementamos o método write, que recebe uma referência de entrada para o fluxo de saída no qual as mensagens do MQ serão gravadas. Também adicionei o nome da fila e o identificador do grupo cujas mensagens serão lidas para a classe.

Um objeto desta classe será passado como parâmetro para criar uma resposta para o cliente:

 @GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); } 

Stream lendo um arquivo do MQ


O algoritmo para ler mensagens do MQ para o fluxo de saída consiste nas seguintes etapas:

  1. Inicialização de objetos de conexão do MQ.
  2. Leitura cíclica de mensagens do MQ até que uma mensagem com o sinal de finalização no grupo seja lida (propriedade "JMS_IBM_Last_Msg_In_Group"):
    1. Antes de cada mensagem ser lida da fila, é definido um filtro (messageSelector), no qual o identificador do grupo de mensagens e o número de série da mensagem no grupo são definidos.
    2. O conteúdo da mensagem lida é gravado no fluxo de saída.


 public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } } 

Chamada de serviço REST


Para testar o serviço, usarei a ferramenta de curvatura.

Upload de arquivo


 curl -X PUT -F file=@<__> http://localhost:9080/Demo/rest/service/upload 

A resposta será uma string base64 que contém o identificador do grupo de mensagens, que indicaremos no próximo método para obter o arquivo.

Recebendo um arquivo


 curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64____> -o <_____> 

Conclusão


O artigo examinou a abordagem para o desenvolvimento de um serviço REST que permite transmitir e receber grandes dados na fila do sistema de mensagens, bem como lê-los na fila para retornar como resposta. Esse método reduz o uso de recursos e, portanto, aumenta o rendimento da solução.

Materiais adicionais


Mais informações sobre a interface IMultipartBody usada para receber o fluxo de arquivos de entrada é um link .

Uma biblioteca alternativa para receber arquivos no modo de streaming nos serviços REST é o Apache CXF .

A interface StreamingOutput para transmitir uma resposta REST a um cliente é um link .

Source: https://habr.com/ru/post/pt424941/


All Articles