Muat file CSV dengan Spark

110

Saya baru mengenal Spark dan saya mencoba membaca data CSV dari file dengan Spark. Inilah yang saya lakukan:

sc.textFile('file.csv')
    .map(lambda line: (line.split(',')[0], line.split(',')[1]))
    .collect()

Saya berharap panggilan ini memberi saya daftar dari dua kolom pertama file saya tetapi saya mendapatkan kesalahan ini:

File "<ipython-input-60-73ea98550983>", line 1, in <lambda>
IndexError: list index out of range

meskipun file CSV saya lebih dari satu kolom.

Kernael
sumber

Jawaban:

63

Apakah Anda yakin bahwa semua baris memiliki minimal 2 kolom? Bisakah Anda mencoba sesuatu seperti, hanya untuk memeriksa ?:

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: (line[0],line[1])) \
    .collect()

Atau, Anda dapat mencetak pelakunya (jika ada):

sc.textFile("file.csv") \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)<=1) \
    .collect()
G Quintana
sumber
Itu dia, satu baris dengan hanya satu kolom, terima kasih.
Kernael
2
Lebih baik mengurai menggunakan csvpustaka bawaan untuk menangani semua pelolosan karena hanya memisahkan dengan koma tidak akan berfungsi jika, katakanlah, ada koma dalam nilainya.
sudo
4
Ada banyak alat untuk mengurai csv, jangan menemukan kembali roda
Stephen
2
Kode ini akan rusak jika ada koma di dalam tanda kutip. Parsing csv lebih rumit daripada hanya memisahkan ",".
Alceu Costa
Ini istirahat untuk koma. Ini sangat buruk.
rjurney
184

Spark 2.0.0+

Anda dapat menggunakan sumber data csv bawaan secara langsung:

spark.read.csv(
    "some_input_file.csv", header=True, mode="DROPMALFORMED", schema=schema
)

atau

(spark.read
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .csv("some_input_file.csv"))

tanpa menyertakan dependensi eksternal apa pun.

Spark <2.0.0 :

Alih-alih penguraian manual, yang jauh dari sepele dalam kasus umum, saya akan merekomendasikan spark-csv:

Pastikan bahwa Spark CSV termasuk dalam jalur ( --packages, --jars, --driver-class-path)

Dan muat data Anda sebagai berikut:

(df = sqlContext
    .read.format("com.databricks.spark.csv")
    .option("header", "true")
    .option("inferschema", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))

Ini dapat menangani pemuatan, inferensi skema, menjatuhkan baris yang salah format dan tidak memerlukan data yang lewat dari Python ke JVM.

Catatan :

Jika Anda mengetahui skemanya, lebih baik hindari inferensi skema dan teruskan ke DataFrameReader. Dengan asumsi Anda memiliki tiga kolom - integer, double dan string:

from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType

schema = StructType([
    StructField("A", IntegerType()),
    StructField("B", DoubleType()),
    StructField("C", StringType())
])

(sqlContext
    .read
    .format("com.databricks.spark.csv")
    .schema(schema)
    .option("header", "true")
    .option("mode", "DROPMALFORMED")
    .load("some_input_file.csv"))
nol323
sumber
6
Jika Anda melakukan ini, jangan lupa untuk menyertakan paket databricks csv saat Anda membuka shell pyspark atau menggunakan spark-submit. Misalnya, pyspark --packages com.databricks:spark-csv_2.11:1.4.0(pastikan untuk mengubah versi databricks / spark ke versi yang telah Anda instal).
Galen Long
Apakah csvContext atau sqlContext di pyspark? Karena dalam scala Anda memerlukan csvContext
Geoffrey Anderson
28
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

df = spark.read.csv("/home/stp/test1.csv",header=True,sep="|");

print(df.collect())
y durga prasad
sumber
gunakan 'sep not' separator 'sebagai berikut: df = spark.read.csv ("/ home / stp / test1.csv", header = True, sep = "|")
Grant Shannon
18

Dan opsi lain yang terdiri dari membaca file CSV menggunakan Pandas dan kemudian mengimpor Pandas DataFrame ke Spark.

Sebagai contoh:

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

