Type mismatch error in Scala

+2 votes
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.functions.{col, count, lit, when}
import org.apache.spark.sql.types._
import org.apache.spark.storage.StorageLevel._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.expressions.Window

object accounts_ver_creation {
  def main(args: Array[String]) {
    val conf = new SparkConf()
    val sc = new SparkContext(conf)
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    val high_date = "9999-12-31 00:00:00"
    val df_source_main = sqlContext.sql("SELECT * FROM sqoop.accounts")
    val df_source_aud = sqlContext.sql("SELECT * FROM sqoop.accounts_aud")
    val df_u = df_source_aud.filter(col("action_code") === "U")
    val df_d = df_source_aud.filter(col("action_code") === "D")
    val raw_column_names = df_source_main.columns
    val column_names = raw_column_names ++ Array("is_current", "is_updated", "is_deleted", "start_date", "enddate", "tot")
    val res = df_source_main.withColumn("tot", df_source_main.groupBy("account_number").agg(count("account_number"))).withColumn("ver", df_source_aud.groupBy("account_number").agg(count("account_number"))).select(column_names.map(col): _*)

    val df_target_main = df_source_main.withColumn("start_date", df_source_main("update_date")).withColumn("enddate", lit(high_date)).withColumn("is_current", lit(1)).withColumn("is_updated", lit(false)).withColumn("is_deleted", lit(false)).select(column_names.map(col): _*)
    val df_target_u = df_d.withColumn("enddate", df_u("end_date")).withColumn("start_date", df_u("create_date")).withColumn("is_current", lit(0)).withColumn("is_deleted", lit(false)).select(column_names.map(col): _*)
    val df_target_d = df_u.withColumn("enddate", df_d("end_update")).withColumn("start_date", df_d("update_date")).withColumn("is_current", lit(0)).withColumn("is_deleted", lit(true)).select(column_names.map(col): _*)
    val df_merge = df_target_main.unionAll(df_target_u).unionAll(df_target_d)
    val df_merge_final = df_merge.withColumn("version_key", df_source_main.groupBy("account_number").agg(count("account_number").as("total")).join(df_source_aud.groupBy("account_number").agg(count("account_number").as("version_key")), Seq("account_number"), "left").na.fill(0).orderBy(col("version_key").desc))
    sqlContext.sql("DROP TABLE IF EXISTS sqoop.accounts_ver")
    df_merge_final.write.format("orc").option("path", "/user/hive/warehouse/sqoop.db/accounts_ver").mode("overwrite").saveAsTable("sqoop.accounts_ver")
  }
}

Error Message:

<console>:240: error: type mismatch;
 found   : org.apache.spark.sql.DataFrame
 required: org.apache.spark.sql.Column
Aug 16, 2019 in Apache Spark by anonymous
Which line are you getting this error on? The error means that somewhere in the code you are passing a DataFrame where a Column is required.

1 answer to this question.

+2 votes

Hello,

Your problem is here:

val df_merge_final = df_merge
.withColumn("version_key", df_source_main.groupBy("account_number").agg(count("account_number").as("total")).join(df_source_aud.groupBy("account_number").agg(count("account_number").as("version_key")), Seq("account_number"), "left").na.fill(0).orderBy(col("version_key").desc))

You are trying to put an entire DataFrame into a column, which is not allowed: withColumn expects a Column, not a DataFrame. If you want the latest version key, you need to process that DataFrame separately and either collect the value and put it into the column, or join the aggregate back so it arrives as a real column. Tell me if you want an example.
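
For instance, here is a minimal sketch, assuming version_key should simply be the number of audit rows per account_number (defaulting to 0 for accounts with no audit history); it reuses df_source_aud and df_merge from your code:

// Sketch only: version_key = count of audit rows per account_number.
val df_versions = df_source_aud
  .groupBy("account_number")
  .agg(count("account_number").as("version_key"))

// Join the aggregate back so version_key arrives as an ordinary column,
// then fill 0 for accounts that have no audit rows.
val df_merge_final = df_merge
  .join(df_versions, Seq("account_number"), "left")
  .na.fill(0, Seq("version_key"))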

Also: try to use aliases; your code is going to look prettier.
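
For example, a purely illustrative snippet (the alias names here are hypothetical):

// Aliasing DataFrames keeps multi-join expressions readable.
val m = df_source_main.as("m")
val a = df_source_aud.as("a")
val joined = m.join(a, col("m.account_number") === col("a.account_number"), "left")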

Hope this helps!

Thanks!!

answered Dec 13, 2019 by Alexandru
• 510 points
