How to create a new column with a function in a Spark DataFrame

+2 votes

I'm trying to figure out the new DataFrame API in Spark. I have a DataFrame with two columns, "ID" and "Amt". As a generic example, say I want to add a new column called "Code" that holds a code based on the value of "Amt". I can write a function something like this:

def coder(myAmt: Integer): String = {
  if (myAmt > 100) "Little"
  else "Big"
}

When I try to use it like this:

val DF = sqlContext.parquetFile("hdfs://temp/file.parquet")

DF.withColumn("Code", coder(DF("Amt")))

I get type mismatch errors:

found   : org.apache.spark.sql.Column
required: Integer

 

I've tried changing the input type of my function to org.apache.spark.sql.Column, but then the function fails to compile because the if statement wants a Boolean.

Am I doing this wrong? Is there another way to do this than using withColumn?

Thanks in advance.

Jun 26, 2018 in Apache Spark by coldcode
• 2,090 points
88,751 views

14 answers to this question.

+1 vote

Assuming your schema has an "Amt" column, wrap the function in a UDF so that it can be applied to a Column instead of a plain Int:

import org.apache.spark.sql.functions._
val DF = sqlContext.parquetFile("hdfs://temp/file.parquet")
val coder: (Int => String) = (arg: Int) => {if (arg < 100) "little" else "big"}
val sqlfunc = udf(coder)
DF.withColumn("Code", sqlfunc(col("Amt")))

I guess withColumn is the right way to add a column.


answered Jun 26, 2018 by nitinrawat895
• 11,380 points
This worked for me. I needed to strip the $ from numeric values in order to sum them, and the replace function was not working for that in a Databricks DataFrame. Thanks, the UDF worked.
0 votes
df = sqlContext.createDataFrame(
    [(1, "a", 25.0), (2, "B", -25.0)], ("c1", "c2", "c3"))

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def valueToCategory(value):
    if   value == 1: return 1
    elif value == 2: return 2
    # ... (further cases elided)
    else: return 0

# NOTE: it seems that calls to udf() must be after SparkContext() is called
udfValueToCategory = udf(valueToCategory, StringType())
df_with_cat = df.withColumn("category", udfValueToCategory("c1"))
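
Applied to the threshold rule from the question, the same UDF pattern might look like the sketch below. The names coder and add_code_column, the "Big"/"Small" labels, and the choice to return None for null amounts are assumptions made for illustration. The PySpark imports sit inside the helper only so that the plain function can be checked without a Spark installation.

```python
def coder(amt):
    """Plain Python rule: label an amount relative to a threshold of 100."""
    if amt is None:
        # Spark hands null cells to a Python UDF as None; pass them through.
        return None
    return "Big" if amt > 100 else "Small"


def add_code_column(df, amount_col="Amt", new_col="Code"):
    """Return df with an extra string column computed row by row via a UDF."""
    # Imported lazily so coder() stays testable without pyspark installed.
    from pyspark.sql.functions import col, udf
    from pyspark.sql.types import StringType

    coder_udf = udf(coder, StringType())
    return df.withColumn(new_col, coder_udf(col(amount_col)))
```

With a DataFrame df that has an "Amt" column, add_code_column(df).show() would display the added "Code" column.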
answered Dec 7, 2018 by Rahul
0 votes
df.select('*', (df.column_name + 10).alias('new_column'))
answered Dec 7, 2018 by Raj
0 votes
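from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

# Keep columns that already exist; add the missing ones as null string columns.
new_col = []
for column in COLUMN_LIST:
    if column in df.columns:
        new_col.append(column)
    else:
        new_col.append(lit(None).cast(StringType()).alias(column))

df = df.select(new_col)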
answered Dec 7, 2018 by Lakheer
0 votes

You can do it using udf:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType
a = F.udf(lambda: yourstring, StringType())
df.select(a().alias('new_column'))
answered Dec 7, 2018 by Goutam
0 votes
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

creator = udf(
    lambda val: val,
    StringType()
)
df.withColumn('new_col_name', creator(df.old_col))
answered Dec 7, 2018 by Manoj
0 votes
DF.withColumn("new_col", DF.col("old_col") + 10)
answered Dec 7, 2018 by Vinod
0 votes
import org.apache.spark.sql.functions.{lit, udf}

val addColumn: String => String = (data: String) => data
val ColUDF = udf(addColumn)
val output = inputDataFrame.withColumn("Name", ColUDF(lit("abcde")))
answered Dec 7, 2018 by Ashok
–1 vote
Dataset<Row> newDs = ds.withColumn("new_col", functions.lit(1));

answered Dec 7, 2018 by Lalit
0 votes
val df2 = dataFrame
  .withColumn("F", lit("foo"))
  .select("F", "A", "B", "C", "D", "E")
answered Dec 7, 2018 by Suman
+3 votes
val coder: (Int => String) = v => if (v > 100) "Big" else "Small"
import org.apache.spark.sql.functions.udf
val coder_udf = udf(coder)
DF.withColumn("Code", coder_udf( DF.col("Amt")))
answered Apr 5, 2019 by anonymous

edited Apr 5, 2019 by Omkar
0 votes
You don't even need to create a function; you can use the when function from org.apache.spark.sql.functions:

DF.withColumn("Code",  when(DF("Amt") > 100 ,"Little").otherwise("Big"))
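
In PySpark the same built-in expression is available as pyspark.sql.functions.when. The sketch below is an illustration under stated assumptions rather than part of the original answer: with_code_column and expected_code are hypothetical names, and expected_code only mirrors in plain Python what the expression computes per row, including the fact that a null amount falls through to the otherwise() branch.

```python
def expected_code(amt):
    """Plain-Python mirror of when(Amt > 100, "Little").otherwise("Big")."""
    # In Spark SQL a comparison with null is null, which when() treats as
    # "not matched", so null amounts end up in the otherwise() branch.
    if amt is not None and amt > 100:
        return "Little"
    return "Big"


def with_code_column(df, amount_col="Amt"):
    """Add the Code column using a built-in column expression, no UDF."""
    # Imported lazily so expected_code() can be run without pyspark installed.
    from pyspark.sql.functions import col, when

    return df.withColumn(
        "Code", when(col(amount_col) > 100, "Little").otherwise("Big")
    )
```

Where a built-in expression such as when() is enough, it is generally preferable to a UDF, because Spark can optimize it and no Python function has to be called for each row.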
answered Sep 12, 2020 by Gopalakrishnan
• 170 points
0 votes

Hi,

withColumn() adds a new column to a DataFrame or replaces an existing one. It takes two arguments: the name of the column, and the value of the column as a Column expression. Here a new column is derived from an existing one:

df.withColumn("CopiedColumn", col("salary") * -1).show(false)
answered Dec 15, 2020 by MD
• 95,460 points
0 votes

Spark's withColumn() is a DataFrame function used to add a new column, change the value of an existing column, convert a column's data type, or derive a new column from an existing one. This post walks through commonly used DataFrame column operations with Scala examples.

First, let’s create a simple DataFrame to work with.

  import spark.sqlContext.implicits._

  val data = Seq(("111",50000),("222",60000),("333",40000))
  val df = data.toDF("EmpId","Salary")
  df.show(false)

Yields below output

+-----+------+
|EmpId|Salary|
+-----+------+
|111  |50000 |
|222  |60000 |
|333  |40000 |
+-----+------+

Using withColumn() to Add a New Column

withColumn() adds a new column or replaces an existing one. Here I only show how to add a new column derived from an existing column. It takes two arguments: the name of the new column and its value as a Column expression.

  //Derive a new column from existing
  import org.apache.spark.sql.functions.col
  df.withColumn("CopiedColumn", col("Salary") * -1)
    .show(false)

Here, we have added a new column CopiedColumn by multiplying -1 with an existing column Salary. This yields the below output.

+-----+------+------------+
|EmpId|Salary|CopiedColumn|
+-----+------+------------+
|111  |50000 |-50000      |
|222  |60000 |-60000      |
|333  |40000 |-40000      |
+-----+------+------------+

You can also add columns based on conditions using when() and otherwise(), Spark's equivalent of SQL CASE WHEN.

Using Select to Add Column

The statement above can also be written with select(), which yields the same output: df.select(col("EmpId"), col("Salary"), (col("Salary") * -1).as("CopiedColumn")).show(false). You can also add multiple columns using select().


answered Feb 19, 2021 by bathina
• 140 points
