Streaming data dari layanan REST ke antrian MQ

Halo, Habr!

Pada artikel ini, saya akan menjelaskan cara untuk mengembangkan layanan REST yang memungkinkan Anda untuk menerima file dan menyimpannya ke sistem pesan dalam mode streaming tanpa perlu menyimpan seluruh file di sisi layanan. Skenario terbalik juga akan dijelaskan di mana klien akan menerima sebagai respons file yang terletak di sistem pesan.

Untuk kejelasan, saya akan memberikan contoh kode layanan yang dikembangkan pada JEE7 untuk server aplikasi IBM WebSphere Liberty Server, dan IBM MQ akan bertindak sebagai sistem pengiriman pesan.
Namun, metode yang dideskripsikan cocok untuk platform serupa lainnya, mis. penyedia API JMS apa pun dapat bertindak sebagai sistem pengiriman pesan, dan server JEE apa pun (misalnya, Apache Tomcat) dapat bertindak sebagai server aplikasi.

Pernyataan masalah


Ada kebutuhan untuk mengimplementasikan solusi yang akan memungkinkan keduanya menerima file besar (> 100 Mb) dari klien dan mentransfernya ke sistem lain yang jauh secara geografis, dan sebaliknya - mentransfer file dari sistem ini ke klien sebagai jawaban. Mengingat saluran jaringan yang tidak dapat diandalkan antara jaringan klien dan jaringan aplikasi, sistem pengiriman pesan digunakan untuk memastikan pengiriman yang terjamin di antara mereka.

Solusi tingkat atas mencakup tiga komponen:

  1. Layanan REST - tugasnya adalah memberi klien kesempatan untuk mentransfer file (atau permintaan).
  2. MQ - bertanggung jawab untuk pengiriman pesan antar jaringan yang berbeda.
  3. Aplikasi - aplikasi yang bertanggung jawab untuk menyimpan file dan mengeluarkannya berdasarkan permintaan.

gambar

Dalam artikel ini, saya menjelaskan metode untuk mengimplementasikan layanan REST, tugas-tugasnya meliputi:

  • Menerima file dari klien.
  • Transfer file yang diterima ke MQ.
  • Mentransfer file dari MQ ke klien sebagai tanggapan.

Metode solusi


Karena ukuran besar dari file yang ditransmisikan, tidak mungkin untuk menempatkannya sepenuhnya dalam RAM, apalagi, ada juga pembatasan di sisi MQ - ukuran maksimum satu pesan di MQ tidak dapat melebihi 100 Mb. Dengan demikian, keputusan saya akan didasarkan pada prinsip-prinsip berikut:

  • Menerima file dan menyimpannya dalam antrian MQ harus dilakukan dalam mode streaming, tanpa menempatkan seluruh file dalam memori.
  • Dalam antrian, file MQ akan ditempatkan sebagai satu set pesan kecil.

Secara grafis, alokasi file di sisi klien, layanan REST dan MQ ditunjukkan di bawah ini:

gambar

Di sisi klien, file sepenuhnya terletak pada sistem file, dalam layanan REST, hanya sebagian file disimpan dalam RAM, dan di sisi MQ, setiap bagian dari file ditempatkan sebagai pesan terpisah.

Pengembangan layanan REST


Untuk kejelasan metode solusi yang diusulkan, layanan REST demo akan dikembangkan yang berisi dua metode:

  • unggah - menerima file dari klien dan menulisnya ke antrian MQ, mengembalikan pengidentifikasi grup pesan (dalam format base64) sebagai tanggapan.
  • unduh - menerima pengidentifikasi grup pesan (dalam format base64) dari klien dan mengembalikan file yang disimpan dalam antrian MQ.

Metode untuk menerima file dari klien (unggah)


Tugas metode ini adalah untuk mendapatkan aliran file yang masuk dan kemudian menulisnya ke antrian MQ.

Mengambil File Masuk


Untuk menerima file input dari klien, metode ini mengharapkan objek dengan antarmuka com.ibm.websphere.jaxrs20.multipart.IMultipartBody sebagai parameter input, yang menyediakan kemampuan untuk mendapatkan tautan ke aliran file yang masuk