pandas_df = pd.read_csv('file.csv')  # assuming the file contains a header
# pandas_df = pd.read_csv('file.csv', names = ['column 1','column 2']) # if no header
s_df = sql_sc.createDataFrame(pandas_df)
JP Mercier
sumber
7
Mengapa OP ingin melakukannya pada percikan jika dia dapat memuat data di panda
WoodChopper
Tidak ingin menginstal atau menentukan dependensi pada setiap spark cluster ....
SummerEla
Panda mengizinkan pemotongan file saat membaca sehingga masih ada kasus penggunaan di sini untuk meminta Pandas menangani penguraian file awal. Lihat jawaban saya di bawah untuk kode.
abby sobh
Perhatian: Panda juga menangani skema kolom dengan cara yang berbeda dari percikan api terutama bila ada bagian yang kosong. Lebih aman untuk hanya memuat csv sebagai string untuk setiap kolom.
AntiPawn79
@WoodChopper Anda dapat menggunakan Pandas sebagai UDF di Spark, bukan?
flow2k
16

Memisahkan dengan koma juga akan memisahkan koma yang ada di dalam bidang (mis. a,b,"1,2,3",c), Jadi tidak disarankan. Jawaban zero323 bagus jika Anda ingin menggunakan DataFrames API, tetapi jika Anda ingin tetap menggunakan Spark dasar, Anda dapat mengurai csvs di Python dasar dengan modul csv :

# works for both python 2 and 3
import csv
rdd = sc.textFile("file.csv")
rdd = rdd.mapPartitions(lambda x: csv.reader(x))

EDIT: Seperti yang disebutkan @muon di komentar, ini akan memperlakukan header seperti baris lainnya sehingga Anda harus mengekstraknya secara manual. Misalnya, header = rdd.first(); rdd = rdd.filter(lambda x: x != header)(pastikan untuk tidak mengubah headersebelum filter mengevaluasi). Tetapi pada titik ini, Anda mungkin lebih baik menggunakan parser csv bawaan.

Galen Long
sumber
1
Anda tidak perlu Hive untuk menggunakan DataFrames. Mengenai solusi Anda: a) Tidak perlu StringIO. csvdapat menggunakan iterable b) __next__tidak boleh digunakan secara langsung dan akan gagal pada baris kosong. Lihatlah flatMap c) Akan jauh lebih efisien untuk digunakan mapPartitionsdaripada menginisialisasi pembaca pada setiap baris :)
zero323
Terima kasih banyak atas koreksinya! Sebelum saya mengedit jawaban saya, saya ingin memastikan bahwa saya mengerti sepenuhnya. 1) Mengapa rdd.mapPartitions(lambda x: csv.reader(x))bekerja sambil rdd.map(lambda x: csv.reader(x))melempar kesalahan? Saya berharap keduanya melempar sama TypeError: can't pickle _csv.reader objects. Ini juga sepertinya mapPartitionssecara otomatis memanggil beberapa yang setara dengan "readlines" pada csv.readerobjek, di mana dengan map, saya perlu memanggil __next__secara eksplisit untuk mengeluarkan daftar dari csv.reader. 2) Di mana flatMapmasuk? Hanya menelepon mapPartitionssaja sudah berhasil bagi saya.
Galen Long
1
rdd.mapPartitions(lambda x: csv.reader(x))bekerja karena mapPartitionsmengharapkan suatu Iterableobjek. Jika Anda ingin lebih eksplisit Anda dapat memahami atau membuat ekspresi. mapsaja tidak berfungsi karena tidak mengulangi objek. Oleh karena itu saran saya untuk menggunakan flatMap(lambda x: csv.reader([x]))yang akan mengulangi pembaca. Tapi mapPartitionsjauh lebih baik di sini.
zero323
1
perhatikan bahwa ini akan membaca header sebagai deretan data, bukan sebagai header
muon
7

Ini ada di PYSPARK

path="Your file path with file name"

df=spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

Kemudian Anda bisa memeriksanya

df.show(5)
df.count()
jerawat amarnath
sumber
6

Jika Anda ingin memuat csv sebagai dataframe, Anda dapat melakukan hal berikut:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv') \
    .options(header='true', inferschema='true') \
    .load('sampleFile.csv') # this is your csv file

Ini bekerja dengan baik untukku.

