Spark: the word itself is enough to generate a spark in every Hadoop engineer’s mind. It is an in-memory processing tool that is lightning-fast in cluster computing. Compared to MapReduce, in-memory data sharing can be 10-100x faster than sharing data over the network and disk, and all of this is possible because of RDDs (Resilient Distributed Datasets). The key points we focus on today in this RDD using Spark article are:
- Need for RDDs
- What are RDDs?
- Features of RDDs
- Creation of RDDs using Spark
- Operations performed on RDDs
- RDDs using Spark: Pokemon Use Case
Need for RDDs?
The world is evolving with Artificial Intelligence and Data Science because of advances in Machine Learning. Algorithms based on Regression, Clustering, and Classification run as distributed, iterative computations that reuse and share data among multiple computing units.
The traditional MapReduce techniques needed stable, intermediate, distributed storage like HDFS, which involved repetitive computations with data replication and data serialization and made the process a lot slower. Finding a solution was never easy.
This is where RDDs (Resilient Distributed Datasets) come into the picture.
RDDs are easy to use and effortless to create: data is imported from data sources and loaded into RDDs, and operations are then applied to process it. They are read-only, distributed collections of data held in memory and, most importantly, they are fault-tolerant.
If any data partition of an RDD is lost, it can be regenerated by re-applying to that partition the transformations recorded in its lineage, rather than processing all the data from scratch. In real-world scenarios, this approach is invaluable when data is lost or a system goes down.
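As a minimal sketch of what lineage looks like, the snippet below builds a small RDD and prints the chain of transformations Spark has recorded for it. It assumes Spark is on the classpath (in spark-shell, getOrCreate() simply reuses the existing session); the names numbers and evens are made up for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: a local session is good enough for the demo
val spark = SparkSession.builder().master("local[2]").appName("lineage-sketch").getOrCreate()
val sc = spark.sparkContext

// Two transformations stacked on a parallelized collection
val numbers = sc.parallelize(1 to 10, 2)
val evens = numbers.map(_ * 2).filter(_ % 4 == 0)

// toDebugString prints the recorded lineage (the parent RDDs of evens)
val lineage = evens.toDebugString
println(lineage)

// If a partition of evens were lost, Spark would replay these steps for that partition only
val result = evens.collect().toList
println(result)
```

The printed lineage is the information Spark keeps instead of replicating the data itself.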
What are RDDs?
An RDD (Resilient Distributed Dataset) is the fundamental data structure in Spark. The term Resilient refers to the ability to regenerate data automatically, or to roll it back to its original state, when an unexpected failure threatens data loss.
The data written into RDDs is partitioned and stored across multiple executor nodes. If an executor node fails at run time, the partitions it held are recomputed on another node from the lineage (or read from a replica, if the RDD was persisted with a replicated storage level). This is why RDDs are considered an advanced type of data structure when compared to other traditional data structures. RDDs can store structured, unstructured and semi-structured data.
Let’s move ahead with our RDD using Spark blog and learn about the unique features of RDDs which give them an edge over other types of data structures.
Features of RDD
- In-Memory (RAM) Computations: Keeping intermediate data in memory instead of writing it to disk makes data processing faster and more efficient, which improves the overall performance of the system.
- Lazy Evaluation: Transformations applied to the data in an RDD do not produce output immediately. Instead, they are logged and executed only when an action is called.
- Persistence: Resultant RDDs can be kept in memory or on disk (with persist() or cache()) so that they can be reused across operations.
- Coarse-Grained Operations: The user can apply transformations to all elements in data sets through map, filter or groupBy operations.
- Fault Tolerant: If there is a loss of data, the system can recompute the lost data by replaying the logged transformations.
- Immutability: Data defined, retrieved or created cannot be changed once it is logged into the system. If you need to modify an existing RDD, you must create a new RDD by applying a set of transformation functions to the current or a preceding RDD.
- Partitioning: The partition is the basic unit of parallelism in a Spark RDD. By default, the number of partitions created is based on your data source. You can also choose the number of partitions yourself, or control how keys are assigned to them with a custom partitioner.
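The features above can be observed in a small experiment. The sketch below is only an illustration under assumptions: it uses a local session and an accumulator (a made-up counter named calls) to see when the map function actually runs, and it relies on the cached partitions fitting in memory.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark is on the classpath; in spark-shell the existing session is reused
val spark = SparkSession.builder().master("local[2]").appName("features-sketch").getOrCreate()
val sc = spark.sparkContext

// Counts how many times the map function is invoked
val calls = sc.longAccumulator("map calls")

// Partitioning: ask for 3 partitions explicitly
// Persistence: cache() marks the RDD for reuse once it has been computed
val squares = sc.parallelize(1 to 5, 3).map { n => calls.add(1); n * n }.cache()
val numPartitions = squares.getNumPartitions

// Lazy evaluation: no action has been called yet, so the map has not run
val callsBeforeAction: Long = calls.value

// First action: runs the map once per element and fills the cache
val total = squares.reduce(_ + _)
val callsAfterFirstAction: Long = calls.value

// Second action: expected to read the cached partitions instead of re-running the map
val howMany = squares.count()
val callsAfterSecondAction: Long = calls.value

println(s"partitions=$numPartitions total=$total count=$howMany")
println(s"map calls: before=$callsBeforeAction first=$callsAfterFirstAction second=$callsAfterSecondAction")
```

Without the cache() call, the second action would recompute the map and the counter would keep growing.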
Creation of RDD using Spark
RDDs can be created in three ways:
- Reading data from parallelized collections
val PCRDD = spark.sparkContext.parallelize(Array("Mon","Tue","Wed","Thu","Fri","Sat"),2)
// collect() returns a local Array, so we print it directly
val resultRDD = PCRDD.collect()
resultRDD.foreach(println)
- Applying transformation on previous RDDs
val words = spark.sparkContext.parallelize(Seq("Spark","is","a","very","powerful","language"))
// Pair every word with its first character
val wordpair = words.map(w => (w.charAt(0),w))
wordpair.collect().foreach(println)
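- Reading data from external storage
// sparkContext.textFile returns an RDD[String]; spark.read.textFile would return a Dataset instead
val Sparkfile = spark.sparkContext.textFile("/user/edureka_566977/spark/spark.txt")
Sparkfile.collect()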
Operations performed on RDDs:
There are mainly two types of operations which are performed on RDDs, namely:
- Transformations
- Actions
Transformations: The operations we apply on RDDs to filter, access and modify the data in a parent RDD to generate a successive RDD are called transformations. The new RDD keeps a pointer to its parent RDD, which records the dependency between them.
Transformations are lazily evaluated; in other words, the operations applied on the RDD you are working with are logged but not executed. The system returns a result or throws an exception only after an action is triggered.
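To make the last point concrete, here is a small sketch (with made-up input) in which a transformation that is bound to fail is defined without any error, and the failure only shows up when an action runs the job. It assumes a local Spark session; expect Spark to log the task failure before the result is printed.

```scala
import org.apache.spark.sql.SparkSession
import scala.util.Try

// Assumption: Spark is on the classpath; in spark-shell the existing session is reused
val spark = SparkSession.builder().master("local[2]").appName("lazy-sketch").getOrCreate()
val sc = spark.sparkContext

val raw = sc.parallelize(Seq("1", "2", "oops"), 1)

// Defining the transformation succeeds even though "oops" cannot be parsed as an Int
val parsed = raw.map(_.toInt)

// Only the action executes the job, so the exception surfaces here
val outcome = Try(parsed.collect())
println(s"collect() failed: ${outcome.isFailure}")
```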
We can divide transformations into two types as below:
- Narrow Transformations
- Wide Transformations
Narrow Transformations: In a narrow transformation, each partition of the new RDD is computed from a single partition of the parent RDD, because all the data required is available on that one partition. No data has to be shuffled across the cluster. Examples of narrow transformations are:
- map()
- filter()
- flatMap()
- mapPartitions()
- union()
Wide Transformations: In a wide transformation, each partition of the new RDD may need data from multiple partitions of the parent RDD, so the data has to be shuffled between partitions. Examples of wide transformations are:
- reduceByKey()
- groupByKey()
- sortByKey()
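One way to check which kind of transformation produced an RDD is to look at its dependencies. The following sketch assumes a local Spark session and uses a made-up word list: a map() yields a narrow dependency, while reduceByKey() on an RDD that has no partitioner yet yields a shuffle dependency.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.{NarrowDependency, ShuffleDependency}

// Assumption: Spark is on the classpath; in spark-shell the existing session is reused
val spark = SparkSession.builder().master("local[2]").appName("dependency-sketch").getOrCreate()
val sc = spark.sparkContext

// map() is narrow: every output partition depends on exactly one parent partition
val pairs = sc.parallelize(Seq("a", "b", "a", "c"), 2).map(w => (w, 1))
val mapIsNarrow = pairs.dependencies.forall(_.isInstanceOf[NarrowDependency[_]])

// reduceByKey() is wide here: values for the same key must be brought together by a shuffle
val counts = pairs.reduceByKey(_ + _)
val reduceIsWide = counts.dependencies.exists(_.isInstanceOf[ShuffleDependency[_, _, _]])

println(s"map narrow: $mapIsNarrow, reduceByKey wide: $reduceIsWide")
val countsMap = counts.collect().toMap
println(countsMap)
```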
Actions: Actions instruct Apache Spark to run the computation and pass the result or an exception back to the driver program. A few of the actions are:
- collect()
- count()
- take()
- first()
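As a quick sketch of how these four actions behave, the snippet below applies each of them to a small in-memory RDD. It assumes a local Spark session; the RDD name days is only for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark is on the classpath; in spark-shell the existing session is reused
val spark = SparkSession.builder().master("local[2]").appName("actions-sketch").getOrCreate()
val sc = spark.sparkContext

val days = sc.parallelize(Seq("Mon", "Tue", "Wed", "Thu", "Fri", "Sat"), 2)

val everything = days.collect().toList // all elements, brought back to the driver
val howMany = days.count()             // number of elements, as a Long
val firstTwo = days.take(2).toList     // only the first two elements
val head = days.first()                // only the first element

println(s"$everything, $howMany, $firstTwo, $head")
```

On large data sets, take() and first() are usually safer than collect(), because collect() pulls every record into the driver's memory.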
Let us practically apply the operations on RDDs:
IPL (Indian Premier League) is a cricket tournament whose hype is at its peak. So, let’s get our hands on an IPL data set today and run our RDD operations using Spark.
- First, let’s download the IPL match data as a CSV file. Once downloaded, it looks like an Excel file with rows and columns.
In the next step, we fire up the Spark shell and load the matches.csv file from its location; in my case, the CSV file is at “/user/edureka_566977/test/matches.csv”.
Now let us start with the transformation part first:
- map():
We use the map transformation to apply a specific operation to every element of an RDD. Here we create an RDD named CKfile in which we store our CSV file. We then create another RDD called states to store the city details.
spark2-shell
val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.collect.foreach(println)
// The third column (index 2) of every row holds the city
val states = CKfile.map(_.split(",")(2))
states.collect().foreach(println)
- filter():
The name of the filter transformation describes its use: it selects the elements of a collection that satisfy a condition. Here we apply filter to get the records of the IPL matches of the year 2017 and store them in the fil RDD.
val fil = CKfile.filter(line => line.contains("2017"))
fil.collect().foreach(println)
- flatMap():
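flatMap is a transformation that maps each element of an RDD to zero or more elements and flattens the results into a new RDD. It is similar to the map transformation. Here we apply flatMap to split each 2017 record around the text "Hyderabad" and store the resulting pieces in filRDD. Note that because of the trailing collect(), filRDD holds a local array rather than an RDD.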
val filRDD = fil.flatMap(line => line.split("Hyderabad")).collect()
- partitions:
All the data we write into an RDD is split into a certain number of partitions. This is not a transformation but a property of the RDD; we use it to find the number of partitions the data is actually split into.
val fil = CKfile.filter(line => line.contains("2017"))
fil.partitions.size
- mapPartitions():
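mapPartitions works like map(), but the function runs once per partition and receives an iterator over all the elements of that partition. Here we use mapPartitions to find the number of rows in each partition of our fil RDD.
val fil = CKfile.filter(line => line.contains("2016"))
// Emit one number per partition: the count of rows it holds
fil.mapPartitions(idx => Array(idx.size).iterator).collect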
- reduceByKey():
We use reduceByKey() on key-value pairs. We use this transformation on our CSV file to find the players with the most Man of the Match awards.
val ManOfTheMatch = CKfile.map(_.split(",")(13))
val MOTMcount = ManOfTheMatch.map(WINcount => (WINcount,1))
// Sum the awards per player, swap to (count, player) and sort in descending order
val ManOTH = MOTMcount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
ManOTH.take(10).foreach(println)
- union():
The name explains it all: we use the union transformation to club two RDDs together. Here we create two RDDs, namely fil and fil2. The fil RDD contains the records of the 2017 IPL matches and the fil2 RDD contains the 2016 IPL match records.
val fil = CKfile.filter(line => line.contains("2017"))
val fil2 = CKfile.filter(line => line.contains("2016"))
val unionRDD = fil.union(fil2)
Let us start with the Action part where we show actual output:
- collect():
collect() is the action we use to bring all the contents of an RDD back to the driver so that they can be displayed.
val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.collect.foreach(println)
- count():
count() is an action that we use to count the number of records present in an RDD. Here we use it to count the total number of records in our matches.csv file.
val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
CKfile.count()
- take():
take() is an action similar to collect(); the only difference is that it returns just the number of rows the user asks for. Here we apply the following code to print the ten leading records.
// states and Scount hold the city of every match and the (city, 1) pairs built from it
val states = CKfile.map(_.split(",")(2))
val Scount = states.map(city => (city,1))
val statecountm = Scount.reduceByKey((x,y) => x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
statecountm.collect().foreach(println)
statecountm.take(10).foreach(println)
- first():
first() is an action similar to collect() and take(); it returns the topmost record. Here we use first() to find the city in which the maximum number of matches was played, and we get Mumbai as the output.
val CKfile = sc.textFile("/user/edureka_566977/test/matches.csv")
val states = CKfile.map(_.split(",")(2))
val Scount = states.map(Scount => (Scount,1))
// Print the number of matches per city
Scount.reduceByKey((x,y)=> x+y).collect.foreach(println)
// Swap to (count, city), sort in descending order and take the top record
val statecountm = Scount.reduceByKey((x,y)=> x+y).map(tup => (tup._2,tup._1)).sortByKey(false)
statecountm.first()
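The same count-and-sort pipeline can be tried without the CSV file. The sketch below is only an illustration: the rows are invented sample data (not taken from matches.csv) that keep the city in the third column, and it assumes a local Spark session.

```scala
import org.apache.spark.sql.SparkSession

// Assumption: Spark is on the classpath; in spark-shell the existing session is reused
val spark = SparkSession.builder().master("local[2]").appName("city-count-sketch").getOrCreate()
val sc = spark.sparkContext

// Hypothetical rows in the shape id,season,city
val sampleMatches = sc.parallelize(Seq(
  "1,2017,Hyderabad",
  "2,2017,Mumbai",
  "3,2017,Mumbai",
  "4,2016,Pune",
  "5,2016,Mumbai"
))

// Extract the city, count matches per city, then sort by the count in descending order
val cityCounts = sampleMatches
  .map(_.split(",")(2))
  .map(city => (city, 1))
  .reduceByKey(_ + _)
  .map { case (city, n) => (n, city) }
  .sortByKey(ascending = false)

val topCity = cityCounts.first()
println(topCity)
```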
To make learning RDDs using Spark even more interesting, I have come up with a use case.
RDD using Spark: Pokemon Use Case
- First, let us download a Pokemon.csv file and load it into the spark-shell as we did with the matches.csv file.
val PokemonDataRDD1 = sc.textFile("/user/edureka_566977/PokemonFile/PokemonData.csv")
PokemonDataRDD1.collect().foreach(println)
Pokemon come in a large variety of types. Let us find a few of them.
- Removing the header from the Pokemon.csv file
We might not need the header row (the schema) of the Pokemon.csv file. Hence, we remove it.
val Head = PokemonDataRDD1.first()
val NoHeader = PokemonDataRDD1.filter(line => !line.equals(Head))
- Finding the number of partitions our pokemon.csv is distributed into.
println("No.ofpartitions="+NoHeader.partitions.size)
- Water Pokemon
Finding the Water pokemon
val WaterRDD = PokemonDataRDD1.filter(line => line.contains("Water"))
WaterRDD.collect().foreach(println)
- Fire Pokemon
Finding the Fire pokemon
val FireRDD = PokemonDataRDD1.filter(line => line.contains("Fire"))
FireRDD.collect().foreach(println)
- We can also find the population of each type of pokemon using the count function
WaterRDD.count()
FireRDD.count()
- Since I like a defensive game strategy, let us find the maximum defence value among the pokemon.
val defenceList = NoHeader.map{x => x.split(',')}.map{x => (x(6).toDouble)}
println("Highest_Defence : "+defenceList.max())
- We know the maximum defence value, but we don’t know which pokemon it belongs to. So, let us find that pokemon.
val defWithPokemonName = NoHeader.map{x => x.split(',')}.map{x => (x(6).toDouble,x(1))}
// Group the names by defence value and take the entry with the highest key
val MaxDefencePokemon = defWithPokemonName.groupByKey.takeOrdered(1)(Ordering[Double].reverse.on(_._1))
MaxDefencePokemon.foreach(println)
- Now let us sort the defence values and look at the five lowest ones
val minDefencePokemon = defenceList.distinct.sortBy(x => x.toDouble,true,1)
minDefencePokemon.take(5).foreach(println)
- Now let us see the pokemon with the least defence.
val PokemonDataRDD2 = sc.textFile("/user/edureka_566977/PokemonFile/PokemonData.csv")
val Head2 = PokemonDataRDD2.first()
// Compare against Head2, the header of the RDD we are filtering
val NoHeader2 = PokemonDataRDD2.filter(line => !line.equals(Head2))
val defWithPokemonName2 = NoHeader2.map{x => x.split(',')}.map{x => (x(6).toDouble,x(1))}
val MinDefencePokemon2 = defWithPokemonName2.groupByKey.takeOrdered(1)(Ordering[Double].on(_._1))
MinDefencePokemon2.foreach(println)
So, with this, we come to the end of this RDD using Spark article. I hope we have shed a little light on RDDs, their features and the various types of operations that can be performed on them.