@PUT @Path("upload") public Response upload(IMultipartBody body) { ... IAttachment attachment = body.getAttachment("file"); InputStream inputStream = attachment.getDataHandler().getInputStream(); ... } 

Antarmuka (IMultipartBody) ini terletak di com.ibm.websphere.appserver.api.jaxrs20_1.0.21.jar arsip JAR, disertakan dengan IBM Liberty Server dan terletak di folder: < WLP_INSTALLATION_PATH > / dev / api / ibm.

Catatan:

  • WLP_INSTALLATION_PATH - jalur ke direktori Profil WebSphere Liberty.
  • Diharapkan klien akan mentransfer file dalam parameter bernama "file".
  • Jika Anda menggunakan server aplikasi yang berbeda, Anda dapat menggunakan pustaka alternatif dari Apache CXF.

Streaming menyimpan file dalam MQ


Metode menerima aliran file input, nama antrian MQ di mana file harus ditulis, dan pengidentifikasi grup pesan yang akan digunakan untuk mengikat pesan. Pengenal grup dibuat di sisi layanan, misalnya, dengan utilitas org.apache.commons.lang3.RandomStringUtils:

 String groupId = RandomStringUtils.randomAscii(24); 

Algoritma untuk menyimpan file input dalam MQ terdiri dari langkah-langkah berikut:

  1. Inisialisasi objek koneksi MQ.
  2. Pembacaan siklus sebagian file yang masuk sampai file sepenuhnya dibaca:
    1. Sepotong data file dicatat sebagai pesan terpisah di MQ.
    2. Setiap pesan dalam file memiliki nomor seri sendiri (properti "JMSXGroupSeq").
    3. Semua pesan dalam file memiliki nilai grup yang sama (properti "JMSXGroupID").
    4. Pesan terakhir memiliki tanda yang menunjukkan bahwa pesan ini bersifat final (properti "JMS_IBM_Last_Msg_In_Group").
    5. Konstanta SEGMENT_SIZE berisi ukuran penyajian. Misalnya, 1 MB.

 public void write(InputStream inputStream, String queueName, String groupId) throws IOException, JMSException { try ( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); MessageProducer producer = session.createProducer(session.createQueue(queueName)); ) { byte[] buffer = new byte[SEGMENT_SIZE]; BytesMessage message = null; for(int readBytesSize = 1, sequenceNumber = 1; readBytesSize > 0; sequenceNumber++) { readBytesSize = inputStream.read(buffer); if (message != null) { if (readBytesSize < 1) { message.setBooleanProperty("JMS_IBM_Last_Msg_In_Group", true); } producer.send(message); } if (readBytesSize > 0) { message = session.createBytesMessage(); message.setStringProperty("JMSXGroupID", groupId); message.setIntProperty("JMSXGroupSeq", sequenceNumber); if (readBytesSize == SEGMENT_SIZE) { message.writeBytes(buffer); } else { message.writeBytes(Arrays.copyOf(buffer, readBytesSize)); } } } } } 

Metode untuk mengirim file ke klien (unduh)


Metode ini memperoleh pengidentifikasi sekelompok pesan dalam format base64, dimana ia membaca pesan dari antrian MQ dan mengirimkannya sebagai respons dalam mode streaming.

Mendapatkan id grup pesan


Metode ini menerima pengidentifikasi grup pesan sebagai parameter input.

 @PUT @Path("download") public Response download(@QueryParam("groupId") String groupId) { ... } 

Streaming tanggapan ke klien


Untuk mentransfer file ke klien, disimpan sebagai satu set pesan terpisah di MQ, dalam mode streaming, buat kelas dengan antarmuka javax.ws.rs.core.StreamingOutput:

 public class MQStreamingOutput implements StreamingOutput { private String groupId; private String queueName; public MQStreamingOutput(String groupId, String queueName) { super(); this.groupId = groupId; this.queueName = queueName; } @Override public void write(OutputStream outputStream) throws IOException, WebApplicationException { try { new MQWorker().read(outputStream, queueName, groupId); } catch(NamingException | JMSException e) { e.printStackTrace(); new IOException(e); } finally { outputStream.flush(); outputStream.close(); } } } 

