Kolom gabungan di Apache Spark DataFrame

116

Bagaimana kita menggabungkan dua kolom di Apache Spark DataFrame? Apakah ada fungsi di Spark SQL yang dapat kita gunakan?

Nipun
sumber

Jawaban:

175

Dengan SQL mentah, Anda dapat menggunakan CONCAT:

  • Dengan Python

    df = sqlContext.createDataFrame([("foo", 1), ("bar", 2)], ("k", "v"))
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    
  • Di Scala

    import sqlContext.implicits._
    
    val df = sc.parallelize(Seq(("foo", 1), ("bar", 2))).toDF("k", "v")
    df.registerTempTable("df")
    sqlContext.sql("SELECT CONCAT(k, ' ',  v) FROM df")
    

Sejak Spark 1.5.0 Anda dapat menggunakan concatfungsi dengan DataFrame API:

  • Dengan Python:

    from pyspark.sql.functions import concat, col, lit
    
    df.select(concat(col("k"), lit(" "), col("v")))
    
  • Di Scala:

    import org.apache.spark.sql.functions.{concat, lit}
    
    df.select(concat($"k", lit(" "), $"v"))
    

Ada juga concat_wsfungsi yang mengambil pemisah string sebagai argumen pertama.

nol323
sumber
46

Inilah cara Anda dapat melakukan penamaan khusus

import pyspark
from pyspark.sql import functions as sf
sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)
df = sqlc.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df.show()

memberi,

+--------+--------+
|colname1|colname2|
+--------+--------+
|   row11|   row12|
|   row21|   row22|
+--------+--------+

buat kolom baru dengan menggabungkan:

df = df.withColumn('joined_column', 
                    sf.concat(sf.col('colname1'),sf.lit('_'), sf.col('colname2')))
df.show()

+--------+--------+-------------+
|colname1|colname2|joined_column|
+--------+--------+-------------+
|   row11|   row12|  row11_row12|
|   row21|   row22|  row21_row22|
+--------+--------+-------------+
muon
sumber
4
litmembuat kolom_
muon
34

Salah satu opsi untuk menggabungkan kolom string di Spark Scala adalah menggunakan concat.

Diperlukan untuk memeriksa nilai nol . Karena jika salah satu kolom adalah null, hasilnya akan menjadi null meskipun salah satu kolom lain memiliki informasi.

Menggunakan concatdan withColumn:

val newDf =
  df.withColumn(
    "NEW_COLUMN",
    concat(
      when(col("COL1").isNotNull, col("COL1")).otherwise(lit("null")),
      when(col("COL2").isNotNull, col("COL2")).otherwise(lit("null"))))

Menggunakan concatdan select:

val newDf = df.selectExpr("concat(nvl(COL1, ''), nvl(COL2, '')) as NEW_COLUMN")

Dengan kedua pendekatan Anda akan memiliki NEW_COLUMN yang nilainya merupakan rangkaian kolom: COL1 dan COL2 dari df asli Anda.

Ignacio Alorre
sumber
1
Saya mencoba metode Anda di pyspark tetapi tidak berhasil, peringatan "col should be Column".
Simson
@Samson maaf, saya hanya memeriksa API Scala
Ignacio Alorre
3
@IgnacioAlorre Jika Anda menggunakan concat_wsalih-alih concat, Anda dapat menghindari memeriksa NULL.
Aswath K
18

Jika Anda ingin melakukannya menggunakan DF, Anda dapat menggunakan udf untuk menambahkan kolom baru berdasarkan kolom yang ada.

val sqlContext = new SQLContext(sc)
case class MyDf(col1: String, col2: String)

//here is our dataframe
val df = sqlContext.createDataFrame(sc.parallelize(
    Array(MyDf("A", "B"), MyDf("C", "D"), MyDf("E", "F"))
))

//Define a udf to concatenate two passed in string values
val getConcatenated = udf( (first: String, second: String) => { first + " " + second } )

//use withColumn method to add a new column called newColName
df.withColumn("newColName", getConcatenated($"col1", $"col2")).select("newColName", "col1", "col2").show()
Denmark Shrestha
sumber
12

Dari Spark 2.3 ( SPARK-22771 ) Spark SQL mendukung operator penggabungan ||.

Sebagai contoh;

val df = spark.sql("select _c1 || _c2 as concat_column from <table_name>")
Krishas
sumber
10

Berikut cara lain untuk melakukan ini untuk pyspark:

#import concat and lit functions from pyspark.sql.functions 
from pyspark.sql.functions import concat, lit

