Preface

PipedInputStream and PipedOutputStream are designed for transferring byte data between threads. They always come in pairs and must be used from two different threads; using both ends of a pipe in the same thread can cause a deadlock. In some business scenarios, piped streams can speed up file uploads and reduce CPU and IO overhead, which makes them very practical, so this article walks through practical application scenarios for piped streams.

Principle Introduction

We know that output streams write data and input streams read data, and PipedInputStream and PipedOutputStream are no different. A PipedOutputStream holds a sink reference to its connected PipedInputStream and forwards every byte written to it. Inside PipedInputStream, a byte array buffer (1024 bytes by default) serves as the transfer buffer. PipedOutputStream writes data into this buffer; when the buffer is full, the writing thread calls notifyAll() to wake up the reading thread, then waits about one second before trying to write again. When the buffer is empty, the reading thread calls notifyAll() to wake up the writing thread, then waits about one second before trying to read again. Once the writer has closed its stream and all buffered data has been consumed, read() returns -1 to mark the end of the stream.
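
For example, the pipe's buffer size can be set explicitly when connecting the pair, and write() blocks whenever that buffer is full until the reader drains it. Below is a minimal, self-contained sketch of this behavior; the 16-byte pipe size is deliberately tiny just to make the buffering visible.

    import java.io.IOException;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;

    public class PipeBufferDemo {
        public static void main(String[] args) throws IOException {
            // Connect the pair with an explicit 16-byte buffer instead of the default 1024
            PipedOutputStream out = new PipedOutputStream();
            PipedInputStream in = new PipedInputStream(out, 16);

            // Writer thread: blocks each time the tiny buffer fills up,
            // until the reader below drains it
            new Thread(() -> {
                try {
                    out.write("hello javaisland, sent through a tiny pipe".getBytes());
                    out.close(); // closing the write end lets read() return -1
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }).start();

            // Reader (main thread): read() returns -1 once the writer has closed
            // its end and the buffer is empty
            int b;
            while ((b = in.read()) != -1) {
                System.out.print((char) b);
            }
            in.close();
        }
    }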

Usage scenario overview

    public static void main(String[] args) {

        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out)) {
            // Writer thread: writes the bytes into the pipe, then closes its end
            // so the reader sees end-of-stream
            new Thread(() -> {
                try {
                    out.write("hello javaisland".getBytes());
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }).start();
            // Reader (main thread): read() returns -1 once the writer has closed the stream
            int receive;
            while ((receive = in.read()) != -1) {
                System.err.print((char) receive);
            }

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

The code above writes data in a separate thread and reads it in the main thread, completing a cross-thread data transfer.

Practical application

After this brief look at the principle, you may wonder: what is the point of transferring bytes from one thread to another? In short: whenever a Java application generates a file that then needs to be uploaded to the cloud, piped streams fit well. Before knowing about piped streams, the usual approach to this scenario was to write the file to the local disk first and then read it back from disk to upload it to the cloud. Once you are familiar with the technique, you can skip the step of writing the file to the local disk, improving efficiency and saving CPU and IO overhead.
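
For contrast, a rough sketch of the disk-based approach being replaced is shown below; generateFile and ossClient are placeholders standing in for whatever file generator and cloud client a project actually uses.

    public static void main(String[] args) {
        try {
            // 1. Generate the file on the local disk first
            Path tmp = Files.createTempFile("export", ".xlsx");
            try (OutputStream fileOut = Files.newOutputStream(tmp)) {
                generateFile(fileOut); // placeholder: Excel/XML generation
            }
            // 2. Read the file back from disk and upload it to the cloud
            try (InputStream fileIn = Files.newInputStream(tmp)) {
                ossClient.putObject("test", "test.xlsx", fileIn);
            }
            // 3. Delete the temporary file
            Files.deleteIfExists(tmp);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

With piped streams, steps 1 and 3 disappear and the generated bytes flow straight into the upload, as the two cases below show.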

Case one: Excel file export function

For example, suppose there is a file export feature. Because the exported file is large, the export takes a long time to finish downloading. Without piped streams the flow would be designed like this: the user clicks Export on the page, the backend triggers an export task, the data matching the export conditions is queried from the database and written into an Excel file, the file is uploaded to the cloud, and finally a download link for the file is generated. As mentioned above, the old approach was to write the file locally first, then read it back from the local directory and upload it to the cloud. The code below demonstrates doing this in one step with piped streams.

    public static void main(String[] args) {

        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out)) {
            new Thread(() -> {
                // Data queried from the database according to the export conditions
                List<Object> database = new LinkedList<>();
                try {
                    // File generation: the Excel file is written straight into the pipe
                    ExcelUtils.getInstance().exportObjects2Excel(database, out);
                    // Close the writer's end so the uploading side sees end-of-stream
                    out.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }

            }).start();
            // Cloud upload: reads from the pipe while the writer thread produces the file
            ossClient.putObject("test", "test.xlsx", in);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
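
One caveat in the code above: an exception thrown by the writer thread only reaches printStackTrace, so the upload could end up with a truncated file without the caller noticing. A possible variation, sketched below under the same assumptions as the case above (ExcelUtils and ossClient come from that example), submits the writer to an ExecutorService so its failure can be rethrown after the upload.

    ExecutorService executor = Executors.newSingleThreadExecutor();
    try (PipedOutputStream out = new PipedOutputStream();
         PipedInputStream in = new PipedInputStream(out)) {
        List<Object> database = new LinkedList<>();
        // Submit the writer so any exception it throws is captured by the Future
        Future<?> writer = executor.submit(() -> {
            try {
                ExcelUtils.getInstance().exportObjects2Excel(database, out);
            } finally {
                out.close(); // always release the reader, even if generation fails
            }
            return null;
        });
        // Upload while the writer thread produces the file
        ossClient.putObject("test", "test.xlsx", in);
        // Rethrows the writer's exception (wrapped in ExecutionException), if any
        writer.get();
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        executor.shutdown();
    }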

Case two: XML file data transfer

This type of requirement comes up when interfacing with banks and financial institutions, which require XML-formatted data to be uploaded to a specified FTP server or a cloud directory for data reconciliation. From the file-upload point of view it is the same as the case above: generate the file in memory, then upload it to the cloud. The pseudo code is as follows.

    public static void main(String[] args) {

        try (PipedOutputStream out = new PipedOutputStream();
             PipedInputStream in = new PipedInputStream(out)) {
            new Thread(() -> {
                // Data to be reconciled, queried from the database
                List<Object> database = new LinkedList<>();
                try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
                    Marshaller marshaller = JAXBContext.newInstance(Object.class).createMarshaller();
                    marshaller.setProperty(Marshaller.JAXB_FORMATTED_OUTPUT, Boolean.TRUE);
                    // Marshal the XML straight into the gzip stream, which feeds the pipe;
                    // closing gzipOut also closes the pipe's write end
                    marshaller.marshal(database, gzipOut);
                } catch (IOException | JAXBException e) {
                    e.printStackTrace();
                }

            }).start();
            // Cloud upload
            ossClient.putObject("test", "test.xml.gz", in);

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

Case two adds a compression step, so the final uploaded file is a gzip archive with an XML file inside. This greatly reduces the file size and speeds up the upload.
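
To sanity-check such an upload, the downloaded archive can be unwrapped with a GZIPInputStream. A minimal sketch (the local path is just an example) looks like this:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.util.zip.GZIPInputStream;

    public class ReadBackDemo {
        public static void main(String[] args) throws Exception {
            // Decompress the downloaded archive and print the XML it contains
            try (GZIPInputStream gzipIn = new GZIPInputStream(
                     Files.newInputStream(Paths.get("test.xml.gz")));
                 BufferedReader reader = new BufferedReader(
                     new InputStreamReader(gzipIn, StandardCharsets.UTF_8))) {
                reader.lines().forEach(System.out::println);
            }
        }
    }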

Conclusion

PipedInputStream and PipedOutputStream are designed for transferring byte data between threads. When you need to generate a file in memory and upload it to the cloud, remember piped streams: they improve efficiency and reduce CPU and IO overhead.