Baca streaming dua kali

127

Bagaimana Anda membaca inputstream yang sama dua kali? Apakah mungkin untuk menyalinnya?

Saya perlu mendapatkan gambar dari web, menyimpannya secara lokal dan kemudian mengembalikan gambar yang disimpan. Saya hanya berpikir akan lebih cepat menggunakan aliran yang sama daripada memulai aliran baru ke konten yang diunduh dan kemudian membacanya lagi.

Warpzit
sumber
1
Mungkin menggunakan tandai dan setel ulang
Vyacheslav Shylkin

Jawaban:

114

Anda dapat menggunakan org.apache.commons.io.IOUtils.copyuntuk menyalin konten InputStream ke array byte, dan kemudian berulang kali membaca dari array byte menggunakan ByteArrayInputStream. Misalnya:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
org.apache.commons.io.IOUtils.copy(in, baos);
byte[] bytes = baos.toByteArray();

// either
while (needToReadAgain) {
    ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
    yourReadMethodHere(bais);
}

// or
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
while (needToReadAgain) {
    bais.reset();
    yourReadMethodHere(bais);
}
Paul Grime
sumber
1
Saya rasa ini adalah satu-satunya solusi yang valid karena mark tidak didukung untuk semua jenis.
Warpzit
3
@Paul Grime: IOUtils.toByeArray secara internal memanggil metode salin dari dalam juga.
Ankit
4
Seperti yang dikatakan @Ankit, solusi ini tidak valid untuk saya, karena input dibaca secara internal dan tidak dapat digunakan kembali.
Xtreme Biker
30
Saya tahu komentar ini kedaluwarsa, tetapi, di sini, di opsi pertama, jika Anda membaca inputstream sebagai array byte, bukankah itu berarti Anda memuat semua data ke memori? yang bisa menjadi masalah besar jika Anda memuat sesuatu seperti file besar?
jaxkodex
2
Seseorang bisa menggunakan IOUtils.toByteArray (InputStream) untuk mendapatkan array byte dalam satu panggilan.
berguna
30

Bergantung dari mana InputStream berasal, Anda mungkin tidak dapat menyetel ulang. Anda dapat memeriksa apakah mark()dan reset()didukung menggunakan markSupported().

Jika ya, Anda dapat memanggil reset()InputStream untuk kembali ke awal. Jika tidak, Anda perlu membaca InputStream dari sumbernya lagi.

Kevin Parker
sumber
1
InputStream tidak mendukung 'mark' - Anda dapat memanggil mark pada IS tetapi tidak melakukan apa pun. Demikian juga, memanggil reset pada IS akan memunculkan pengecualian.
ayahuasca
4
@ayahuasca InputStreamsubsclasses like BufferedInputStreamdoes support 'mark'
Dmitry Bogdanovich
10

jika InputStreamdukungan Anda menggunakan mark, maka Anda dapat mark()inputStream dan kemudian reset(). jika Anda InputStremtidak mendukung tanda maka Anda dapat menggunakan kelas java.io.BufferedInputStream, sehingga Anda dapat menyematkan aliran Anda di dalam BufferedInputStreamseperti ini

    InputStream bufferdInputStream = new BufferedInputStream(yourInputStream);
    bufferdInputStream.mark(some_value);
    //read your bufferdInputStream 
    bufferdInputStream.reset();
    //read it again
wannas
sumber
1
Aliran input yang di-buffer hanya dapat menandai kembali ke ukuran buffer, jadi jika sumbernya tidak sesuai, Anda tidak bisa kembali ke awal.
L. Blanc
@ L.Blanc maaf, tapi sepertinya itu tidak benar. Perhatikan BufferedInputStream.fill(), ada bagian "buffer tumbuh", di mana ukuran buffer baru dibandingkan hanya dengan marklimitdan MAX_BUFFER_SIZE.
eugene82
8

Anda dapat menggabungkan aliran input dengan PushbackInputStream. PushbackInputStream memungkinkan untuk unread (" write back ") byte yang sudah dibaca, jadi Anda bisa melakukan seperti ini:

