将数据从REST服务流传输到MQ队列

哈Ha!

在本文中,我将介绍一种开发REST服务的方法,该服务允许您接收文件并将其以流模式保存到消息传递系统中,而无需将整个文件存储在该服务的一侧。 还将描述相反的情况,其中客户端将接收位于消息传递系统中的文件作为响应。

为了清楚起见,我将在JEE7上提供针对IBM WebSphere Liberty Server应用程序服务器的已开发服务代码的示例,而IBM MQ将充当消息传递系统。
但是,所描述的方法适用于其他类似的平台,即 任何JMS API提供程序都可以充当消息传递系统,任何JEE服务器(例如Apache Tomcat)都可以充当应用程序服务器。

问题陈述


需要实现一种解决方案,该解决方案既允许从客户端接收大文件(> 100 Mb),又将它们传输到另一个地理位置较远的系统,并且方向相反-将文件从该系统传输到客户端作为答案。 鉴于客户端网络和应用程序网络之间的网络通道不可靠,因此使用消息传递系统来确保它们之间的传递有保证。

顶级解决方案包括三个组件:

  1. REST服务-其任务是为客户端提供传输文件(或请求)的机会。
  2. MQ-负责不同网络之间的消息传输。
  3. 应用程序-负责存储文件并根据要求发布文件的应用程序。

图片

在本文中,我描述了一种实现REST服务的方法,其任务包括:

  • 从客户端接收文件。
  • 将收到的文件传输到MQ。
  • 将文件从MQ传输到客户端作为响应。

解决方法


由于传输文件的大小很大,因此无法将其完全放置在RAM中,此外,MQ端也有限制-MQ中一条消息的最大大小不能超过100 Mb。 因此,我的决定将基于以下原则:

  • 接收文件并将其保存在MQ队列中应在流模式下执行,而不要将整个文件都放在内存中。
  • 在队列中,MQ文件将作为一组小消息放置。

图形化显示了客户端,REST服务和MQ上的文件分配,如下所示:

图片

在客户端,文件完全位于文件系统上,在REST服务中,仅文件的一部分存储在RAM中,而在MQ端,文件的每个部分作为单独的消息放置。

REST服务的开发


为了使提出的解决方案方法更加清晰,将开发一个演示REST服务,其中包含两种方法:

  • upload-从客户端接收文件并将其写入MQ队列,返回消息组标识符(以base64格式)作为响应。
  • download-从客户端接收消息组标识符(采用base64格式),并返回存储在MQ队列中的文件。

从客户端接收文件的方法(上传)


该方法的任务是获取传入文件的流,然后将其写入MQ队列。

接收传入文件的流


要从客户端接收输入文件,该方法需要一个以com.ibm.websphere.jaxrs20.multipart.IMultipartBody接口作为输入参数的对象,该对象提供了获取到传入文件流的链接的功能。

@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... } 

此接口(IMultipartBody)位于com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar JAR归档文件中,该文件已包含在IBM Liberty Server中,并且位于以下文件夹中:< WLP_INSTALLATION_PATH > / dev / api / ibm。

注意事项:

  • WLP_INSTALLATION_PATH -WebSphere Liberty Profile目录的路径。
  • 预期客户端将在名为“ file”的参数中传输文件。
  • 如果使用其他应用程序服务器,则可以使用Apache CXF中的备用库。

流保存MQ中的文件


该方法接收输入文件流,应在其中写入文件的MQ队列的名称以及将用于绑定消息的消息组的标识符。 组标识符是在服务端生成的,例如,使用实用程序org.apache.commons.lang3.RandomStringUtils:

 String groupId = RandomStringUtils.randomAscii(24); 

在MQ中保存输入文件的算法包括以下步骤:

  1. MQ连接对象的初始化。
  2. 循环读取部分传入文件,直到完全读取文件为止:
    1. 一堆文件数据被记录为MQ中的单独消息。
    2. 文件中的每个消息都有其自己的序列号(属性“ JMSXGroupSeq”)。
    3. 文件中的所有消息都具有相同的组值(属性“ JMSXGroupID”)。
    4. 最后一条消息带有指示该消息是最终消息的符号(属性“ JMS_IBM_Last_Msg_In_Group”)。
    5. SEGMENT_SIZE常数包含份量。 例如1Mb。

 public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } } 

向客户端发送文件的方法(下载)


该方法获得base64格式的一组消息的标识符,通过该方法,它从MQ队列中读取消息并将其作为响应以流方式发送。

获取消息组ID


该方法接收消息组标识符作为输入参数。

 @PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... } 

流式传输到客户端


要以流模式将文件存储为MQ中的一组单独的消息传输到客户端,请使用javax.ws.rs.core.StreamingOutput接口创建一个类:

 public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { new MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } } 

在该类中,我们实现了write方法,该方法接收对将要写入MQ消息的传出流的输入引用。 我还添加了队列名称和将其消息读取到该类的组的标识符。

此类的对象将作为参数传递以创建对客户端的响应:

 @GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); } 

流从MQ读取文件


从MQ读取消息到传出流的算法包括以下步骤:

  1. MQ连接对象的初始化。
  2. 从MQ循环读取消息,直到读取带有组中终止符号的消息为止(属性“ JMS_IBM_Last_Msg_In_Group”):
    1. 在从队列中读取每个消息之前,先设置一个过滤器(messageSelector),在其中设置消息组标识符和组中的消息序列号。
    2. 已读消息的内容被写入输出流。


 public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } } 

REST服务电话


为了测试服务,我将使用curl工具。

文件上传


 curl -X PUT -F file=@<__> http://localhost:9080/Demo/rest/service/upload 

响应将是一个包含消息组标识符的base64字符串,我们将在下一个获取文件的方法中指出该标识符。

接收文件


 curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64____> -o <_____> 

结论


本文探讨了开发REST服务的方法,该方法允许流式传输在邮件系统的队列中接收和保存大数据,以及从队列中读取数据以作为响应返回。 此方法减少了资源的使用,从而提高了解决方案的吞吐量。

附加材料


有关用于接收传入文件流的IMultipartBody接口的更多信息,请参见链接

REST服务中以流模式接收文件的另一个库是Apache CXF

用于将REST响应流传输到客户端的StreamingOutput接口是一个链接

Source: https://habr.com/ru/post/zh-CN424941/


All Articles