Jeril
sumber
@GalenLong jika Anda tidak keberatan, dapatkah Anda membagikan jawaban yang sudah ada
Jeril
Aneh, saya bersumpah ada jawaban lain dengan solusi ini. Mungkin saya bingung dengan pertanyaan lain. Salahku.
Galen Long
5

Ini sejalan dengan apa yang awalnya disarankan JP Mercier tentang penggunaan Pandas, tetapi dengan modifikasi besar: Jika Anda membaca data ke dalam Pandas dalam beberapa bagian, data tersebut akan lebih mudah dibentuk. Artinya, Anda dapat mengurai file yang jauh lebih besar daripada yang sebenarnya dapat ditangani Pandas sebagai satu bagian dan meneruskannya ke Spark dalam ukuran yang lebih kecil. (Ini juga menjawab komentar tentang mengapa seseorang ingin menggunakan Spark jika mereka dapat memuat semuanya ke dalam Pandas.)

from pyspark import SparkContext
from pyspark.sql import SQLContext
import pandas as pd

sc = SparkContext('local','example')  # if using locally
sql_sc = SQLContext(sc)

Spark_Full = sc.emptyRDD()
chunk_100k = pd.read_csv("Your_Data_File.csv", chunksize=100000)
# if you have headers in your csv file:
headers = list(pd.read_csv("Your_Data_File.csv", nrows=0).columns)

for chunky in chunk_100k:
    Spark_Full +=  sc.parallelize(chunky.values.tolist())

YourSparkDataFrame = Spark_Full.toDF(headers)
# if you do not have headers, leave empty instead:
# YourSparkDataFrame = Spark_Full.toDF()
YourSparkDataFrame.show()
abby sobh
sumber
5

Sekarang, ada juga opsi lain untuk file csv umum: https://github.com/seahboonsiew/pyspark-csv sebagai berikut:

Asumsikan kita memiliki konteks berikut

sc = SparkContext
sqlCtx = SQLContext or HiveContext

Pertama, distribusikan pyspark-csv.py ke pelaksana menggunakan SparkContext

import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')

Membaca data csv melalui SparkContext dan mengonversinya menjadi DataFrame

plaintext_rdd = sc.textFile('hdfs://x.x.x.x/blah.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd)
optimis
sumber
3

Jika data csv Anda kebetulan tidak berisi baris baru di salah satu bidang, Anda dapat memuat data Anda dengan textFile()dan menguraikannya

import csv
import StringIO

def loadRecord(line):
    input = StringIO.StringIO(line)
    reader = csv.DictReader(input, fieldnames=["name1", "name2"])
    return reader.next()

input = sc.textFile(inputFile).map(loadRecord)
iec2011007
sumber
2

Jika Anda memiliki satu atau lebih baris dengan jumlah kolom lebih sedikit atau lebih dari 2 dalam dataset maka kesalahan ini mungkin muncul.

Saya juga baru mengenal Pyspark dan mencoba membaca file CSV. Kode berikut berhasil untuk saya:

Dalam kode ini saya menggunakan dataset dari kaggle, tautannya adalah: https://www.kaggle.com/carrie1/ecommerce-data

1. Tanpa menyebutkan skema:

from pyspark.sql import SparkSession  
scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example: Reading CSV file without mentioning schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",")
sdfData.show()

Sekarang periksa kolom: sdfData.columns

Outputnya adalah:

['InvoiceNo', 'StockCode','Description','Quantity', 'InvoiceDate', 'CustomerID', 'Country']

Periksa tipe data untuk setiap kolom:

sdfData.schema
StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,StringType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,StringType,true),StructField(CustomerID,StringType,true),StructField(Country,StringType,true)))

Ini akan memberikan bingkai data dengan semua kolom dengan tipe data sebagai TipeTipe

2. Dengan skema: Jika Anda mengetahui skema atau ingin mengubah tipe data dari kolom mana pun di tabel di atas, gunakan ini (katakanlah saya memiliki kolom berikut dan ingin mereka dalam tipe data tertentu untuk masing-masing)

from pyspark.sql import SparkSession  
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import DoubleType, IntegerType, StringType
    schema = StructType([\
        StructField("InvoiceNo", IntegerType()),\
        StructField("StockCode", StringType()), \
        StructField("Description", StringType()),\
        StructField("Quantity", IntegerType()),\
        StructField("InvoiceDate", StringType()),\
        StructField("CustomerID", DoubleType()),\
        StructField("Country", StringType())\
    ])