public class StreamTest {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's wrap it with PushBackInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new PushbackInputStream(originalStream, 10); // 10 means that maximnum 10 characters can be "written back" to the stream

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    ((PushbackInputStream) wrappedStream).unread(readBytes, 0, readBytes.length);

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3


  }

  private static byte[] getBytes(InputStream is, int howManyBytes) throws IOException {
    System.out.print("Reading stream: ");

    byte[] buf = new byte[howManyBytes];

    int next = 0;
    for (int i = 0; i < howManyBytes; i++) {
      next = is.read();
      if (next > 0) {
        buf[i] = (byte) next;
      }
    }
    return buf;
  }

  private static void printBytes(byte[] buffer) throws IOException {
    System.out.print("Reading stream: ");

    for (int i = 0; i < buffer.length; i++) {
      System.out.print(buffer[i] + " ");
    }
    System.out.println();
  }


}

Harap dicatat bahwa PushbackInputStream menyimpan buffer internal byte sehingga benar-benar membuat buffer di memori yang menyimpan byte "ditulis kembali".

Mengetahui pendekatan ini kita bisa melangkah lebih jauh dan menggabungkannya dengan FilterInputStream. FilterInputStream menyimpan aliran input asli sebagai delegasi. Hal ini memungkinkan untuk membuat definisi kelas baru yang memungkinkan data asli " belum dibaca " secara otomatis. Definisi kelas ini adalah sebagai berikut:

public class TryReadInputStream extends FilterInputStream {
  private final int maxPushbackBufferSize;

  /**
  * Creates a <code>FilterInputStream</code>
  * by assigning the  argument <code>in</code>
  * to the field <code>this.in</code> so as
  * to remember it for later use.
  *
  * @param in the underlying input stream, or <code>null</code> if
  *           this instance is to be created without an underlying stream.
  */
  public TryReadInputStream(InputStream in, int maxPushbackBufferSize) {
    super(new PushbackInputStream(in, maxPushbackBufferSize));
    this.maxPushbackBufferSize = maxPushbackBufferSize;
  }

  /**
   * Reads from input stream the <code>length</code> of bytes to given buffer. The read bytes are still avilable
   * in the stream
   *
   * @param buffer the destination buffer to which read the data
   * @param offset  the start offset in the destination <code>buffer</code>
   * @aram length how many bytes to read from the stream to buff. Length needs to be less than
   *        <code>maxPushbackBufferSize</code> or IOException will be thrown
   *
   * @return number of bytes read
   * @throws java.io.IOException in case length is
   */
  public int tryRead(byte[] buffer, int offset, int length) throws IOException {
    validateMaxLength(length);

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int bytesRead = 0;

    int nextByte = 0;

    for (int i = 0; (i < length) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        buffer[offset + bytesRead++] = (byte) nextByte;
      }
    }

    if (bytesRead > 0) {
      ((PushbackInputStream) in).unread(buffer, offset, bytesRead);
    }

    return bytesRead;

  }

  public byte[] tryRead(int maxBytesToRead) throws IOException {
    validateMaxLength(maxBytesToRead);

    ByteArrayOutputStream baos = new ByteArrayOutputStream(); // as ByteArrayOutputStream to dynamically allocate internal bytes array instead of allocating possibly large buffer (if maxBytesToRead is large)

    // NOTE: below reading byte by byte instead of "int bytesRead = is.read(firstBytes, 0, maxBytesOfResponseToLog);"
    // because read() guarantees to read a byte

    int nextByte = 0;

    for (int i = 0; (i < maxBytesToRead) && (nextByte >= 0); i++) {
      nextByte = read();
      if (nextByte >= 0) {
        baos.write((byte) nextByte);
      }
    }

    byte[] buffer = baos.toByteArray();

    if (buffer.length > 0) {
      ((PushbackInputStream) in).unread(buffer, 0, buffer.length);
    }

    return buffer;

  }

  private void validateMaxLength(int length) throws IOException {
    if (length > maxPushbackBufferSize) {
      throw new IOException(
        "Trying to read more bytes than maxBytesToRead. Max bytes: " + maxPushbackBufferSize + ". Trying to read: " +
        length);
    }
  }

}

Kelas ini memiliki dua metode. Satu untuk membaca buffer yang ada (definisi analog dengan panggilan public int read(byte b[], int off, int len)kelas InputStream). Kedua yang mengembalikan buffer baru (ini mungkin lebih efektif jika ukuran buffer untuk dibaca tidak diketahui).

Sekarang mari kita lihat kelas kita beraksi:

