Fungsi jendela :
Sesuatu seperti ini harus melakukan trik:
import org.apache.spark.sql.functions.{row_number, max, broadcast}
import org.apache.spark.sql.expressions.Window
val df = sc.parallelize(Seq(
(0,"cat26",30.9), (0,"cat13",22.1), (0,"cat95",19.6), (0,"cat105",1.3),
(1,"cat67",28.5), (1,"cat4",26.8), (1,"cat13",12.6), (1,"cat23",5.3),
(2,"cat56",39.6), (2,"cat40",29.7), (2,"cat187",27.9), (2,"cat68",9.8),
(3,"cat8",35.6))).toDF("Hour", "Category", "TotalValue")
val w = Window.partitionBy($"hour").orderBy($"TotalValue".desc)
val dfTop = df.withColumn("rn", row_number.over(w)).where($"rn" === 1).drop("rn")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Metode ini akan tidak efisien jika kemiringan data yang signifikan.
Agregasi SQL biasa diikuti olehjoin
:
Atau Anda dapat bergabung dengan bingkai data gabungan:
val dfMax = df.groupBy($"hour".as("max_hour")).agg(max($"TotalValue").as("max_value"))
val dfTopByJoin = df.join(broadcast(dfMax),
($"hour" === $"max_hour") && ($"TotalValue" === $"max_value"))
.drop("max_hour")
.drop("max_value")
dfTopByJoin.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Ini akan menyimpan nilai duplikat (jika ada lebih dari satu kategori per jam dengan nilai total yang sama). Anda dapat menghapus ini sebagai berikut:
dfTopByJoin
.groupBy($"hour")
.agg(
first("category").alias("category"),
first("TotalValue").alias("TotalValue"))
Menggunakan pemesanan lebihstructs
:
Rapi, meskipun tidak diuji dengan sangat baik, trik yang tidak memerlukan fungsi gabungan atau jendela:
val dfTop = df.select($"Hour", struct($"TotalValue", $"Category").alias("vs"))
.groupBy($"hour")
.agg(max("vs").alias("vs"))
.select($"Hour", $"vs.Category", $"vs.TotalValue")
dfTop.show
// +----+--------+----------+
// |Hour|Category|TotalValue|
// +----+--------+----------+
// | 0| cat26| 30.9|
// | 1| cat67| 28.5|
// | 2| cat56| 39.6|
// | 3| cat8| 35.6|
// +----+--------+----------+
Dengan API DataSet (Spark 1.6+, 2.0+):
Spark 1.6 :
case class Record(Hour: Integer, Category: String, TotalValue: Double)
df.as[Record]
.groupBy($"hour")
.reduce((x, y) => if (x.TotalValue > y.TotalValue) x else y)
.show
// +---+--------------+
// | _1| _2|
// +---+--------------+
// |[0]|[0,cat26,30.9]|
// |[1]|[1,cat67,28.5]|
// |[2]|[2,cat56,39.6]|
// |[3]| [3,cat8,35.6]|
// +---+--------------+
Spark 2.0 atau lebih baru :
df.as[Record]
.groupByKey(_.Hour)
.reduceGroups((x, y) => if (x.TotalValue > y.TotalValue) x else y)
Dua metode terakhir dapat memanfaatkan gabungan sisi peta dan tidak memerlukan pengocokan penuh sehingga sebagian besar waktu harus menunjukkan kinerja yang lebih baik dibandingkan dengan fungsi jendela dan gabungan. Tongkat ini juga dapat digunakan dengan Streaming Terstruktur dalam completed
mode keluaran.
Jangan gunakan :
df.orderBy(...).groupBy(...).agg(first(...), ...)
Ini mungkin terlihat bekerja (terutama dalam local
mode) tetapi tidak dapat diandalkan (lihat SPARK-16207 , kredit untuk Tzach Zohar untuk menghubungkan masalah JIRA yang relevan , dan SPARK-30335 ).
Catatan yang sama berlaku untuk
df.orderBy(...).dropDuplicates(...)
yang secara internal menggunakan rencana eksekusi yang setara.
Untuk Spark 2.0.2 dengan pengelompokan berdasarkan beberapa kolom:
sumber
Ini adalah persis sama zero323 's jawaban tetapi dalam SQL cara query.
Dengan asumsi bahwa dataframe dibuat dan terdaftar sebagai
Fungsi jendela:
Agregasi SQL biasa diikuti dengan bergabung:
Menggunakan pemesanan lebih dari struct:
Cara dataSets dan jangan lakukan sama dengan di jawaban asli
sumber
Polanya adalah kelompok dengan tombol => melakukan sesuatu untuk setiap kelompok misalnya mengurangi => kembali ke bingkai data
Saya pikir abstraksi Dataframe agak rumit dalam hal ini jadi saya menggunakan fungsionalitas RDD
sumber
Solusi di bawah ini hanya melakukan satu groupBy dan mengekstrak baris dataframe Anda yang berisi maxValue dalam satu kesempatan. Tidak perlu Bergabung lagi, atau Windows.
sumber
Cara yang baik untuk melakukan ini dengan api dataframe adalah menggunakan logika argmax seperti itu
sumber
Di sini Anda dapat melakukannya seperti ini -
sumber
Kita dapat menggunakan fungsi jendela rank () (di mana Anda akan memilih peringkat rank = 1) hanya menambahkan angka untuk setiap baris grup (dalam hal ini akan menjadi jam)
ini sebuah contoh. (dari https://github.com/jaceklaskowski/mastering-apache-spark-book/blob/master/spark-sql-functions.adoc#rank )
sumber