scSpark = SparkSession \
    .builder \
    .appName("Python Spark SQL example: Reading CSV file with schema") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

sdfData = scSpark.read.csv("data.csv", header=True, sep=",", schema=schema)

Sekarang periksa skema untuk tipe data setiap kolom:

sdfData.schema

StructType(List(StructField(InvoiceNo,IntegerType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(CustomerID,DoubleType,true),StructField(Country,StringType,true)))

Diedit: Kita juga dapat menggunakan baris kode berikut tanpa menyebutkan skema secara eksplisit:

sdfData = scSpark.read.csv("data.csv", header=True, inferSchema = True)
sdfData.schema

Outputnya adalah:

StructType(List(StructField(InvoiceNo,StringType,true),StructField(StockCode,StringType,true),StructField(Description,StringType,true),StructField(Quantity,IntegerType,true),StructField(InvoiceDate,StringType,true),StructField(UnitPrice,DoubleType,true),StructField(CustomerID,IntegerType,true),StructField(Country,StringType,true)))

Outputnya akan terlihat seperti ini:

sdfData.show()

+---------+---------+--------------------+--------+--------------+----------+-------+
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|CustomerID|Country|
+---------+---------+--------------------+--------+--------------+----------+-------+
|   536365|   85123A|WHITE HANGING HEA...|       6|12/1/2010 8:26|      2.55|  17850|
|   536365|    71053| WHITE METAL LANTERN|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84406B|CREAM CUPID HEART...|       8|12/1/2010 8:26|      2.75|  17850|
|   536365|   84029G|KNITTED UNION FLA...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6|12/1/2010 8:26|      3.39|  17850|
|   536365|    22752|SET 7 BABUSHKA NE...|       2|12/1/2010 8:26|      7.65|  17850|
|   536365|    21730|GLASS STAR FROSTE...|       6|12/1/2010 8:26|      4.25|  17850|
|   536366|    22633|HAND WARMER UNION...|       6|12/1/2010 8:28|      1.85|  17850|
|   536366|    22632|HAND WARMER RED P...|       6|12/1/2010 8:28|      1.85|  17850|
|   536367|    84879|ASSORTED COLOUR B...|      32|12/1/2010 8:34|      1.69|  13047|
|   536367|    22745|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22748|POPPY'S PLAYHOUSE...|       6|12/1/2010 8:34|       2.1|  13047|
|   536367|    22749|FELTCRAFT PRINCES...|       8|12/1/2010 8:34|      3.75|  13047|
|   536367|    22310|IVORY KNITTED MUG...|       6|12/1/2010 8:34|      1.65|  13047|
|   536367|    84969|BOX OF 6 ASSORTED...|       6|12/1/2010 8:34|      4.25|  13047|
|   536367|    22623|BOX OF VINTAGE JI...|       3|12/1/2010 8:34|      4.95|  13047|
|   536367|    22622|BOX OF VINTAGE AL...|       2|12/1/2010 8:34|      9.95|  13047|
|   536367|    21754|HOME BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|      5.95|  13047|
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|      7.95|  13047|
+---------+---------+--------------------+--------+--------------+----------+-------+
only showing top 20 rows
Yogesh
sumber
1

Saat menggunakan spark.read.csv, saya menemukan bahwa menggunakan opsi escape='"'dan multiLine=Truememberikan solusi yang paling konsisten untuk standar CSV , dan menurut pengalaman saya bekerja paling baik dengan file CSV yang diekspor dari Google Spreadsheet.

Itu adalah,

#set inferSchema=False to read everything as string
df = spark.read.csv("myData.csv", escape='"', multiLine=True,
     inferSchema=False, header=True)
flow2k
sumber
darimana percikan itu berasal? itu import pyspark as spark?
Luk Aron
@LukAron Dalam shell pyspark, sparksudah diinisialisasi. Dalam skrip yang dikirimkan oleh spark-submit, Anda dapat membuatnya sebagai from pyspark.sql import SparkSession; spark = SparkSession.builder.getOrCreate().
flow2k