public class StreamTest2 {
  public static void main(String[] args) throws IOException {
    byte[] bytes = new byte[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

    InputStream originalStream = new ByteArrayInputStream(bytes);

    byte[] readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 1 2 3

    readBytes = getBytes(originalStream, 3);
    printBytes(readBytes); // prints: 4 5 6

    // now let's use our TryReadInputStream

    originalStream = new ByteArrayInputStream(bytes);

    InputStream wrappedStream = new TryReadInputStream(originalStream, 10);

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // NOTE: no manual call to "unread"(!) because TryReadInputStream handles this internally
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 1 2 3

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3);
    printBytes(readBytes); // prints 1 2 3

    // we can also call normal read which will actually read the bytes without "writing them back"
    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 1 2 3

    readBytes = getBytes(wrappedStream, 3);
    printBytes(readBytes); // prints 4 5 6

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); // now we can try read next bytes
    printBytes(readBytes); // prints 7 8 9

    readBytes = ((TryReadInputStream) wrappedStream).tryRead(3); 
    printBytes(readBytes); // prints 7 8 9


  }



}
walkeros
sumber
5

Jika Anda menggunakan implementasi InputStream, Anda dapat memeriksa hasil InputStream#markSupported()yang memberi tahu Anda apakah Anda dapat menggunakan metode mark()/ reset().

Jika Anda dapat menandai aliran saat Anda membaca, maka panggil reset()untuk kembali untuk memulai.

Jika tidak bisa, Anda harus membuka aliran lagi.

Solusi lain adalah mengonversi InputStream ke array byte, lalu mengulanginya selama yang Anda butuhkan. Anda dapat menemukan beberapa solusi dalam posting ini Konversikan InputStream ke array byte di Java menggunakan libs pihak ketiga atau tidak. Perhatian, jika konten yang dibaca terlalu besar Anda mungkin mengalami beberapa masalah memori.

Terakhir, jika kebutuhan Anda adalah membaca gambar, gunakan:

BufferedImage image = ImageIO.read(new URL("http://www.example.com/images/toto.jpg"));

Menggunakan ImageIO#read(java.net.URL)juga memungkinkan Anda menggunakan cache.

alain.janinm
sumber
1
sebuah kata peringatan saat menggunakan ImageIO#read(java.net.URL): beberapa server web dan CDN mungkin menolak panggilan kosong (yaitu tanpa Agen Pengguna yang membuat server percaya bahwa panggilan tersebut berasal dari browser web) yang dibuat oleh ImageIO#read. Dalam hal ini, menggunakan URLConnection.openConnection()pengaturan agen pengguna ke koneksi itu + menggunakan `ImageIO.read (InputStream) akan, sebagian besar, melakukan trik.
Clint Eastwood
InputStreambukan antarmuka
Brice
3

Bagaimana tentang:

if (stream.markSupported() == false) {

        // lets replace the stream object
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        IOUtils.copy(stream, baos);
        stream.close();
        stream = new ByteArrayInputStream(baos.toByteArray());
        // now the stream should support 'mark' and 'reset'

    }
Chatterjee Anshuman
sumber
5
Itu ide yang buruk. Anda memasukkan seluruh konten aliran ke dalam memori seperti itu.
Niels Doucet
3

Untuk memisahkan menjadi InputStreamdua, sambil menghindari memuat semua data dalam memori , lalu memprosesnya secara mandiri:

  1. Buat beberapa OutputStream, tepatnya:PipedOutputStream
  2. Hubungkan setiap PipedOutputStream dengan PipedInputStream, ini PipedInputStreamadalah hasil InputStream.
  3. Hubungkan sumber InputStream dengan yang baru saja dibuat OutputStream. Jadi, semua yang dibaca dari sourcing InputStream, akan ditulis di keduanya OutputStream. Tidak perlu mengimplementasikannya, karena sudah dilakukan di TeeInputStream(commons.io).
  4. Dalam utas terpisah, baca seluruh sumber inputStream, dan secara implisit data input ditransfer ke inputStreams target.

    public static final List<InputStream> splitInputStream(InputStream input) 
        throws IOException 
    { 
        Objects.requireNonNull(input);      
    
        PipedOutputStream pipedOut01 = new PipedOutputStream();
        PipedOutputStream pipedOut02 = new PipedOutputStream();
    
        List<InputStream> inputStreamList = new ArrayList<>();
        inputStreamList.add(new PipedInputStream(pipedOut01));
        inputStreamList.add(new PipedInputStream(pipedOut02));
    
        TeeOutputStream tout = new TeeOutputStream(pipedOut01, pipedOut02);
    
        TeeInputStream tin = new TeeInputStream(input, tout, true);
    
        Executors.newSingleThreadExecutor().submit(tin::readAllBytes);  
    
        return Collections.unmodifiableList(inputStreamList);
    }

