Saya baru mengenal Spark dan saya mencoba membaca data CSV dari file dengan Spark. Inilah yang saya lakukan:
sc.textFile('file.csv')
.map(lambda line: (line.split(',')[0], line.split(',')[1]))
.collect()
Saya berharap panggilan ini memberi saya daftar dari dua kolom pertama file saya tetapi saya mendapatkan kesalahan ini:
File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range
meskipun file CSV saya lebih dari satu kolom.
python
csv
apache-spark
pyspark
Kernael
sumber
sumber
csv
pustaka bawaan untuk menangani semua pelolosan karena hanya memisahkan dengan koma tidak akan berfungsi jika, katakanlah, ada koma dalam nilainya.","
.Spark 2.0.0+
Anda dapat menggunakan sumber data csv bawaan secara langsung:
atau
tanpa menyertakan dependensi eksternal apa pun.
Spark <2.0.0 :
Alih-alih penguraian manual, yang jauh dari sepele dalam kasus umum, saya akan merekomendasikan
spark-csv
:Pastikan bahwa Spark CSV termasuk dalam jalur (
--packages
,--jars
,--driver-class-path
)Dan muat data Anda sebagai berikut:
Ini dapat menangani pemuatan, inferensi skema, menjatuhkan baris yang salah format dan tidak memerlukan data yang lewat dari Python ke JVM.
Catatan :
Jika Anda mengetahui skemanya, lebih baik hindari inferensi skema dan teruskan ke
DataFrameReader
. Dengan asumsi Anda memiliki tiga kolom - integer, double dan string:sumber
pyspark --packages com.databricks:spark-csv_2.11:1.4.0
(pastikan untuk mengubah versi databricks / spark ke versi yang telah Anda instal).sumber
Dan opsi lain yang terdiri dari membaca file CSV menggunakan Pandas dan kemudian mengimpor Pandas DataFrame ke Spark.
Sebagai contoh:
sumber
Memisahkan dengan koma juga akan memisahkan koma yang ada di dalam bidang (mis.
a,b,"1,2,3",c
), Jadi tidak disarankan. Jawaban zero323 bagus jika Anda ingin menggunakan DataFrames API, tetapi jika Anda ingin tetap menggunakan Spark dasar, Anda dapat mengurai csvs di Python dasar dengan modul csv :EDIT: Seperti yang disebutkan @muon di komentar, ini akan memperlakukan header seperti baris lainnya sehingga Anda harus mengekstraknya secara manual. Misalnya,
header = rdd.first(); rdd = rdd.filter(lambda x: x != header)
(pastikan untuk tidak mengubahheader
sebelum filter mengevaluasi). Tetapi pada titik ini, Anda mungkin lebih baik menggunakan parser csv bawaan.sumber
StringIO
.csv
dapat menggunakan iterable b)__next__
tidak boleh digunakan secara langsung dan akan gagal pada baris kosong. Lihatlah flatMap c) Akan jauh lebih efisien untuk digunakanmapPartitions
daripada menginisialisasi pembaca pada setiap baris :)rdd.mapPartitions(lambda x: csv.reader(x))
bekerja sambilrdd.map(lambda x: csv.reader(x))
melempar kesalahan? Saya berharap keduanya melempar samaTypeError: can't pickle _csv.reader objects
. Ini juga sepertinyamapPartitions
secara otomatis memanggil beberapa yang setara dengan "readlines" padacsv.reader
objek, di mana denganmap
, saya perlu memanggil__next__
secara eksplisit untuk mengeluarkan daftar daricsv.reader
. 2) Di manaflatMap
masuk? Hanya meneleponmapPartitions
saja sudah berhasil bagi saya.rdd.mapPartitions(lambda x: csv.reader(x))
bekerja karenamapPartitions
mengharapkan suatuIterable
objek. Jika Anda ingin lebih eksplisit Anda dapat memahami atau membuat ekspresi.map
saja tidak berfungsi karena tidak mengulangi objek. Oleh karena itu saran saya untuk menggunakanflatMap(lambda x: csv.reader([x]))
yang akan mengulangi pembaca. TapimapPartitions
jauh lebih baik di sini.Ini ada di PYSPARK
Kemudian Anda bisa memeriksanya
sumber
Jika Anda ingin memuat csv sebagai dataframe, Anda dapat melakukan hal berikut:
Ini bekerja dengan baik untukku.
sumber
Ini sejalan dengan apa yang awalnya disarankan JP Mercier tentang penggunaan Pandas, tetapi dengan modifikasi besar: Jika Anda membaca data ke dalam Pandas dalam beberapa bagian, data tersebut akan lebih mudah dibentuk. Artinya, Anda dapat mengurai file yang jauh lebih besar daripada yang sebenarnya dapat ditangani Pandas sebagai satu bagian dan meneruskannya ke Spark dalam ukuran yang lebih kecil. (Ini juga menjawab komentar tentang mengapa seseorang ingin menggunakan Spark jika mereka dapat memuat semuanya ke dalam Pandas.)
sumber
Sekarang, ada juga opsi lain untuk file csv umum: https://github.com/seahboonsiew/pyspark-csv sebagai berikut:
Asumsikan kita memiliki konteks berikut
Pertama, distribusikan pyspark-csv.py ke pelaksana menggunakan SparkContext
Membaca data csv melalui SparkContext dan mengonversinya menjadi DataFrame
sumber
Jika data csv Anda kebetulan tidak berisi baris baru di salah satu bidang, Anda dapat memuat data Anda dengan
textFile()
dan menguraikannyasumber
Jika Anda memiliki satu atau lebih baris dengan jumlah kolom lebih sedikit atau lebih dari 2 dalam dataset maka kesalahan ini mungkin muncul.
Saya juga baru mengenal Pyspark dan mencoba membaca file CSV. Kode berikut berhasil untuk saya:
Dalam kode ini saya menggunakan dataset dari kaggle, tautannya adalah: https://www.kaggle.com/carrie1/ecommerce-data
1. Tanpa menyebutkan skema:
Sekarang periksa kolom: sdfData.columns
Outputnya adalah:
Periksa tipe data untuk setiap kolom:
Ini akan memberikan bingkai data dengan semua kolom dengan tipe data sebagai TipeTipe
2. Dengan skema: Jika Anda mengetahui skema atau ingin mengubah tipe data dari kolom mana pun di tabel di atas, gunakan ini (katakanlah saya memiliki kolom berikut dan ingin mereka dalam tipe data tertentu untuk masing-masing)
Sekarang periksa skema untuk tipe data setiap kolom:
Diedit: Kita juga dapat menggunakan baris kode berikut tanpa menyebutkan skema secara eksplisit:
Outputnya adalah:
Outputnya akan terlihat seperti ini:
sumber
Saat menggunakan
spark.read.csv
, saya menemukan bahwa menggunakan opsiescape='"'
danmultiLine=True
memberikan solusi yang paling konsisten untuk standar CSV , dan menurut pengalaman saya bekerja paling baik dengan file CSV yang diekspor dari Google Spreadsheet.Itu adalah,
sumber
import pyspark as spark
?spark
sudah diinisialisasi. Dalam skrip yang dikirimkan olehspark-submit
, Anda dapat membuatnya sebagaifrom pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate()
.