Olá Habr!
Neste artigo, descreverei uma maneira de desenvolver um serviço REST que permita receber arquivos e salvá-los no sistema de mensagens no modo de streaming, sem a necessidade de armazenar o arquivo inteiro no lado do serviço. O cenário inverso também será descrito no qual o cliente receberá como resposta um arquivo localizado no sistema de mensagens.
Para maior clareza, darei exemplos do código de serviço desenvolvido no JEE7 para o servidor de aplicativos IBM WebSphere Liberty Server e o IBM MQ atuará como um sistema de mensagens.
No entanto, o método descrito é adequado para outras plataformas semelhantes, isto é, qualquer provedor de API JMS pode atuar como um sistema de mensagens e qualquer servidor JEE (por exemplo, Apache Tomcat) pode atuar como um servidor de aplicativos.
Declaração do problema
Havia uma necessidade de implementar uma solução que permitisse receber arquivos grandes (> 100 Mb) do cliente e transferi-los para outro sistema geograficamente remoto, e na direção oposta - transferir arquivos desse sistema para o cliente como resposta. Em vista do canal de rede não confiável entre a rede do cliente e a rede do aplicativo, é usado um sistema de mensagens que garante a entrega garantida entre eles.
A solução de nível superior inclui três componentes:
- Serviço REST - cuja tarefa é fornecer ao cliente a oportunidade de transferir o arquivo (ou solicitação).
- MQ - é responsável pela transmissão de mensagens entre diferentes redes.
- Aplicativo - um aplicativo responsável por armazenar arquivos e emiti-los mediante solicitação.
Neste artigo, descrevo um método para implementar um serviço REST, cujas tarefas incluem:
- Recebendo um arquivo de um cliente.
- Transfira o arquivo recebido para o MQ.
- Transferindo um arquivo do MQ para o cliente como resposta.
Método de solução
Devido ao grande tamanho do arquivo transmitido, não é possível colocá-lo completamente na RAM; além disso, também há uma restrição no lado do MQ - o tamanho máximo de uma mensagem no MQ não pode exceder 100 Mb. Assim, minha decisão será baseada nos seguintes princípios:
- Receber um arquivo e salvá-lo na fila do MQ deve ser realizado no modo de streaming, sem colocar o arquivo inteiro na memória.
- Na fila, o arquivo MQ será colocado como um conjunto de pequenas mensagens.
Graficamente, a alocação de arquivos no lado do cliente, serviço REST e MQ é mostrada abaixo:
No lado do cliente, o arquivo está completamente localizado no sistema de arquivos, no serviço REST, apenas uma parte do arquivo é armazenada na RAM e, no lado do MQ, cada parte do arquivo é colocada como uma mensagem separada.
Desenvolvimento de um serviço REST
Para maior clareza do método de solução proposto, será desenvolvido um serviço REST de demonstração contendo dois métodos:
- upload - recebe um arquivo do cliente e o grava na fila do MQ, retorna o identificador do grupo de mensagens (no formato base64) como resposta.
- download - recebe o identificador do grupo de mensagens (no formato base64) do cliente e retorna o arquivo armazenado na fila do MQ.
Método para receber um arquivo de um cliente (upload)
A tarefa do método é obter o fluxo do arquivo recebido e, em seguida, gravá-lo na fila do MQ.
Recebendo o fluxo do arquivo recebido
Para receber um arquivo de entrada do cliente, o método espera um objeto com a interface com.ibm.websphere.jaxrs20.multipart.IMultipartBody como um parâmetro de entrada, que fornece a capacidade de obter um link para o fluxo do arquivo recebido
@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... }
Essa interface (IMultipartBody) está localizada no archive JAR com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar, está incluída no IBM Liberty Server e está localizada na pasta: <
WLP_INSTALLATION_PATH > / dev / api / ibm.
Nota:
- WLP_INSTALLATION_PATH - caminho para o diretório do WebSphere Liberty Profile.
- Espera-se que o cliente transfira o arquivo no parâmetro denominado "arquivo".
- Se você estiver usando um servidor de aplicativos diferente, poderá usar a biblioteca alternativa do Apache CXF.
Stream salvando um arquivo no MQ
O método recebe o fluxo do arquivo de entrada, o nome da fila do MQ em que o arquivo deve ser gravado e o identificador do grupo de mensagens que será usado para ligar as mensagens. O identificador do grupo é gerado no lado do serviço, por exemplo, com o utilitário org.apache.commons.lang3.RandomStringUtils:
String groupId = RandomStringUtils.randomAscii(24);
O algoritmo para salvar o arquivo de entrada no MQ consiste nas seguintes etapas:
- Inicialização de objetos de conexão do MQ.
- Leitura cíclica de uma parte do arquivo recebido até que o arquivo seja totalmente lido:
- Uma parte dos dados do arquivo é registrada como uma mensagem separada no MQ.
- Cada mensagem no arquivo possui seu próprio número de série (propriedade "JMSXGroupSeq").
- Todas as mensagens no arquivo têm o mesmo valor de grupo (propriedade "JMSXGroupID").
- A última mensagem possui um sinal indicando que esta mensagem é final (propriedade "JMS_IBM_Last_Msg_In_Group").
- A constante SEGMENT_SIZE contém o tamanho da porção. Por exemplo, 1Mb.
public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } }
Método para enviar um arquivo para o cliente (download)
O método obtém o identificador de um grupo de mensagens no formato base64, pelo qual ele lê mensagens da fila do MQ e envia como resposta no modo de streaming.
Obtendo o ID do Grupo de Mensagens
O método recebe o identificador do grupo de mensagens como um parâmetro de entrada.
@PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... }
Streaming de uma resposta para um cliente
Para transferir um arquivo para o cliente, armazenado como um conjunto de mensagens separadas no MQ, no modo de streaming, crie uma classe com a interface javax.ws.rs.core.StreamingOutput:
public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { new MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } }
Na classe, implementamos o método write, que recebe uma referência de entrada para o fluxo de saída no qual as mensagens do MQ serão gravadas. Também adicionei o nome da fila e o identificador do grupo cujas mensagens serão lidas para a classe.
Um objeto desta classe será passado como parâmetro para criar uma resposta para o cliente:
@GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); }
Stream lendo um arquivo do MQ
O algoritmo para ler mensagens do MQ para o fluxo de saída consiste nas seguintes etapas:
- Inicialização de objetos de conexão do MQ.
- Leitura cíclica de mensagens do MQ até que uma mensagem com o sinal de finalização no grupo seja lida (propriedade "JMS_IBM_Last_Msg_In_Group"):
- Antes de cada mensagem ser lida da fila, é definido um filtro (messageSelector), no qual o identificador do grupo de mensagens e o número de série da mensagem no grupo são definidos.
- O conteúdo da mensagem lida é gravado no fluxo de saída.
public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } }
Chamada de serviço REST
Para testar o serviço, usarei a ferramenta de curvatura.
Upload de arquivo
curl -X PUT -F file=@<__> http://localhost:9080/Demo/rest/service/upload
A resposta será uma string base64 que contém o identificador do grupo de mensagens, que indicaremos no próximo método para obter o arquivo.
Recebendo um arquivo
curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64____> -o <_____>
Conclusão
O artigo examinou a abordagem para o desenvolvimento de um serviço REST que permite transmitir e receber grandes dados na fila do sistema de mensagens, bem como lê-los na fila para retornar como resposta. Esse método reduz o uso de recursos e, portanto, aumenta o rendimento da solução.
Materiais adicionais
Mais informações sobre a interface IMultipartBody usada para receber o fluxo de arquivos de entrada é um
link .
Uma biblioteca alternativa para receber arquivos no modo de streaming nos serviços REST é o
Apache CXF .
A interface StreamingOutput para transmitir uma resposta REST a um cliente é um
link .