Berhati-hatilah untuk menutup inputStreams setelah dikonsumsi, dan tutup utas yang menjalankan: TeeInputStream.readAllBytes()

Dalam kasus ini, Anda perlu membaginya menjadi beberapaInputStream , bukan hanya dua. Ganti dalam fragmen kode sebelumnya kelas TeeOutputStreamuntuk implementasi Anda sendiri, yang akan merangkum List<OutputStream>dan mengganti OutputStreamantarmuka:

public final class TeeListOutputStream extends OutputStream {
    private final List<? extends OutputStream> branchList;

    public TeeListOutputStream(final List<? extends OutputStream> branchList) {
        Objects.requireNonNull(branchList);
        this.branchList = branchList;
    }

    @Override
    public synchronized void write(final int b) throws IOException {
        for (OutputStream branch : branchList) {
            branch.write(b);
        }
    }

    @Override
    public void flush() throws IOException {
        for (OutputStream branch : branchList) {
            branch.flush();
        }
    }

    @Override
    public void close() throws IOException {
        for (OutputStream branch : branchList) {
            branch.close();
        }
    }
}
zeugor
sumber
Tolong, bisakah Anda menjelaskan sedikit lagi langkah 4? Mengapa kita harus memicu pembacaan secara manual? Mengapa pembacaan pipedInputStream TIDAK memicu pembacaan inputStream sumber? Dan mengapa kami melakukan panggilan itu secara asinkron?
Дмитрий Кулешов
2

Ubah inputstream menjadi byte lalu teruskan ke fungsi savefile tempat Anda merakitnya menjadi inputstream. Juga dalam fungsi aslinya gunakan byte untuk digunakan untuk tugas lain

Maneesh
sumber
5
Saya mengatakan ide buruk yang satu ini, array yang dihasilkan bisa sangat besar dan akan merampok memori perangkat.
Kevin Parker
0

Jika ada yang menjalankan aplikasi Spring Boot, dan Anda ingin membaca isi respons a RestTemplate(itulah mengapa saya ingin membaca streaming dua kali), ada cara yang bersih (er) untuk melakukan ini.

Pertama-tama, Anda perlu menggunakan Spring StreamUtilsuntuk menyalin aliran ke String:

String text = StreamUtils.copyToString(response.getBody(), Charset.defaultCharset()))

Tapi itu belum semuanya. Anda juga perlu menggunakan pabrik permintaan yang dapat menyangga streaming untuk Anda, seperti:

ClientHttpRequestFactory factory = new BufferingClientHttpRequestFactory(new SimpleClientHttpRequestFactory());
RestTemplate restTemplate = new RestTemplate(factory);

Atau, jika Anda menggunakan factory bean, maka (ini adalah Kotlin tapi bagaimanapun):

@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
fun createRestTemplate(): RestTemplate = RestTemplateBuilder()
  .requestFactory { BufferingClientHttpRequestFactory(SimpleClientHttpRequestFactory()) }
  .additionalInterceptors(loggingInterceptor)
  .build()

Sumber: https://objectpartners.com/2018/03/01/log-your-resttemplate-request-and-response-without-destroying-the-body/

milosmns
sumber
0

Jika Anda menggunakan RestTemplate untuk melakukan panggilan http Cukup tambahkan pencegat. Badan respons di-cache dengan implementasi ClientHttpResponse. Sekarang inputstream dapat diambil dari respose sebanyak yang kita butuhkan

ClientHttpRequestInterceptor interceptor =  new ClientHttpRequestInterceptor() {

            @Override
            public ClientHttpResponse intercept(HttpRequest request, byte[] body,
                    ClientHttpRequestExecution execution) throws IOException {
                ClientHttpResponse  response = execution.execute(request, body);

                  // additional work before returning response
                  return response 
            }
        };

    // Add the interceptor to RestTemplate Instance 

         restTemplate.getInterceptors().add(interceptor); 
Noman Khan
sumber