#Create your data frame
countryDF = sqlContext.createDataFrame([('Ethiopia',), ('Kenya',), ('Uganda',), ('Rwanda',)], ['East Africa'])

#Use select, concat, and lit functions to do the concatenation
personDF = countryDF.select(concat(countryDF['East Africa'], lit('n')).alias('East African'))

#Show the new data frame
personDF.show()

----------RESULT-------------------------

84
+------------+
|East African|
+------------+
|   Ethiopian|
|      Kenyan|
|     Ugandan|
|     Rwandan|
+------------+
Teddy Belay
sumber
7

Berikut adalah saran jika Anda tidak mengetahui nomor atau nama kolom di Dataframe.

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
wones0120
sumber
4

concat (* cols)

v1.5 dan lebih tinggi

Menggabungkan beberapa kolom input menjadi satu kolom. Fungsi ini bekerja dengan string, biner, dan kolom array yang kompatibel.

Misalnya: new_df = df.select(concat(df.a, df.b, df.c))


concat_ws (sep, * cols)

v1.5 dan lebih tinggi

Mirip dengan concattetapi menggunakan pemisah yang ditentukan.

Misalnya: new_df = df.select(concat_ws('-', df.col1, df.col2))


map_concat (* cols)

v2.4 dan lebih tinggi

Digunakan untuk menggabungkan peta, mengembalikan gabungan semua peta yang diberikan.

Misalnya: new_df = df.select(map_concat("map1", "map2"))


Menggunakan operator string concat ( ||):

v2.3 dan lebih tinggi

Misalnya: df = spark.sql("select col_a || col_b || col_c as abc from table_x")

Referensi: Spark sql doc

Ani Menon
sumber
2

Di Spark 2.3.0, Anda dapat melakukan:

spark.sql( """ select '1' || column_a from table_a """)
Charlie 木匠
sumber
1

Di Java, Anda dapat melakukan ini untuk menggabungkan beberapa kolom. Kode sampel adalah untuk memberi Anda skenario dan cara menggunakannya untuk pemahaman yang lebih baik.

SparkSession spark = JavaSparkSessionSingleton.getInstance(rdd.context().getConf());
Dataset<Row> reducedInventory = spark.sql("select * from table_name")
                        .withColumn("concatenatedCol",
                                concat(col("col1"), lit("_"), col("col2"), lit("_"), col("col3")));


class JavaSparkSessionSingleton {
    private static transient SparkSession instance = null;

    public static SparkSession getInstance(SparkConf sparkConf) {
        if (instance == null) {
            instance = SparkSession.builder().config(sparkConf)
                    .getOrCreate();
        }
        return instance;
    }
}

Kode diatas concatenated col1, col2, col3 dipisahkan dengan "_" untuk membuat kolom dengan nama "concatenatedCol".

wandermonk.dll
sumber
1

Apakah kita memiliki sintaks java yang sesuai dengan proses di bawah ini

val dfResults = dfSource.select(concat_ws(",",dfSource.columns.map(c => col(c)): _*))
Roopesh MB
sumber
0

Cara lain untuk melakukannya di pySpark menggunakan sqlContext ...

#Suppose we have a dataframe:
df = sqlContext.createDataFrame([('row1_1','row1_2')], ['colname1', 'colname2'])

# Now we can concatenate columns and assign the new column a name 
df = df.select(concat(df.colname1, df.colname2).alias('joined_colname'))
Gur
sumber
0

Memang, ada beberapa abstraksi bawaan yang bagus untuk Anda capai penggabungan tanpa perlu menerapkan fungsi khusus. Karena Anda menyebutkan Spark SQL, jadi saya menduga Anda mencoba meneruskannya sebagai perintah deklaratif melalui spark.sql (). Jika demikian, Anda dapat menyelesaikannya secara langsung dengan meneruskan perintah SQL seperti: SELECT CONCAT(col1, '<delimiter>', col2, ...) AS concat_column_name FROM <table_name>;

Selain itu, dari Spark 2.3.0, Anda dapat menggunakan perintah sesuai dengan: SELECT col1 || col2 AS concat_column_name FROM <table_name>;

Di mana, adalah pembatas pilihan Anda (bisa juga berupa ruang kosong) dan merupakan tabel sementara atau permanen yang Anda coba baca.


sumber
0

Kita juga dapat menggunakan SelectExpr dengan mudah. df1.selectExpr ("*", "upper (_2 || _3) as new")

Deepak Saxena
sumber