Deserializing daftar secara asinkron menggunakan System.Text.Json

11

Katakanlah saya meminta file json besar yang berisi daftar banyak objek. Saya tidak ingin mereka berada di memori sekaligus, tetapi saya lebih suka membaca dan memprosesnya satu per satu. Jadi saya perlu mengubah System.IO.Streamaliran async menjadi IAsyncEnumerable<T>. Bagaimana cara menggunakan System.Text.JsonAPI baru untuk melakukan ini?

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
    {
        using (var stream = await httpResponse.Content.ReadAsStreamAsync())
        {
            // Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
        }
    }
}
Rick de Water
sumber
1
Anda mungkin perlu sesuatu seperti metode DeserializeAsync
Pavel Anikhouski
2
Maaf, tampaknya metode di atas memuat seluruh aliran ke dalam memori. Anda dapat membaca data dengan potongan menggunakan asynchonously Utf8JsonReader, silakan lihat beberapa sampel github dan di thread yang ada juga
Pavel Anikhouski
GetAsyncdengan sendirinya kembali ketika seluruh respons diterima. Anda perlu menggunakan SendAsync`HttpCompletionOption.ResponseContentRead` sebagai gantinya. Setelah Anda memilikinya, Anda dapat menggunakan JsonTextReader JSON.NET . Menggunakan System.Text.Jsonuntuk ini tidak semudah yang ditunjukkan masalah ini . Fungsionalitas tidak tersedia dan menerapkannya dalam alokasi rendah menggunakan struct tidak sepele
Panagiotis Kanavos
Masalah dengan deserialisasi dalam potongan adalah bahwa Anda harus tahu kapan Anda memiliki potongan lengkap untuk deserialize. Ini akan sulit untuk diselesaikan secara bersih untuk kasus-kasus umum. Ini akan membutuhkan penguraian sebelumnya, yang bisa jadi merupakan tradeoff yang buruk dalam hal kinerja. Agak sulit untuk menggeneralisasi. Tetapi jika Anda menerapkan pembatasan Anda sendiri pada JSON Anda, katakan "satu objek menempati tepat 20 baris dalam file", maka Anda pada dasarnya dapat deserialize secara asinkron dengan membaca file dalam potongan async. Anda akan membutuhkan json besar untuk melihat manfaat di sini, saya akan bayangkan.
DetectivePikachu
Sepertinya seseorang sudah menjawab pertanyaan serupa di sini dengan kode lengkap.
Panagiotis Kanavos

Jawaban:

4

Ya, serializer JSON (de) yang benar-benar streaming akan menjadi peningkatan kinerja yang bagus, di banyak tempat.

Sayangnya, System.Text.Jsontidak melakukan ini saat ini. Saya tidak yakin apakah itu akan terjadi di masa depan - saya harap begitu! Deserialisasi JSON yang benar-benar mengalir ternyata agak menantang.

Anda dapat memeriksa apakah Utf8Json yang sangat cepat mendukungnya, mungkin.

Namun, mungkin ada solusi khusus untuk situasi spesifik Anda, karena persyaratan Anda tampaknya membatasi kesulitan.

Idenya adalah untuk secara manual membaca satu item dari array pada suatu waktu. Kami memanfaatkan fakta bahwa setiap item dalam daftar, dengan sendirinya, adalah objek JSON yang valid.

Anda dapat melewati secara manual melewati [(untuk item pertama) atau ,(untuk setiap item berikutnya). Maka saya pikir taruhan terbaik Anda adalah menggunakan .NET Core Utf8JsonReaderuntuk menentukan di mana objek saat ini berakhir, dan memberi makan byte yang dipindai JsonDeserializer.

Dengan cara ini, Anda hanya melakukan sedikit buffering pada satu objek pada suatu waktu.

Dan karena kita berbicara kinerja, Anda bisa mendapatkan input dari PipeReader, saat Anda melakukannya. :-)

