PySpark CheatSheet: Spark RDD with Python

Last updated on May 22, 2019
Kurt is a Big Data and Data Science Expert, working as a Research Analyst at Edureka. He is keen to work with Machine Learning,...

Apache Spark is one of the best frameworks for Big Data analytics. When this powerful technology is combined with a simple yet efficient language like Python, it gives us a handy, easy-to-use API called PySpark. In this cheat sheet, we explore one of the building blocks of PySpark: the Resilient Distributed Dataset, more popularly known as the PySpark RDD.

Resilient Distributed Datasets (RDDs) are a distributed memory abstraction that lets a programmer perform in-memory computations on large clusters in a fault-tolerant manner.

The RDD is one of the pioneering schema-less data structures and can handle both structured and unstructured data. In-memory data sharing makes RDDs 10-100x faster than sharing data over the network or disk.

Initialization and Configurations

Initialization

Let’s see how to start PySpark and enter the shell.

  • Go to the folder where Spark is installed
  • Run the following commands
$ ./sbin/start-all.sh
$ ./bin/pyspark

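Now that Spark is up and running, we need to initialize the SparkContext, which is the heart of any Spark application. The pyspark shell already creates one for you as sc; in a standalone script, create it yourself: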

>>> from pyspark import SparkContext
>>> sc = SparkContext(master = 'local[2]')

Configurations

>>> from pyspark import SparkConf, SparkContext
>>> conf = (SparkConf()
...         .setMaster("local[2]")
...         .setAppName("Edureka CheatSheet")
...         .set("spark.executor.memory", "1g"))
>>> sc = SparkContext(conf = conf)

Spark Context Inspection

Once the SparkContext is initialized, it’s time to check that the versions are correct and to inspect the default parameters the SparkContext is using.

# SparkContext Version
>>> sc.version
# Python Version
>>> sc.pythonVer
# Application Name
>>> sc.appName
# Application ID
>>> sc.applicationId 
# Master URL
>>> sc.master 
# Installed Spark Path
>>> str(sc.sparkHome) 
# Retrieve the Spark user currently using SparkContext
>>> str(sc.sparkUser())
# Get default level of Parallelism
>>> sc.defaultParallelism 
# Get minimum number of Partitions
>>> sc.defaultMinPartitions

Data Loading

Creating RDDs

Once the whole environment is set up, it’s time to learn how to create RDDs. RDDs (Resilient Distributed Datasets) act as the backbone of PySpark. There are 3 ways to create RDDs: from a parallelized collection, from another RDD, and from external files. Let’s see each one of them.

# Using Parallelized Collections
>>> rdd = sc.parallelize([('Jim',24),('Hope', 25),('Sue', 26)])
>>> rdd = sc.parallelize([('a',9),('b',7),('c',10)])
>>> num_rdd = sc.parallelize(range(1,5000))

# From other RDDs
>>> new_rdd = rdd.groupByKey()
>>> new_rdd = rdd.map(lambda x: (x,1))

# From a text File
>>> tfile_rdd = sc.textFile("/path/of_file/*.txt")

# Reading a directory of text files as (file path, content) pairs
>>> tfile_rdd = sc.wholeTextFiles("/path/of_directory/")
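
The two file-based loaders produce differently shaped records: textFile gives one record per line, while wholeTextFiles gives one (path, content) pair per file. The following is a plain-Python sketch of that difference, not Spark code. The file names and contents are made up for illustration, and it uses bare file names where Spark would report full paths.

```python
import tempfile
from pathlib import Path

# Hypothetical sample files, created in a throwaway directory.
with tempfile.TemporaryDirectory() as folder:
    Path(folder, "a.txt").write_text("first\nsecond\n")
    Path(folder, "b.txt").write_text("third\n")
    files = sorted(Path(folder).glob("*.txt"))

    # textFile-style view: every line of every file becomes one record.
    lines = [line for f in files for line in f.read_text().splitlines()]

    # wholeTextFiles-style view: every file becomes one (name, content) record.
    whole = [(f.name, f.read_text()) for f in files]

print(lines)
print(whole)
```

The per-file form is convenient when a record spans several lines (for example, one JSON or XML document per file), while the per-line form suits logs and delimited data.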

RDD Statistics

Once the data is loaded into an RDD, we can see the statistics of that RDD using the following commands. For an RDD of numeric values we can get the mean, variance, maximum, minimum and much more.

# These statistics need numeric elements, so we use num_rdd from above
# Maximum Value of RDD elements
>>> num_rdd.max()
# Minimum Value of RDD elements
>>> num_rdd.min()
# Mean value of RDD elements
>>> num_rdd.mean()
# Standard Deviation of RDD elements
>>> num_rdd.stdev()
# Get the Summary Statistics
# (Count, Mean, Stdev, Max & Min)
>>> num_rdd.stats()

# Number of Partitions
>>> num_rdd.getNumPartitions()
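
To make the summary statistics concrete, here is a plain-Python sketch (no Spark required) that computes the same five quantities for the values used to build num_rdd in the Creating RDDs section, range(1, 5000). It assumes the population standard deviation, which is what stdev() reports; Spark offers sampleStdev() for the sample version. The summary dictionary is only an illustration, not the object that stats() returns.

```python
import statistics

values = list(range(1, 5000))  # same values as num_rdd

summary = {
    "count": len(values),
    "mean": statistics.mean(values),
    "stdev": statistics.pstdev(values),  # population standard deviation
    "max": max(values),
    "min": min(values),
}
print(summary)
```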

Transformations and Actions

Transformations

These are operations that are applied to an RDD to create a new RDD. Transformations are lazily evaluated: Spark only records them, and nothing is computed until an action is called on the data.
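
Lazy evaluation can be illustrated without Spark. The sketch below is only an analogy using Python's built-in lazy map and filter: building the pipeline runs nothing, and the work happens when the result is finally requested. The names double, calls and pipeline are made up for this example.

```python
calls = []

def double(x):
    calls.append(x)  # record every time the function really runs
    return x * 2

data = [1, 2, 3]

# "Transformations": these lines only describe the work.
pipeline = map(double, data)
pipeline = filter(lambda v: v > 2, pipeline)
ran_before = len(calls)  # nothing has executed yet

# "Action": asking for the result forces the whole pipeline to run.
result = list(pipeline)
ran_after = len(calls)

print(ran_before, result, ran_after)
```

In Spark, collect(), count() and the other actions play the role that list() plays here.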

# map
Return a new RDD by applying a function to each element of this RDD
>>> rdd = sc.parallelize(["b", "a", "c"])
>>> sorted(rdd.map(lambda x: (x, 1)).collect())
[('a', 1), ('b', 1), ('c', 1)]

# flatMap
Return a new RDD by first applying a function to all elements of this RDD, and then flattening the results.
>>> rdd = sc.parallelize([2, 3, 4])
>>> sorted(rdd.flatMap(lambda x: range(1, x)).collect())
[1, 1, 1, 2, 2, 3]

# mapPartitions
Return a new RDD by applying a function to each partition of this RDD.
>>> rdd = sc.parallelize([1, 2, 3, 4], 2)
>>> def f(iterator): yield sum(iterator)
>>> rdd.mapPartitions(f).collect()
[3, 7]
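
The [3, 7] result above follows from how the four elements are spread over two partitions. Here is a plain-Python sketch of that idea. The helper split_into_partitions is hypothetical and assumes even, contiguous chunks, which matches this small example but is not a statement about how Spark distributes data in general.

```python
def split_into_partitions(items, num_partitions):
    """Hypothetical helper: cut a list into contiguous, near-equal chunks."""
    n = len(items)
    bounds = [i * n // num_partitions for i in range(num_partitions + 1)]
    return [items[bounds[i]:bounds[i + 1]] for i in range(num_partitions)]

def f(iterator):
    # Called once per partition, not once per element.
    yield sum(iterator)

partitions = split_into_partitions([1, 2, 3, 4], 2)
result = [out for part in partitions for out in f(iter(part))]

print(partitions)
print(result)
```

Because the function receives a whole partition as an iterator, any expensive setup (such as opening a connection) can be done once per partition instead of once per element.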

# filter
Return a new RDD containing only the elements that satisfy a predicate.
>>> rdd = sc.parallelize([1, 2, 3, 4, 5])
>>> rdd.filter(lambda x: x % 2 == 0).collect()
[2, 4]

# distinct
Return a new RDD containing the distinct elements in this RDD.
>>> sorted(sc.parallelize([1, 1, 2, 3]).distinct().collect())
[1, 2, 3]

Actions

Actions are operations applied to an RDD that instruct Apache Spark to run the computation and pass the result back to the driver. Let’s have a look at a few of those actions.

# reduce
Reduces the elements of this RDD using the specified commutative 
and associative binary operator. Currently reduces partitions locally.
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).reduce(add)
15
>>> sc.parallelize((2 for _ in range(10))).map(lambda x: 1).cache().reduce(add)
10
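
The requirement that the operator be commutative and associative matters because each partition is reduced on its own before the partial results are combined. The sketch below simulates that two-step process in plain Python. simulated_reduce and the partition layouts are assumptions made for illustration, and real Spark may combine partial results in a different order.

```python
from functools import reduce
from operator import add, sub

def simulated_reduce(partitions, op):
    # Step 1: reduce each non-empty partition locally.
    partials = [reduce(op, part) for part in partitions if part]
    # Step 2: combine the per-partition results.
    return reduce(op, partials)

one_partition = [[1, 2, 3, 4, 5]]
two_partitions = [[1, 2], [3, 4, 5]]

# Addition gives the same answer however the data is split.
add_results = (simulated_reduce(one_partition, add),
               simulated_reduce(two_partitions, add))

# Subtraction is neither commutative nor associative, so the answer
# depends on the partitioning.
sub_results = (simulated_reduce(one_partition, sub),
               simulated_reduce(two_partitions, sub))

print(add_results, sub_results)
```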

# count
Return the number of elements in this RDD.
>>> sc.parallelize([2, 3, 4]).count()
3

# first
Return the first element in this RDD.
>>> sc.parallelize([2, 3, 4]).first()
2

# take 
Take the first n elements of the RDD.
>>> sc.parallelize([2, 3, 4, 5, 6]).cache().take(2)
[2, 3]

# countByValue
Return the count of each unique value in this RDD as a 
dictionary of (value, count) pairs.
>>> sorted(sc.parallelize([1, 2, 1, 2, 2], 2).countByValue().items())
[(1, 2), (2, 3)]

Sorting and Set Operations

Sorting and Grouping

Let’s see how we can sort and group RDDs. The set operations (union, subtract, intersection, cartesian) are listed after fold.

# sortBy
Sorts this RDD by the given keyfunc
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortBy(lambda x: x[0]).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

# sortByKey
Sorts this RDD, which is assumed to consist of (key, value) pairs.
>>> tmp = [('a', 1), ('b', 2), ('1', 3), ('d', 4), ('2', 5)]
>>> sc.parallelize(tmp).sortByKey(True, 1).collect()
[('1', 3), ('2', 5), ('a', 1), ('b', 2), ('d', 4)]

# groupBy
Return an RDD of grouped items.
>>> rdd = sc.parallelize([1, 1, 2, 3, 5, 8])
>>> result = rdd.groupBy(lambda x: x % 2).collect()
>>> sorted([(x, sorted(y)) for (x, y) in result])
[(0, [2, 8]), (1, [1, 1, 3, 5])]

# groupByKey 
Group the values for each key in the RDD into a single sequence.
>>> x = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
>>> sorted((k, list(v)) for (k, v) in x.groupByKey().collect())
[('a', [1, 1]), ('b', [1])]
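
What groupByKey produces can be pictured with an ordinary dictionary of lists. This is a plain-Python sketch of the grouping semantics only. It does not show the shuffle Spark performs to bring equal keys together, and the variable names are made up for this example.

```python
from collections import defaultdict

pairs = [("a", 1), ("b", 1), ("a", 1)]

groups = defaultdict(list)
for key, value in pairs:
    groups[key].append(value)  # collect every value seen for this key

result = sorted(groups.items())
print(result)
```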

# fold
Aggregate the elements of each partition, and then the results for 
all the partitions, using a given associative function and a neutral "zero value."
>>> from operator import add
>>> sc.parallelize([1, 2, 3, 4, 5]).fold(0, add)
15
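
The zero value should be truly neutral for the operator, because it is used once in every partition and once more when the partition results are merged. The plain-Python sketch below simulates that behaviour. simulated_fold and the two-partition layout are assumptions made for illustration.

```python
from functools import reduce
from operator import add

def simulated_fold(partitions, zero, op):
    # The zero value seeds the fold inside every partition...
    partials = [reduce(op, part, zero) for part in partitions]
    # ...and seeds the final merge of the partition results as well.
    return reduce(op, partials, zero)

partitions = [[1, 2], [3, 4, 5]]

neutral = simulated_fold(partitions, 0, add)      # 0 is neutral for addition
non_neutral = simulated_fold(partitions, 1, add)  # 1 is counted three times

print(neutral, non_neutral)
```

With a non-neutral zero value the result changes with the number of partitions, so it is safest to use 0 for sums, 1 for products, and so on.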

# __add__
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> (rdd + rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]

# subtract
Return each value in self that is not contained in other.
>>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
>>> y = sc.parallelize([("a", 3), ("c", None)])
>>> sorted(x.subtract(y).collect())
[('a', 1), ('b', 4), ('b', 5)]

# union
Return the union of this RDD and another one.
>>> rdd = sc.parallelize([1, 1, 2, 3])
>>> rdd.union(rdd).collect()
[1, 1, 2, 3, 1, 1, 2, 3]

# intersection
Return the intersection of this RDD and another one.
>>> rdd1 = sc.parallelize([1, 10, 2, 3, 4, 5])
>>> rdd2 = sc.parallelize([1, 6, 2, 3, 7, 8])
>>> rdd1.intersection(rdd2).collect()
[1, 2, 3]

# cartesian
Return the Cartesian product of this RDD and another one.
>>> rdd = sc.parallelize([1, 2])
>>> sorted(rdd.cartesian(rdd).collect())
[(1, 1), (1, 2), (2, 1), (2, 2)]
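
The set operations differ in how they treat duplicates: union simply concatenates, so repeated values survive, while intersection returns each common value once. The following plain-Python sketch mirrors those semantics on ordinary lists. It is an analogy with made-up sample data, not Spark code.

```python
from itertools import product

a = [1, 1, 2, 3, 4]
b = [1, 6, 2, 3, 7, 8]

union = a + b                           # concatenation, duplicates kept
intersection = sorted(set(a) & set(b))  # common values, de-duplicated
subtract = [x for x in a if x not in set(b)]  # values of a not found in b
cartesian = list(product([1, 2], [1, 2]))     # every pairing of elements

print(union)
print(intersection)
print(subtract)
print(cartesian)
```

To get a duplicate-free union in Spark, chain distinct() after union().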

Saving RDDs

Now that we have seen the various transformations and actions, it’s time to save these RDDs in different formats.

# saveAsTextFile
Save this RDD as a text file, using string representations of elements.
>>> rdd.saveAsTextFile("rdd.txt")

# saveAsHadoopFile
Output a Python RDD of key-value pairs (of form RDD[(K, V)]) to any Hadoop file system
>>> rdd.saveAsHadoopFile("hdfs://namenodehost/parent_folder/child_folder",'org.apache.hadoop.mapred.TextOutputFormat')

# saveAsPickleFile 
Save this RDD as a SequenceFile of serialized objects
>>> from tempfile import NamedTemporaryFile
>>> tmpFile = NamedTemporaryFile(delete=True)
>>> tmpFile.close()
>>> sc.parallelize([1, 2, 'spark', 'rdd']).saveAsPickleFile(tmpFile.name, 3)
>>> sorted(sc.pickleFile(tmpFile.name, 5).collect())
[1, 2, 'rdd', 'spark']

With this, we come to the end of the PySpark RDD Cheat Sheet. Check out the Python Spark Certification Training using PySpark by Edureka, a trusted online learning company with a network of more than 250,000 satisfied learners spread across the globe. Edureka’s Python Spark Certification Training using PySpark is designed to provide you with the knowledge and skills required to become a successful Spark Developer using Python and to prepare you for the Cloudera Hadoop and Spark Developer Certification Exam (CCA175).