Di kelas, kami menerapkan metode menulis, yang menerima referensi input ke aliran keluar ke mana pesan dari MQ akan ditulis. Saya juga menambahkan nama antrian dan pengenal grup yang pesannya akan dibacakan ke kelas.

Objek kelas ini akan diteruskan sebagai parameter untuk membuat respons kepada klien:

 @GET @Path("download") public Response download(@QueryParam("groupId") String groupId) { ResponseBuilder responseBuilder = null; try { MQStreamingOutput streamingOutput = new MQStreamingOutput(new String(Utils.decodeBase64(groupId)), Utils.QUEUE_NAME); responseBuilder = Response.ok(streamingOutput); } catch(Exception e) { e.printStackTrace(); responseBuilder.status(Status.INTERNAL_SERVER_ERROR).entity(e.getMessage()); } return responseBuilder.build(); } 

Streaming membaca file dari MQ


Algoritma untuk membaca pesan dari MQ ke aliran keluar terdiri dari langkah-langkah berikut:

  1. Inisialisasi objek koneksi MQ.
  2. Pembacaan siklus pesan dari MQ hingga pesan dengan tanda pengakhiran dalam grup dibaca (properti "JMS_IBM_Last_Msg_In_Group"):
    1. Sebelum setiap pesan dibaca dari antrian, filter (messageSelector) diatur, di mana pengidentifikasi grup pesan dan nomor seri pesan dalam grup diatur.
    2. Isi dari pesan yang sudah dibaca ditulis ke aliran keluar.


 public void read(OutputStream outputStream, String queueName, String groupId) throws IOException, JMSException { try( Connection connection = connectionFactory.createConnection(); Session session = connection.createSession(); ) { connection.start(); Queue queue = session.createQueue(queueName); int sequenceNumber = 1; for(boolean isMessageExist = true; isMessageExist == true; ) { String messageSelector = "JMSXGroupID='" + groupId.replaceAll("'", "''") + "' AND JMSXGroupSeq=" + sequenceNumber++; try( MessageConsumer consumer = session.createConsumer(queue, messageSelector); ) { BytesMessage message = (BytesMessage) consumer.receiveNoWait(); if (message == null) { isMessageExist = false; } else { byte[] buffer = new byte[(int) message.getBodyLength()]; message.readBytes(buffer); outputStream.write(buffer); if (message.getBooleanProperty("JMS_IBM_Last_Msg_In_Group")) { isMessageExist = false; } } } } } } 

Panggilan layanan REST


Untuk menguji layanan, saya akan menggunakan alat keriting.

Unggah file


 curl -X PUT -F file=@<__> http://localhost:9080/Demo/rest/service/upload 

Respons akan berupa string base64 yang berisi pengidentifikasi grup pesan, yang akan kami tunjukkan pada metode selanjutnya untuk mendapatkan file.

Menerima file


 curl -X GET http://localhost:9080/Demo/rest/service/download?groupId=<base64____> -o <_____> 

Kesimpulan


Artikel tersebut memeriksa pendekatan untuk mengembangkan layanan REST yang memungkinkan Anda untuk melakukan streaming dan menerima data besar ke dalam antrian sistem pengiriman pesan, serta membacanya dari antrian untuk kembali sebagai respons. Metode ini mengurangi penggunaan sumber daya, dan dengan demikian meningkatkan throughput solusi.

Bahan tambahan


Informasi lebih lanjut tentang antarmuka IMultipartBody yang digunakan untuk menerima aliran file masuk adalah tautan .

Pustaka alternatif untuk menerima file dalam mode streaming dalam layanan REST adalah Apache CXF .

Antarmuka StreamingOutput untuk streaming tanggapan REST ke klien adalah tautan .

Source: https://habr.com/ru/post/id424941/


All Articles