Timo
sumber
Ini bukan tentang kinerja sama sekali. Ini bukan tentang deserialization async, yang sudah dilakukannya. Ini tentang akses streaming - memproses elemen JSON karena diurai dari aliran, seperti yang dilakukan JsonTextReader JSON.NET.
Panagiotis Kanavos
Kelas yang relevan di Utf8Json adalah JsonReader dan seperti yang dikatakan penulis, ini aneh. JSON.NET's JsonTextReader dan System.Text.Json's Utf8JsonReader berbagi keanehan yang sama - Anda harus mengulang dan memeriksa jenis elemen saat ini saat Anda pergi.
Panagiotis Kanavos
@PanagiotisKanavos Ah, ya, streaming. Itu kata yang saya cari! Saya memperbarui kata "asinkron" ke "streaming". Saya percaya alasan ingin streaming membatasi penggunaan memori, yang merupakan masalah kinerja. Mungkin OP bisa mengkonfirmasi.
Timo
Performa bukan berarti kecepatan. Tidak peduli seberapa cepat deserializer itu, jika Anda harus memproses item 1M, Anda tidak ingin menyimpannya dalam RAM, atau menunggu semuanya untuk di-deserialisasi sebelum Anda dapat memproses yang pertama.
Panagiotis Kanavos
Semantik, temanku! Aku senang kita berusaha mencapai hal yang sama.
Timo
4

TL; DR Ini bukan hal sepele


Sepertinya seseorang telah memposting kode lengkap untuk Utf8JsonStreamReaderstruct yang membaca buffer dari aliran dan memberi mereka ke Utf8JsonRreader, yang memungkinkan deserialisasi dengan mudah JsonSerializer.Deserialize<T>(ref newJsonReader, options);. Kode juga tidak sepele. Pertanyaan terkait ada di sini dan jawabannya ada di sini .

Itu tidak cukup - HttpClient.GetAsyncakan kembali hanya setelah seluruh respons diterima, pada dasarnya buffering semua dalam memori.

Untuk menghindari ini, HttpClient.GetAsync (string, HttpCompletionOption) harus digunakan dengan HttpCompletionOption.ResponseHeadersRead.

Loop deserialisasi juga harus memeriksa token pembatalan, dan apakah keluar atau melempar jika diberi sinyal. Kalau tidak, perulangan akan berlangsung sampai seluruh aliran diterima dan diproses.

Kode ini didasarkan pada contoh jawaban terkait dan menggunakan HttpCompletionOption.ResponseHeadersReaddan memeriksa token pembatalan. Itu dapat mengurai string JSON yang berisi array item yang tepat, misalnya:

[{"prop1":123},{"prop1":234}]

Panggilan pertama untuk jsonStreamReader.Read()bergerak ke awal array sedangkan yang kedua bergerak ke awal objek pertama. Loop itu sendiri berakhir ketika ujung array ( ]) terdeteksi.

private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
    //Don't cache the entire response
    using var httpResponse = await httpClient.GetAsync(url,                               
                                                       HttpCompletionOption.ResponseHeadersRead,  
                                                       cancellationToken);
    using var stream = await httpResponse.Content.ReadAsStreamAsync();
    using var jsonStreamReader = new Utf8JsonStreamReader(stream, 32 * 1024);

    jsonStreamReader.Read(); // move to array start
    jsonStreamReader.Read(); // move to start of the object

    while (jsonStreamReader.TokenType != JsonTokenType.EndArray)
    {
        //Gracefully return if cancellation is requested.
        //Could be cancellationToken.ThrowIfCancellationRequested()
        if(cancellationToken.IsCancellationRequested)
        {
            return;
        }

        // deserialize object
        var obj = jsonStreamReader.Deserialize<T>();
        yield return obj;

        // JsonSerializer.Deserialize ends on last token of the object parsed,
        // move to the first token of next object
        jsonStreamReader.Read();
    }
}

Fragmen JSON, alias streaming JSON alias ... *

Ini cukup umum dalam skenario streaming atau pencatatan skenario untuk menambahkan objek JSON individual ke file, satu elemen per baris misalnya:

{"eventId":1}
{"eventId":2}
...
{"eventId":1234567}

Ini bukan dokumen JSON yang valid tetapi masing-masing fragmen valid. Ini memiliki beberapa keuntungan untuk data besar / skenario yang sangat bersamaan. Menambahkan acara baru hanya membutuhkan menambahkan baris baru ke file, bukan mem-parsing dan membangun kembali seluruh file. Pemrosesan , khususnya pemrosesan paralel lebih mudah karena dua alasan:

  • Elemen individual dapat diambil satu per satu, cukup dengan membaca satu baris dari aliran.
  • File input dapat dengan mudah dipartisi dan dibagi melintasi batas garis, memberi makan setiap bagian ke proses pekerja yang terpisah, misalnya dalam cluster Hadoop, atau hanya utas yang berbeda dalam aplikasi: Hitung titik perpecahan misalnya dengan membagi panjang dengan jumlah pekerja , lalu cari baris baru pertama. Memberi makan semuanya sampai titik itu ke pekerja yang terpisah.

Menggunakan StreamReader

