Mengganti nama kolom DataFrame di Spark Scala

93

Saya mencoba untuk mengonversi semua nama header / kolom a DataFramedi Spark-Scala. sampai sekarang saya datang dengan kode berikut yang hanya menggantikan satu nama kolom.

for( i <- 0 to origCols.length - 1) {
  df.withColumnRenamed(
    df.columns(i), 
    df.columns(i).toLowerCase
  );
}
Sam
sumber

Jawaban:

239

Jika strukturnya datar:

val df = Seq((1L, "a", "foo", 3.0)).toDF
df.printSchema
// root
//  |-- _1: long (nullable = false)
//  |-- _2: string (nullable = true)
//  |-- _3: string (nullable = true)
//  |-- _4: double (nullable = false)

hal paling sederhana yang dapat Anda lakukan adalah menggunakan toDFmetode:

val newNames = Seq("id", "x1", "x2", "x3")
val dfRenamed = df.toDF(newNames: _*)

dfRenamed.printSchema
// root
// |-- id: long (nullable = false)
// |-- x1: string (nullable = true)
// |-- x2: string (nullable = true)
// |-- x3: double (nullable = false)

Jika Anda ingin mengganti nama kolom individu, Anda dapat menggunakan selectdengan alias:

df.select($"_1".alias("x1"))

yang dapat dengan mudah digeneralisasikan ke beberapa kolom:

val lookup = Map("_1" -> "foo", "_3" -> "bar")

df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)

atau withColumnRenamed:

df.withColumnRenamed("_1", "x1")

yang digunakan dengan foldLeftuntuk mengganti nama beberapa kolom:

lookup.foldLeft(df)((acc, ca) => acc.withColumnRenamed(ca._1, ca._2))

Dengan struktur bersarang ( structs), satu opsi yang memungkinkan adalah mengganti nama dengan memilih seluruh struktur:

val nested = spark.read.json(sc.parallelize(Seq(
    """{"foobar": {"foo": {"bar": {"first": 1.0, "second": 2.0}}}, "id": 1}"""
)))

nested.printSchema
// root
//  |-- foobar: struct (nullable = true)
//  |    |-- foo: struct (nullable = true)
//  |    |    |-- bar: struct (nullable = true)
//  |    |    |    |-- first: double (nullable = true)
//  |    |    |    |-- second: double (nullable = true)
//  |-- id: long (nullable = true)

@transient val foobarRenamed = struct(
  struct(
    struct(
      $"foobar.foo.bar.first".as("x"), $"foobar.foo.bar.first".as("y")
    ).alias("point")
  ).alias("location")
).alias("record")

nested.select(foobarRenamed, $"id").printSchema
// root
//  |-- record: struct (nullable = false)
//  |    |-- location: struct (nullable = false)
//  |    |    |-- point: struct (nullable = false)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
//  |-- id: long (nullable = true)

Perhatikan bahwa ini dapat memengaruhi nullabilitymetadata. Kemungkinan lain adalah mengganti nama dengan casting:

nested.select($"foobar".cast(
  "struct<location:struct<point:struct<x:double,y:double>>>"
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)

atau:

import org.apache.spark.sql.types._

nested.select($"foobar".cast(
  StructType(Seq(
    StructField("location", StructType(Seq(
      StructField("point", StructType(Seq(
        StructField("x", DoubleType), StructField("y", DoubleType)))))))))
).alias("record")).printSchema

// root
//  |-- record: struct (nullable = true)
//  |    |-- location: struct (nullable = true)
//  |    |    |-- point: struct (nullable = true)
//  |    |    |    |-- x: double (nullable = true)
//  |    |    |    |-- y: double (nullable = true)
nol323
sumber
Hai @ zero323 Saat menggunakan withColumnRenamed saya mendapatkan AnalysisException tidak dapat menyelesaikan 'CC8. 1 'kolom masukan yang diberikan ... Gagal meskipun CC8.1 tersedia di DataFrame, harap pandu.
unk1102
@ u449355 Tidak jelas bagi saya apakah ini kolom bersarang atau yang berisi titik. Dalam kasus selanjutnya, backticks harus berfungsi (setidaknya dalam beberapa kasus dasar).
zero323
1
apa : _*)artinya didf.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*)
Anton Kim
1
Untuk menjawab pertanyaan Anton Kim: : _*adalah scala yang disebut operator "percikan". Ini pada dasarnya meledakkan hal yang mirip larik ke dalam daftar yang tidak terkendali, yang berguna saat Anda ingin meneruskan larik ke fungsi yang menggunakan sejumlah argumen, tetapi tidak memiliki versi yang membutuhkan List[]. Jika Anda sama sekali tidak asing dengan Perl, itulah perbedaan antara some_function(@my_array) # "splatted"dan some_function(\@my_array) # not splatted ... in perl the backslash "\" operator returns a reference to a thing.
Mylo Stone
1
Pernyataan ini sangat tidak jelas bagi saya df.select(df.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*).. Bisakah Anda menguraikannya? terutama lookup.getOrElse(c,c)bagiannya.
Aetos
19

