
PySpark Notes

Basic ETL workflow with PySpark.

PySpark serves as the foundation of any reporting, machine learning or data science exercise.

Launching PySpark on a local machine.

pyspark

SparkSession entry point

PySpark uses a builder pattern through SparkSession.builder.

from pyspark.sql import SparkSession

spark = (SparkSession
         .builder
         .appName("Analysis")
         .getOrCreate())

SparkSession is a superset of SparkContext. It wraps the SparkContext and provides functionality for interacting with the Spark SQL API.

spark.sparkContext

The SparkSession object is a more recent addition to the PySpark API, introduced in version 2.0.

Older PySpark code used two entry points, which map onto the SparkSession like this :

sc = spark.sparkContext   # the old SparkContext entry point
sqlContext = spark        # the SparkSession covers the old SQLContext functionality

Log Level of PySpark

spark.sparkContext.setLogLevel("KEYWORD")

Log Levels :

Log level : Description
off : no logging at all.
fatal : only fatal errors, i.e. when the Spark cluster crashes.
error : fatal + recoverable errors.
warn : warnings.
info : runtime info.
debug : debug information on jobs.
trace : verbose debug logs.
all : everything PySpark can print.
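For example, to silence informational messages and keep only warnings and errors (using the spark session created above) :

spark.sparkContext.setLogLevel("WARN")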

Here are the major steps we need our program to perform.

  1. Read : read the input data.
  2. Tokenize : split each line into words.
  3. Clean : remove punctuation and empty records.
  4. Count : count the frequency of each word.
  5. Answer : return the most frequent words.

Reading Data into a data frame with spark.read

PySpark provides two main structures for storing data :

  • The RDD
  • The DataFrame

The RDD was PySpark's primary structure for a long time. It looks like a distributed collection of objects or rows.

Reading data into a data frame is done through a DataFrameReader object, which can be accessed through spark.read.

PySpark recognizes a few file formats :

  • CSV
  • JSON
  • TEXT

ORC : Optimized Row Columnar.

PySpark defaults to the Parquet format when reading and writing files.
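As a quick sketch (the file names here are hypothetical), each format has a dedicated reader method on spark.read :

# hypothetical input paths; header/inferSchema options apply to the CSV reader
df_csv = spark.read.csv("data.csv", header=True, inferSchema=True)
df_json = spark.read.json("data.json")
df_text = spark.read.text("data.txt")
df_parquet = spark.read.parquet("data.parquet")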

data = spark.read.text("filename.txt")

Printing The Schema

data.printSchema()

Splitting our lines of text into arrays of words.

from pyspark.sql.functions import split

product_title = ecommerce_data.select(split(ecommerce_data.value, " ").alias("product_name"))
product_title.show()

select() method : used for selecting data.

alias() method : used to rename transformed columns.

Selecting only one column.

book.select(book.value)

from pyspark.sql.functions import col

book.select(col("value"))

Renaming columns : alias()
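A minimal sketch, reusing the book data frame and the col import from above :

book.select(col("value").alias("line")).printSchema()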

Explode Function

Say there is a column whose type is an array. The explode function takes each element of the array and gives it its own row / record in the data frame.
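A minimal sketch, assuming a lines data frame whose line column holds arrays of words (the result of the split() step above) :

from pyspark.sql.functions import col, explode

words = lines.select(explode(col("line")).alias("word"))
words.show(5)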

Using regular expressions to clean data.

from pyspark.sql.functions import col, regexp_extract

words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))

Filtering Rows

Removing rows with empty string values :

words_cleaned = words_clean.filter(col("word") != "")
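where() is an alias for filter(), so the same cleanup can also be written as :

words_cleaned = words_clean.where(col("word") != "")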

3. Submitting and scaling your first PySpark program.

Table of Contents

  • Summarizing data using groupby and a simple aggregate function.
  • Ordering results for display.
  • Writing data from a data frame.
  • Using spark-submit to launch your program in batch mode.
  • Simplifying PySpark code using method chaining.
  • Scaling your program to multiple files at once.

Grouping records - Counting word frequencies

We count records using the GroupedData object and perform an aggregation function on it : counting the items in each group.

We count the number of each word by creating groups : one for each word.

Once those groups are formed, we can perform an aggregation function on each one of them.

The easiest way to count record occurrences is to use the groupby() method, passing the columns we wish to group by as a parameter.

The groupby() method returns a GroupedData object and awaits further instructions.

Once we apply the count() method, we get back a data frame containing the grouping column word, as well as the count column containing the number of occurrences for each word.

groups = words_nonull.groupby(col("word"))
print(groups)   # a GroupedData object, waiting for an aggregation

results = words_nonull.groupby(col("word")).count()
print(results)  # a data frame with a "word" and a "count" column
results.show()

Ordering the results on the screen using orderBy

The orderBy() method can be used to order results.

results.orderBy("count", ascending=False).show(10)
results.orderBy(col("count").desc()).show(10)

Writing Data from a data frame.

Functions used to write :

write : the data frame's write attribute, which returns a DataFrameWriter object.
results.write.csv("filename.csv")

To reduce the number of partitions, we apply the coalesce() method before writing.
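A minimal sketch, assuming a hypothetical output directory results_single (note that the csv() writer produces a directory of part files, not a single file) :

results.coalesce(1).write.csv("results_single")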

PySpark Program

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, regexp_extract, split

spark = SparkSession.builder.appName("analysis").getOrCreate()

# Read the text file into a single-column ("value") data frame, one line per record.
book = spark.read.text("filename.txt")

# Tokenize each line into an array of words, then explode into one word per record.
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))

# Normalize to lowercase and keep only the alphabetic part of each token.
words_lower = words.select(lower(col("word")).alias("word"))
words_clean = words_lower.select(regexp_extract(col("word"), "[a-z]*", 0).alias("word"))

# Remove the empty strings left over from cleaning.
words_nonull = words_clean.where(col("word") != "")

# Count occurrences of each word and display the top 10.
results = words_nonull.groupby(col("word")).count()
results.orderBy("count", ascending=False).show(10)

# Reduce to a single partition and write the result as CSV.
results.coalesce(1).write.csv("filename.csv")

Simplifying dependencies with PySpark import conventions.

from pyspark.sql.functions import col, explode, lower, regexp_extract, split

import pyspark.sql.functions as F
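With the qualified import, every column function is prefixed with F, which avoids shadowing Python built-ins such as sum and max. A minimal sketch, reusing the word-splitting step from the program above :

words = lines.select(F.explode(F.col("line")).alias("word"))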

Analyzing tabular data with pyspark.sql

Contents

  • Reading delimited data into a PySpark data frame.
  • Understanding how PySpark represents tabular data in a data frame.
  • Ingesting and exploring tabular or relational data.
  • Selecting, manipulating, renaming and deleting columns in a data frame.
  • Summarizing data frames for quick exploration.

Creating our SparkSession object to start using pyspark

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
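A minimal sketch of reading delimited data, assuming a hypothetical comma-separated file sales.csv with a header row :

sales = spark.read.csv("sales.csv", sep=",", header=True, inferSchema=True)
sales.printSchema()
sales.show(5)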