Cara alokasikan-y untuk melakukan ini adalah dengan menggunakan TextReader, membaca satu baris sekaligus dan menguraikannya dengan JsonSerializer.Deserialize :

using var reader=new StreamReader(stream);
string line;
//ReadLineAsync() doesn't accept a CancellationToken 
while((line=await reader.ReadLineAsync()) != null)
{
    var item=JsonSerializer.Deserialize<T>(line);
    yield return item;

    if(cancellationToken.IsCancellationRequested)
    {
        return;
    }
}

Itu jauh lebih sederhana daripada kode yang deserializes array yang tepat. Ada dua masalah:

  • ReadLineAsync tidak menerima token pembatalan
  • Setiap iterasi mengalokasikan string baru, salah satu hal yang ingin kami hindari dengan menggunakan System.Text.Json

Ini mungkin cukup meskipun mencoba untuk menghasilkan ReadOnlySpan<Byte>buffer yang dibutuhkan oleh JsonSerializer. Pergeseran tidak sepele.

Pipelines dan SequenceReader

Untuk menghindari semua lokasi, kita perlu mendapat ReadOnlySpan<byte>dari stream. Untuk melakukan ini, Anda perlu menggunakan pipa System.IO.Pipeline dan struct SequenceReader . Pengantar SequenceReader karya Steve Gordon menjelaskan bagaimana kelas ini dapat digunakan untuk membaca data dari aliran menggunakan pembatas.

Sayangnya, SequenceReaderini adalah struct ref yang artinya tidak dapat digunakan dalam metode async atau lokal. Karena itulah Steve Gordon dalam artikelnya menciptakan a

private static SequencePosition ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

metode untuk membaca item membentuk ReadOnlySequence dan mengembalikan posisi akhir, sehingga PipeReader dapat melanjutkan darinya. Sayangnya kami ingin mengembalikan IEnumerable atau IAsyncEnumerable, dan metode iterator tidak suka inatau outparameter baik.

Kami dapat mengumpulkan barang-barang yang telah di-deserialisasi dalam Daftar atau Antrian dan mengembalikannya sebagai hasil tunggal, tetapi itu masih akan mengalokasikan daftar, buffer, atau simpul dan harus menunggu semua item dalam buffer untuk di-deserialisasi sebelum kembali:

private static (SequencePosition,List<T>) ReadItems(in ReadOnlySequence<byte> sequence, bool isCompleted)

Kita membutuhkan sesuatu yang bertindak seperti enumerable tanpa memerlukan metode iterator, bekerja dengan async dan tidak buffering semuanya jalan.

Menambahkan Saluran untuk menghasilkan IAsyncEnumerable

ChannelReader.ReadAllAsync mengembalikan IAsyncEnumerable. Kami dapat mengembalikan ChannelReader dari metode yang tidak bisa berfungsi sebagai iterator dan masih menghasilkan aliran elemen tanpa caching.

Mengadaptasi kode Steve Gordon untuk menggunakan saluran, kami mendapatkan ReadItems (ChannelWriter ...) dan ReadLastItemmetode. Yang pertama, membaca satu item pada satu waktu, hingga menggunakan baris baru ReadOnlySpan<byte> itemBytes. Ini dapat digunakan oleh JsonSerializer.Deserialize. Jika ReadItemstidak dapat menemukan pembatas, ia mengembalikan posisinya sehingga PipelineReader dapat menarik potongan berikutnya dari aliran.

