You have to install IntelliJ IDEA with the Scala plugin. When prompted, add a Scala SDK and set the JDK path.
File --> New --> Project
Click on Next
Give the project a name and create it
Copy-paste the code below into a new Scala file and save it
Right-click on the Scala file ==> Run "file name"
As of now, we do not have a proper document for exporting a jar from IntelliJ.
You can refer to the screenshots below to see how we have done it using Eclipse.
Code
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.sql._
import org.apache.spark.sql.SQLContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.Window
//import org.apache.spark.sql.hive.HiveContext
case class EmpHeader(Emp_Id:Int, First_Name:String, Last_Name:String, Dept_Id:Int, Salary: Int)
case class DeptHeader(Dept_Id:Int, Dept_Name:String)
object sql1 {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("Simple Application").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    //val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)
    //val hc = new HiveContext(sc)
    import sqlContext.implicits._

    // Read the comma-separated input files and map each line onto the case classes
    val EmpFile = sc.textFile("file:///home/edureka/Desktop/all-files/datsets/emp.txt")
    val DeptFile = sc.textFile("file:///home/edureka/Desktop/all-files/datsets/dept.txt")
    val EmpDF = EmpFile.map(x => x.split(",")).map(x => EmpHeader(x(0).toInt, x(1), x(2), x(3).toInt, x(4).toInt)).toDF //EmpDF.printSchema
    val DeptDF = DeptFile.map(x => x.split(",")).map(x => DeptHeader(x(0).toInt, x(1))).toDF

    EmpDF.registerTempTable("Employee")
    DeptDF.registerTempTable("Department")

    //01//max_sal: highest salary per department
    sqlContext.sql("SELECT * from Employee").groupBy($"Dept_Id").agg(max("Salary").alias("max_solution")).show()

    //02//rank_sal
    //sqlContext.sql("SELECT *, row_number() OVER(PARTITION BY Emp_Id ORDER BY Salary DESC) AS Rank from Employee").show
    //sqlContext.sql("SELECT First_Name, RANK() OVER (ORDER BY Salary) AS rank FROM Employee").show
    EmpDF.orderBy(asc("Salary")).show
  }
}
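The //02//rank_sal query is only sketched in the comments above. As a minimal sketch, assuming Spark 1.5, where DataFrame window functions are only available through a HiveContext (and reusing sc, EmpFile and EmpHeader from the listing above), the per-department salary rank could be computed like this:

// Sketch only: rebuild the employee DataFrame through a HiveContext,
// then rank employees by salary within each department.
val hc = new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
val empHive = EmpFile.map(_.split(",")).map(x => EmpHeader(x(0).toInt, x(1), x(2), x(3).toInt, x(4).toInt)).toDF
val byDept = Window.partitionBy($"Dept_Id").orderBy($"Salary".desc)
empHive.withColumn("rank_sal", rank().over(byDept)).show()

Note that this also needs the spark-hive dependency mentioned under build.sbt below.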
=====================================================================================
-----------------build.sbt-------------------------
name := "Sql spark"
version := "1.0"
scalaVersion := "2.10.4"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.5.2"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.5.2"
----------------------------------------
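If you try the HiveContext window-function sketch above, the build would presumably also need the spark-hive artifact for the same Spark version (an assumption, not part of the original build file):

libraryDependencies += "org.apache.spark" %% "spark-hive" % "1.5.2"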
=================================Eclipse============================================
(Refer to the attached screenshots for the jar export steps in Eclipse.)
=================================spark-submit===========================
Build with sbt package
Please find the attached project folder.
Follow the folder structure below and run sbt package to build the jar file required for spark-submit:
Project folder
  - build.sbt
  - src/main/scala/<source code>.scala
Go to a terminal -> cd to the project folder -> run sbt package
Syntax
spark-submit --class <class/object name> <complete jar path>
For example (give the exact jar path as per your project):
spark-submit --class sql1 file:///home/edureka/Desktop/all-files/codes/sibanisbt/target/scala-2.10/sql-spark_2.10-1.0.jar
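For reference, the job reads comma-separated input like the following. The employee rows are reconstructed from the output below; the department names are just assumed placeholders, since they do not appear in the output:

emp.txt
1,Guru,Randhawa,1,30000
2,Sharry,Mann,2,40000
3,Honey,Sing,3,10000

dept.txt
1,<Dept_Name_1>
2,<Dept_Name_2>
3,<Dept_Name_3>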
=================Output======================
17/11/02 07:33:39 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool
17/11/02 07:33:39 INFO scheduler.DAGScheduler: ResultStage 3 (show at sql1.scala:33) finished in 3.917 s
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Job 1 finished: show at sql1.scala:33, took 3.986554 s
+-------+------------+
|Dept_Id|max_solution|
+-------+------------+
| 1| 30000|
| 2| 40000|
| 3| 10000|
+-------+------------+
17/11/02 07:33:39 INFO spark.SparkContext: Starting job: show at sql1.scala:37
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Got job 2 (show at sql1.scala:37) with 2 output partitions
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Final stage: ResultStage 4(show at sql1.scala:37)
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Parents of final stage: List()
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Missing parents: List()
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[18] at show at sql1.scala:37), which has no missing parents
17/11/02 07:33:39 INFO storage.MemoryStore: ensureFreeSpace(5176) called with curMem=257917, maxMem=560497950
17/11/02 07:33:39 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 5.1 KB, free 534.3 MB)
17/11/02 07:33:39 INFO storage.MemoryStore: ensureFreeSpace(2827) called with curMem=263093, maxMem=560497950
17/11/02 07:33:39 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 2.8 KB, free 534.3 MB)
17/11/02 07:33:39 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:60184 (size: 2.8 KB, free: 534.5 MB)
17/11/02 07:33:39 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:861
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 4 (MapPartitionsRDD[18] at show at sql1.scala:37)
17/11/02 07:33:39 INFO scheduler.TaskSchedulerImpl: Adding task set 4.0 with 2 tasks
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 202, localhost, PROCESS_LOCAL, 2221 bytes)
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 203, localhost, PROCESS_LOCAL, 2221 bytes)
17/11/02 07:33:39 INFO executor.Executor: Running task 0.0 in stage 4.0 (TID 202)
17/11/02 07:33:39 INFO executor.Executor: Running task 1.0 in stage 4.0 (TID 203)
17/11/02 07:33:39 INFO rdd.HadoopRDD: Input split: file:/home/edureka/Desktop/all-files/datsets/emp.txt:35+35
17/11/02 07:33:39 INFO rdd.HadoopRDD: Input split: file:/home/edureka/Desktop/all-files/datsets/emp.txt:0+35
17/11/02 07:33:39 INFO executor.Executor: Finished task 0.0 in stage 4.0 (TID 202). 3556 bytes result sent to driver
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 202) in 50 ms on localhost (1/2)
17/11/02 07:33:39 INFO executor.Executor: Finished task 1.0 in stage 4.0 (TID 203). 3476 bytes result sent to driver
17/11/02 07:33:39 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 203) in 52 ms on localhost (2/2)
17/11/02 07:33:39 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool
17/11/02 07:33:39 INFO scheduler.DAGScheduler: ResultStage 4 (show at sql1.scala:37) finished in 0.043 s
17/11/02 07:33:39 INFO scheduler.DAGScheduler: Job 2 finished: show at sql1.scala:37, took 0.063828 s
+------+----------+---------+-------+------+
|Emp_Id|First_Name|Last_Name|Dept_Id|Salary|
+------+----------+---------+-------+------+
| 3| Honey| Sing| 3| 10000|
| 1| Guru| Randhawa| 1| 30000|
| 2| Sharry| Mann| 2| 40000|
+------+----------+---------+-------+------+
17/11/02 07:33:39 INFO spark.SparkContext: Invoking stop() from shutdown hook
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static/sql,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution/json,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/execution,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL/json,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/SQL,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
17/11/02 07:33:39 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
Hope it helps!
If you want to learn more about Apache Spark with Scala, it is highly recommended to go for an Apache Spark certification course today.
Thanks!!