Bagaimana saya bisa mengubah RDD ( org.apache.spark.rdd.RDD[org.apache.spark.sql.Row]
) ke Dataframe org.apache.spark.sql.DataFrame
. Saya mengkonversi dataframe menjadi rdd menggunakan .rdd
. Setelah mengolahnya, saya ingin kembali dalam bingkai data. Bagaimana saya bisa melakukan ini?
scala
apache-spark
apache-spark-sql
rdd
pengguna568109
sumber
sumber
Jawaban:
SqlContext
memiliki sejumlahcreateDataFrame
metode yang membuatDataFrame
suatuRDD
. Saya membayangkan salah satunya akan bekerja untuk konteks Anda.Sebagai contoh:
sumber
Kode ini berfungsi sempurna dari Spark 2.x dengan Scala 2.11
Impor kelas yang diperlukan
Buat
SparkSession
Obyek, dan Ini diaspark
Mari kita
RDD
membuatnyaDataFrame
Metode 1
Menggunakan
SparkSession.createDataFrame(RDD obj)
.Metode 2
Menggunakan
SparkSession.createDataFrame(RDD obj)
dan menentukan nama kolom.Metode 3 (Jawaban aktual untuk pertanyaan)
Cara ini membutuhkan input yang
rdd
harus bertipeRDD[Row]
.buat skema
Sekarang terapkan keduanya
rowsRdd
danschema
kecreateDataFrame()
sumber
Dengan asumsi RDD Anda [baris] disebut rdd, Anda dapat menggunakan:
sumber
Catatan: Jawaban ini awalnya diposting di sini
Saya memposting jawaban ini karena saya ingin membagikan detail tambahan tentang opsi yang tersedia yang tidak saya temukan di jawaban lain
Untuk membuat DataFrame dari RDD of Rows, ada dua opsi utama:
1) Seperti yang sudah ditunjukkan, Anda dapat menggunakan
toDF()
yang dapat diimpor olehimport sqlContext.implicits._
. Namun, pendekatan ini hanya berfungsi untuk jenis RDD berikut:RDD[Int]
RDD[Long]
RDD[String]
RDD[T <: scala.Product]
(sumber: Scaladoc dari
SQLContext.implicits
objek)Tanda tangan terakhir sebenarnya berarti bahwa ia dapat bekerja untuk RDD tupel atau RDD kelas kasus (karena tupel dan kelas kasus adalah subclass dari
scala.Product
).Jadi, untuk menggunakan pendekatan ini untuk
RDD[Row]
, Anda harus memetakannya keRDD[T <: scala.Product]
. Ini dapat dilakukan dengan memetakan setiap baris ke kelas kasus khusus atau ke tupel, seperti dalam cuplikan kode berikut:atau
Kelemahan utama dari pendekatan ini (menurut saya) adalah bahwa Anda harus secara eksplisit mengatur skema DataFrame yang dihasilkan dalam fungsi peta, kolom demi kolom. Mungkin ini bisa dilakukan secara terprogram jika Anda tidak tahu skema sebelumnya, tetapi keadaan bisa sedikit berantakan di sana. Jadi, sebagai alternatif, ada opsi lain:
2) Anda dapat menggunakan
createDataFrame(rowRDD: RDD[Row], schema: StructType)
seperti pada jawaban yang diterima, yang tersedia di objek SQLContext . Contoh untuk mengonversi RDD dari DataFrame lama:Perhatikan bahwa tidak perlu mengatur kolom skema secara eksplisit. Kami menggunakan kembali skema DF lama, yang
StructType
berkelas dan dapat dengan mudah diperpanjang. Namun, pendekatan ini kadang-kadang tidak mungkin, dan dalam beberapa kasus bisa jadi kurang efisien daripada yang pertama.sumber
import sqlContext.implicits.
Misalkan Anda memiliki
DataFrame
dan ingin melakukan beberapa modifikasi pada bidang data dengan mengubahnya menjadiRDD[Row]
.Untuk mengkonversi kembali ke
DataFrame
dariRDD
kita perlu mendefinisikan tipe struktur dariRDD
.Jika tipe data itu
Long
maka akan menjadi sepertiLongType
dalam struktur.Jika
String
kemudianStringType
dalam struktur.Sekarang Anda dapat mengubah RDD ke DataFrame menggunakan metode createDataFrame .
sumber
Berikut adalah contoh sederhana dari mengubah Daftar Anda menjadi Spark RDD dan kemudian mengubah RDD Spark itu menjadi Dataframe.
Harap dicatat bahwa saya telah menggunakan scala REPL Spark-shell untuk mengeksekusi kode berikut, Here sc adalah instance dari SparkContext yang secara implisit tersedia di Spark-shell. Semoga ini menjawab pertanyaan Anda.
sumber
Metode 1: (Scala)
Metode 2: (Scala)
Metode 1: (Python)
Metode 2: (Python)
Mengekstraksi nilai dari objek baris dan kemudian menerapkan kelas kasus untuk mengkonversi rdd ke DF
sumber
Pada versi percikan yang lebih baru (2.0+)
sumber
Dengan asumsi val spark adalah produk dari SparkSession.builder ...
Langkah yang sama, tetapi dengan deklarasi val lebih sedikit:
sumber
Saya mencoba menjelaskan solusinya menggunakan masalah jumlah kata . 1. Baca file menggunakan sc
Metode untuk membuat DF
Baca file menggunakan spark
Rdd ke Dataframe
val df = sc.textFile ("D: // cca175 / data /") .toDF ("t1") df.show
Metode 1
Buat jumlah kata RDD ke Dataframe
Metode2
Buat Dataframe dari Rdd
Metode3
Tentukan Skema
import org.apache.spark.sql.types._
skema val = StructType baru (). tambahkan (StructField ("word", StringType, true)). add (StructField ("count", StringType, true))
Buat RowRDD
Buat DataFrame dari RDD dengan skema
val df = spark.createDataFrame (rowRdd, schema)
df.show
sumber
Untuk mengonversi Array [Baris] ke DataFrame atau Dataset, yang berikut ini berfungsi dengan elegan:
Katakanlah, skema adalah StructType untuk baris, lalu
sumber