Katakanlah saya meminta file json besar yang berisi daftar banyak objek. Saya tidak ingin mereka berada di memori sekaligus, tetapi saya lebih suka membaca dan memprosesnya satu per satu. Jadi saya perlu mengubah System.IO.Stream
aliran async menjadi IAsyncEnumerable<T>
. Bagaimana cara menggunakan System.Text.Json
API baru untuk melakukan ini?
private async IAsyncEnumerable<T> GetList<T>(Uri url, CancellationToken cancellationToken = default)
{
using (var httpResponse = await httpClient.GetAsync(url, cancellationToken))
{
using (var stream = await httpResponse.Content.ReadAsStreamAsync())
{
// Probably do something with JsonSerializer.DeserializeAsync here without serializing the entire thing in one go
}
}
}
c#
.net-core
.net-core-3.0
c#-8.0
system.text.json
Rick de Water
sumber
sumber
Utf8JsonReader
, silakan lihat beberapa sampel github dan di thread yang ada jugaGetAsync
dengan sendirinya kembali ketika seluruh respons diterima. Anda perlu menggunakanSendAsync
`HttpCompletionOption.ResponseContentRead` sebagai gantinya. Setelah Anda memilikinya, Anda dapat menggunakan JsonTextReader JSON.NET . MenggunakanSystem.Text.Json
untuk ini tidak semudah yang ditunjukkan masalah ini . Fungsionalitas tidak tersedia dan menerapkannya dalam alokasi rendah menggunakan struct tidak sepeleJawaban:
Ya, serializer JSON (de) yang benar-benar streaming akan menjadi peningkatan kinerja yang bagus, di banyak tempat.
Sayangnya,
System.Text.Json
tidak melakukan ini saat ini. Saya tidak yakin apakah itu akan terjadi di masa depan - saya harap begitu! Deserialisasi JSON yang benar-benar mengalir ternyata agak menantang.Anda dapat memeriksa apakah Utf8Json yang sangat cepat mendukungnya, mungkin.
Namun, mungkin ada solusi khusus untuk situasi spesifik Anda, karena persyaratan Anda tampaknya membatasi kesulitan.
Idenya adalah untuk secara manual membaca satu item dari array pada suatu waktu. Kami memanfaatkan fakta bahwa setiap item dalam daftar, dengan sendirinya, adalah objek JSON yang valid.
Anda dapat melewati secara manual melewati
[
(untuk item pertama) atau,
(untuk setiap item berikutnya). Maka saya pikir taruhan terbaik Anda adalah menggunakan .NET CoreUtf8JsonReader
untuk menentukan di mana objek saat ini berakhir, dan memberi makan byte yang dipindaiJsonDeserializer
.Dengan cara ini, Anda hanya melakukan sedikit buffering pada satu objek pada suatu waktu.
Dan karena kita berbicara kinerja, Anda bisa mendapatkan input dari
PipeReader
, saat Anda melakukannya. :-)sumber
TL; DR Ini bukan hal sepele
Sepertinya seseorang telah memposting kode lengkap untuk
Utf8JsonStreamReader
struct yang membaca buffer dari aliran dan memberi mereka ke Utf8JsonRreader, yang memungkinkan deserialisasi dengan mudahJsonSerializer.Deserialize<T>(ref newJsonReader, options);
. Kode juga tidak sepele. Pertanyaan terkait ada di sini dan jawabannya ada di sini .Itu tidak cukup -
HttpClient.GetAsync
akan kembali hanya setelah seluruh respons diterima, pada dasarnya buffering semua dalam memori.Untuk menghindari ini, HttpClient.GetAsync (string, HttpCompletionOption) harus digunakan dengan
HttpCompletionOption.ResponseHeadersRead
.Loop deserialisasi juga harus memeriksa token pembatalan, dan apakah keluar atau melempar jika diberi sinyal. Kalau tidak, perulangan akan berlangsung sampai seluruh aliran diterima dan diproses.
Kode ini didasarkan pada contoh jawaban terkait dan menggunakan
HttpCompletionOption.ResponseHeadersRead
dan memeriksa token pembatalan. Itu dapat mengurai string JSON yang berisi array item yang tepat, misalnya:Panggilan pertama untuk
jsonStreamReader.Read()
bergerak ke awal array sedangkan yang kedua bergerak ke awal objek pertama. Loop itu sendiri berakhir ketika ujung array (]
) terdeteksi.Fragmen JSON, alias streaming JSON alias ... *
Ini cukup umum dalam skenario streaming atau pencatatan skenario untuk menambahkan objek JSON individual ke file, satu elemen per baris misalnya:
Ini bukan dokumen JSON yang valid tetapi masing-masing fragmen valid. Ini memiliki beberapa keuntungan untuk data besar / skenario yang sangat bersamaan. Menambahkan acara baru hanya membutuhkan menambahkan baris baru ke file, bukan mem-parsing dan membangun kembali seluruh file. Pemrosesan , khususnya pemrosesan paralel lebih mudah karena dua alasan:
Menggunakan StreamReader
Cara alokasikan-y untuk melakukan ini adalah dengan menggunakan TextReader, membaca satu baris sekaligus dan menguraikannya dengan JsonSerializer.Deserialize :
Itu jauh lebih sederhana daripada kode yang deserializes array yang tepat. Ada dua masalah:
ReadLineAsync
tidak menerima token pembatalanIni mungkin cukup meskipun mencoba untuk menghasilkan
ReadOnlySpan<Byte>
buffer yang dibutuhkan oleh JsonSerializer. Pergeseran tidak sepele.Pipelines dan SequenceReader
Untuk menghindari semua lokasi, kita perlu mendapat
ReadOnlySpan<byte>
dari stream. Untuk melakukan ini, Anda perlu menggunakan pipa System.IO.Pipeline dan struct SequenceReader . Pengantar SequenceReader karya Steve Gordon menjelaskan bagaimana kelas ini dapat digunakan untuk membaca data dari aliran menggunakan pembatas.Sayangnya,
SequenceReader
ini adalah struct ref yang artinya tidak dapat digunakan dalam metode async atau lokal. Karena itulah Steve Gordon dalam artikelnya menciptakan ametode untuk membaca item membentuk ReadOnlySequence dan mengembalikan posisi akhir, sehingga PipeReader dapat melanjutkan darinya. Sayangnya kami ingin mengembalikan IEnumerable atau IAsyncEnumerable, dan metode iterator tidak suka
in
atauout
parameter baik.Kami dapat mengumpulkan barang-barang yang telah di-deserialisasi dalam Daftar atau Antrian dan mengembalikannya sebagai hasil tunggal, tetapi itu masih akan mengalokasikan daftar, buffer, atau simpul dan harus menunggu semua item dalam buffer untuk di-deserialisasi sebelum kembali:
Kita membutuhkan sesuatu yang bertindak seperti enumerable tanpa memerlukan metode iterator, bekerja dengan async dan tidak buffering semuanya jalan.
Menambahkan Saluran untuk menghasilkan IAsyncEnumerable
ChannelReader.ReadAllAsync mengembalikan IAsyncEnumerable. Kami dapat mengembalikan ChannelReader dari metode yang tidak bisa berfungsi sebagai iterator dan masih menghasilkan aliran elemen tanpa caching.
Mengadaptasi kode Steve Gordon untuk menggunakan saluran, kami mendapatkan ReadItems (ChannelWriter ...) dan
ReadLastItem
metode. Yang pertama, membaca satu item pada satu waktu, hingga menggunakan baris baruReadOnlySpan<byte> itemBytes
. Ini dapat digunakan olehJsonSerializer.Deserialize
. JikaReadItems
tidak dapat menemukan pembatas, ia mengembalikan posisinya sehingga PipelineReader dapat menarik potongan berikutnya dari aliran.Ketika kita mencapai potongan terakhir dan tidak ada pembatas lain, ReadLastItem` membaca byte yang tersisa dan membatalkan deserialisasi.
Kode ini hampir identik dengan Steve Gordon. Alih-alih menulis ke Konsol, kami menulis ke ChannelWriter.
The
DeserializeToChannel<T>
Metode menciptakan pembaca Pipeline di atas sungai, menciptakan saluran dan mulai tugas pekerja yang mem-parsing potongan dan dorongan mereka untuk saluran:ChannelReader.ReceiveAllAsync()
dapat digunakan untuk mengkonsumsi semua item melaluiIAsyncEnumerable<T>
:sumber
Rasanya Anda perlu menanamkan pembaca aliran Anda sendiri. Anda harus membaca byte satu demi satu dan berhenti segera setelah definisi objek selesai. Memang levelnya cukup rendah. Dengan demikian Anda TIDAK AKAN memuat seluruh file ke dalam RAM, melainkan mengambil bagian yang Anda hadapi. Apakah itu tampaknya menjadi jawaban?
sumber
Mungkin Anda bisa menggunakan
Newtonsoft.Json
serializer? https://www.newtonsoft.com/json/help/html/Performance.htmTerutama lihat bagian:
Edit
Anda bisa mencoba penghilangan nilai dari JsonTextReader, mis
sumber
I don't want them to be in memory all at once, but I would rather read and process them one by one.
Kelas yang relevan di JSON.NET adalah JsonTextReader.