Mengimpor konten file csv ke dalam kerangka data pyspark

12

Bagaimana saya bisa mengimpor file .csv ke dalam dataframe pyspark? Saya bahkan mencoba membaca file csv di Pandas dan kemudian mengubahnya menjadi dataframe percikan menggunakan createDataFrame, tetapi masih menunjukkan beberapa kesalahan. Bisakah seseorang membimbing saya melalui ini? Juga, tolong beri tahu saya bagaimana cara mengimpor file xlsx? Saya mencoba mengimpor konten csv ke dalam kerangka data panda dan kemudian mengonversinya menjadi bingkai data percikan, tetapi menunjukkan kesalahan:

"Py4JJavaError" An error occurred while calling o28.applySchemaToPythonRDD. : java.lang.RuntimeException: java.lang.RuntimeException: Unable to instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient 

Kode saya adalah:

from pyspark import SparkContext 
from pyspark.sql import SQLContext 
import pandas as pd 
sqlc=SQLContext(sc) 
df=pd.read_csv(r'D:\BestBuy\train.csv') 
sdf=sqlc.createDataFrame(df) 
neha
sumber
1
Jika Anda memiliki pesan kesalahan, Anda harus mempostingnya; kemungkinan besar memiliki info penting dalam membantu men-debug situasi.
jagartner
Saya mencoba untuk mengimpor konten csv ke dalam kerangka data panda dan kemudian mengubahnya menjadi frame data percikan .... tetapi itu menunjukkan kesalahan seperti "Py4JJavaError" Terjadi kesalahan saat memanggil o28.applySchemaToPythonRDD. : java.lang.RuntimeException: java.lang.RuntimeException: Tidak dapat membuat instantiate org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient
neha
dan kode saya adalah -> dari pyspark import SparkContext dari pyspark.sql import SQLContext import panda as pd sqlc = SQLContext (sc) df = pd.read_csv (r'D: \ BestBuy \ train.csv ') sdf = sqlc.createDataFrame (df) ----> Kesalahan
neha
1
Selamat datang di DataScience.SE! Harap edit posting asli Anda alih-alih menambahkan komentar.
Emre
path file harus dalam HDFS maka hanya Anda yang dapat menjalankan data
Prakash Reddy

Jawaban:

13

"Bagaimana saya bisa mengimpor file .csv ke dalam kerangka data pyspark?" - ada banyak cara untuk melakukan ini; yang paling sederhana adalah memulai pyspark dengan modul spark-csv Databrick. Anda dapat melakukan ini dengan memulai pyspark dengan

pyspark --packages com.databricks:spark-csv_2.10:1.4.0

maka Anda dapat mengikuti langkah-langkah berikut:

from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', inferschema='true').load('cars.csv')

Metode lain adalah membaca dalam file teks sebagai rdd menggunakan

myrdd = sc.textFile("yourfile.csv").map(lambda line: line.split(","))

Kemudian ubah data Anda sehingga setiap item dalam format yang benar untuk skema (yaitu Ints, Strings, Floats, dll.). Anda ingin menggunakannya

>>> from pyspark.sql import Row
>>> Person = Row('name', 'age')
>>> person = rdd.map(lambda r: Person(*r))
>>> df2 = sqlContext.createDataFrame(person)
>>> df2.collect()
[Row(name=u'Alice', age=1)]
>>> from pyspark.sql.types import *
>>> schema = StructType([
...    StructField("name", StringType(), True),
...    StructField("age", IntegerType(), True)])
>>> df3 = sqlContext.createDataFrame(rdd, schema)
>>> df3.collect()
[Row(name=u'Alice', age=1)]

Referensi: http://spark.apache.org/docs/1.6.1/api/python/pyspark.sql.html#pyspark.sql.Row

"Juga, tolong beri tahu saya bagaimana saya bisa mengimpor file xlsx?" - File Excel tidak digunakan dalam "Big Data"; Spark dimaksudkan untuk digunakan dengan file besar atau database. Jika Anda memiliki file Excel yang berukuran 50GB, maka Anda melakukan kesalahan. Excel bahkan tidak akan bisa membuka file sebesar itu; dari pengalaman saya, apa pun di atas 20MB dan Excel mati.

Jon
sumber
Saya pikir mungkin ada masalah dengan pendekatan RDD di atas: bidang mungkin berisi baris baru (meskipun dikelilingi oleh tanda kutip ganda), yaitu, tools.ietf.org/html/rfc4180#section-2 .
flow2k
Anda dapat menggunakan alat untuk mengonversi file xlsx ke csv (hal-hal seperti apis gnumeric atau open office). maka Anda dapat melakukan ilmu data seperti biasa
vpathak
2

Berikut ini bekerja dengan baik untuk saya:

from pyspark.sql.types import *
schema = StructType([StructField("name", StringType(), True),StructField("age", StringType(), True)]
pd_df = pd.read_csv("<inputcsvfile>")
sp_df = spark.createDataFrame(pd_df, schema=schema)
Saurabh Agrawal
sumber
1

Di direktori lokal saya ada file 'temp.csv'. Dari sana, menggunakan instance lokal saya melakukan hal berikut:

>>> from pyspark import SQLContext
>>> from pyspark.sql import Row
>>> sql_c = SQLContext(sc)
>>> d0 = sc.textFile('./temp.csv')
>>> d0.collect()
[u'a,1,.2390', u'b,2,.4390', u'c,3,.2323']
>>> d1 = d0.map(lambda x: x.split(',')).map(lambda x: Row(label = x[0], number = int(x[1]), value = float(x[2])))
>>> d1.take(1)
[Row(label=u'a', number=1, value=0.239)]
>>> df = sql_c.createDataFrame(d1)
>>> df_cut = df[df.number>1]
>>> df_cut.select('label', 'value').collect()
[Row(label=u'b', value=0.439), Row(label=u'c', value=0.2323)]

Jadi d0 adalah file teks mentah yang kami kirim ke RDD percikan. Agar Anda dapat membuat bingkai data, Anda ingin memecah csv terpisah, dan membuat setiap entri menjadi tipe baris, seperti yang saya lakukan saat membuat d1. Langkah terakhir adalah membuat frame data dari RDD.

Jagartner
sumber
0

Anda dapat menggunakan paket spark-csv oleh DataBricks yang melakukan banyak hal untuk Anda secara otomatis, seperti merawat header, menggunakan karakter pelarian, skema otomatis yang menyimpulkan dan sebagainya. Mulai dari Spark 2.0 ada fungsi bawaan untuk berurusan dengan CSV.

Jan van der Vegt
sumber