Ketika kita mencapai potongan terakhir dan tidak ada pembatas lain, ReadLastItem` membaca byte yang tersisa dan membatalkan deserialisasi.

Kode ini hampir identik dengan Steve Gordon. Alih-alih menulis ke Konsol, kami menulis ke ChannelWriter.

private const byte NL=(byte)'\n';
private const int MaxStackLength = 128;

private static SequencePosition ReadItems<T>(ChannelWriter<T> writer, in ReadOnlySequence<byte> sequence, 
                          bool isCompleted, CancellationToken token)
{
    var reader = new SequenceReader<byte>(sequence);

    while (!reader.End && !token.IsCancellationRequested) // loop until we've read the entire sequence
    {
        if (reader.TryReadTo(out ReadOnlySpan<byte> itemBytes, NL, advancePastDelimiter: true)) // we have an item to handle
        {
            var item=JsonSerializer.Deserialize<T>(itemBytes);
            writer.TryWrite(item);            
        }
        else if (isCompleted) // read last item which has no final delimiter
        {
            var item = ReadLastItem<T>(sequence.Slice(reader.Position));
            writer.TryWrite(item);
            reader.Advance(sequence.Length); // advance reader to the end
        }
        else // no more items in this sequence
        {
            break;
        }
    }

    return reader.Position;
}

private static T ReadLastItem<T>(in ReadOnlySequence<byte> sequence)
{
    var length = (int)sequence.Length;

    if (length < MaxStackLength) // if the item is small enough we'll stack allocate the buffer
    {
        Span<byte> byteBuffer = stackalloc byte[length];
        sequence.CopyTo(byteBuffer);
        var item=JsonSerializer.Deserialize<T>(byteBuffer);
        return item;        
    }
    else // otherwise we'll rent an array to use as the buffer
    {
        var byteBuffer = ArrayPool<byte>.Shared.Rent(length);

        try
        {
            sequence.CopyTo(byteBuffer);
            var item=JsonSerializer.Deserialize<T>(byteBuffer);
            return item;
        }
        finally
        {
            ArrayPool<byte>.Shared.Return(byteBuffer);
        }

    }    
}

The DeserializeToChannel<T>Metode menciptakan pembaca Pipeline di atas sungai, menciptakan saluran dan mulai tugas pekerja yang mem-parsing potongan dan dorongan mereka untuk saluran:

ChannelReader<T> DeserializeToChannel<T>(Stream stream, CancellationToken token)
{
    var pipeReader = PipeReader.Create(stream);    
    var channel=Channel.CreateUnbounded<T>();
    var writer=channel.Writer;
    _ = Task.Run(async ()=>{
        while (!token.IsCancellationRequested)
        {
            var result = await pipeReader.ReadAsync(token); // read from the pipe

            var buffer = result.Buffer;

            var position = ReadItems(writer,buffer, result.IsCompleted,token); // read complete items from the current buffer

            if (result.IsCompleted) 
                break; // exit if we've read everything from the pipe

            pipeReader.AdvanceTo(position, buffer.End); //advance our position in the pipe
        }

        pipeReader.Complete(); 
    },token)
    .ContinueWith(t=>{
        pipeReader.Complete();
        writer.TryComplete(t.Exception);
    });

    return channel.Reader;
}

ChannelReader.ReceiveAllAsync()dapat digunakan untuk mengkonsumsi semua item melalui IAsyncEnumerable<T>:

var reader=DeserializeToChannel<MyEvent>(stream,cts.Token);
await foreach(var item in reader.ReadAllAsync(cts.Token))
{
    //Do something with it 
}    
Panagiotis Kanavos
sumber
0

Rasanya Anda perlu menanamkan pembaca aliran Anda sendiri. Anda harus membaca byte satu demi satu dan berhenti segera setelah definisi objek selesai. Memang levelnya cukup rendah. Dengan demikian Anda TIDAK AKAN memuat seluruh file ke dalam RAM, melainkan mengambil bagian yang Anda hadapi. Apakah itu tampaknya menjadi jawaban?

Sereja Bogolubov
sumber
-2

Mungkin Anda bisa menggunakan Newtonsoft.Jsonserializer? https://www.newtonsoft.com/json/help/html/Performance.htm

Terutama lihat bagian:

Optimalkan Penggunaan Memori

Edit

Anda bisa mencoba penghilangan nilai dari JsonTextReader, mis

using (var textReader = new StreamReader(stream))
using (var reader = new JsonTextReader(textReader))
{
    while (await reader.ReadAsync(cancellationToken))
    {
        yield return reader.Value;
    }
}
Miłosz Wieczorek
sumber
Itu tidak menjawab pertanyaan. Ini bukan tentang kinerja sama sekali, ini tentang akses streaming tanpa memuat semua dalam memori
Panagiotis Kanavos
Sudahkah Anda membuka tautan terkait atau hanya mengatakan apa yang Anda pikirkan? Di tautan yang saya kirim di bagian yang saya sebutkan ada potongan kode tentang cara deserialize JSON dari aliran.
Miłosz Wieczorek
Baca pertanyaannya lagi - OP bertanya bagaimana memproses elemen-elemen tanpa menghilangkan bakteri apapun dalam memori. Bukan hanya membaca dari aliran, tetapi hanya memproses apa yang datang dari aliran. I don't want them to be in memory all at once, but I would rather read and process them one by one.Kelas yang relevan di JSON.NET adalah JsonTextReader.
Panagiotis Kanavos
Bagaimanapun, jawaban hanya tautan tidak dianggap sebagai jawaban yang baik, dan tidak ada dalam tautan itu yang menjawab pertanyaan OP. Tautan ke JsonTextReader akan lebih baik
Panagiotis Kanavos