Bagi Anda yang tertarik dengan versi PySpark (sebenarnya sama di Scala - lihat komentar di bawah):

    merchants_df_renamed = merchants_df.toDF(
        'merchant_id', 'category', 'subcategory', 'merchant')

    merchants_df_renamed.printSchema()

Hasil:

root
| - merchant_id: integer (nullable = true)
| - kategori: string (nullable = true)
| - subkategori: string (nullable = true)
| - merchant: string (nullable = true)

Tagar
sumber
1
Dengan menggunakan toDF()untuk mengganti nama kolom di DataFrame harus berhati-hati. Metode ini bekerja lebih lambat dari yang lain. Saya memiliki DataFrame berisi 100 juta catatan dan permintaan hitungan sederhana di atasnya mengambil ~ 3s, sedangkan permintaan yang sama dengan toDF()metode take ~ 16s. Tetapi ketika menggunakan select col AS col_newmetode untuk mengganti nama saya mendapatkan ~ 3s lagi. Lebih dari 5 kali lebih cepat! Spark 2.3.2.3
Ihor Konovalenko
6
def aliasAllColumns(t: DataFrame, p: String = "", s: String = ""): DataFrame =
{
  t.select( t.columns.map { c => t.col(c).as( p + c + s) } : _* )
}

Jika tidak jelas, ini menambahkan prefiks dan sufiks ke setiap nama kolom saat ini. Ini dapat berguna ketika Anda memiliki dua tabel dengan satu atau lebih kolom memiliki nama yang sama, dan Anda ingin menggabungkannya tetapi masih dapat menghilangkan ambiguitas kolom dalam tabel resultan. Pasti akan menyenangkan jika ada cara serupa untuk melakukan ini dalam SQL "normal".

Mylo Stone
sumber
pasti menyukainya, bagus dan elegan
theblu elephantom
1

Misalkan df dataframe memiliki 3 kolom id1, name1, price1 dan Anda ingin mengganti namanya menjadi id2, name2, price2

val list = List("id2", "name2", "price2")
import spark.implicits._
val df2 = df.toDF(list:_*)
df2.columns.foreach(println)

Saya menemukan pendekatan ini berguna dalam banyak kasus.

Jagadeesh Verri
sumber
0

tow table join tidak mengubah nama kunci yang digabungkan

// method 1: create a new DF
day1 = day1.toDF(day1.columns.map(x => if (x.equals(key)) x else s"${x}_d1"): _*)

// method 2: use withColumnRenamed
for ((x, y) <- day1.columns.filter(!_.equals(key)).map(x => (x, s"${x}_d1"))) {
    day1 = day1.withColumnRenamed(x, y)
}

berhasil!

Colin Wang
sumber
0
Sometime we have the column name is below format in SQLServer or MySQL table

Ex  : Account Number,customer number

But Hive tables do not support column name containing spaces, so please use below solution to rename your old column names.

Solution:

val renamedColumns = df.columns.map(c => df(c).as(c.replaceAll(" ", "_").toLowerCase()))
df = df.select(renamedColumns: _*)
R. RamkumarYugo
sumber