PySpark
PySpark Notes
Basic ETL Workflow with PySpark.
PySpark serves as the foundation of any reporting, machine learning or data science exercise.
Launching PySpark on a local machine.
SparkSession entry point
PySpark uses a builder pattern through SparkSession.builder.
from pyspark.sql import SparkSession
spark = (SparkSession
         .builder
         .appName("Analysis")
         .getOrCreate())
SparkSession is a superset of SparkContext. It wraps the SparkContext and provides functionality for interacting with the Spark SQL API.
The SparkSession object is a more recent addition to the PySpark API, introduced in version 2.0.
Older PySpark code (before 2.0):
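Before SparkSession, the entry points were a SparkContext and (for the SQL API) an SQLContext. A minimal sketch, assuming a local master:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

# Placeholder app name and local master; pre-2.0 style entry points.
conf = SparkConf().setAppName("Analysis").setMaster("local[*]")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)  # gateway to the Spark SQL API before SparkSession existed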
Log Levels in PySpark
Log Level | Description |
---|---|
off | no logging at all |
fatal | only fatal errors, e.g. when the Spark cluster crashes |
error | fatal + recoverable errors |
warn | warnings |
info | runtime info |
debug | debug information on jobs |
trace | verbose debug logs |
all | everything PySpark can print |
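The log level can be changed at runtime through the SparkContext that the SparkSession wraps, for example to keep only warnings and above:
spark.sparkContext.setLogLevel("WARN")  # valid values match the levels in the table above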
Here are the major steps we need our program to perform.
- Read: read the input text data.
- Token: tokenize each line into words.
- Clean: clean the words (lowercase, keep letters only, drop empties).
- Count: count the frequency of each word.
- Answer: return the top word counts.
Reading Data into a data frame with spark.read
PySpark has two main data structures:
- The RDD
- The DataFrame
The RDD was used for a long time. It looks like a distributed collection of objects or rows.
Reading data into a data frame is done through a DataFrameReader object, which can be accessed through spark.read.
PySpark recognizes a few file formats:
- CSV
- JSON
- TEXT
- ORC (Optimized Row Columnar)
PySpark defaults to Parquet when reading and writing files.
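Each of these formats has a dedicated reader method on spark.read; a quick sketch (the file names are placeholders):
csv_df = spark.read.csv("data.csv", header=True, inferSchema=True)
json_df = spark.read.json("data.json")
text_df = spark.read.text("data.txt")
parquet_df = spark.read.parquet("data.parquet")  # Parquet is the default format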
Printing The Schema
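printSchema() prints the column names and types of a data frame. For a plain text file read with spark.read.text, the result has a single string column named value:
book = spark.read.text("filename.txt")
book.printSchema()
# root
#  |-- value: string (nullable = true)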
Splitting our lines of text into arrays of words.
from pyspark.sql.functions import split
product_title = ecommerce_data.select(split(ecommerce_data.value, " ").alias("product_name"))
product_title.show()
select() method: used for selecting data.
alias() method: used to rename transformed columns.
Selecting only one column.
from pyspark.sql.functions import col
Renaming columns: alias()
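A small sketch, reusing the book data frame from above (the variable name renamed is just for illustration):
from pyspark.sql.functions import col

renamed = book.select(col("value").alias("line"))  # select one column and rename it
renamed.printSchema()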
Explode Function
Say there is a column with an array type. The explode function takes each element in the array and gives it its own row / record in the data frame.
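A sketch using the product_title data frame from the split example above (the product_words name is just for illustration):
from pyspark.sql.functions import col, explode

# Each element of the "product_name" array becomes its own row in the "word" column.
product_words = product_title.select(explode(col("product_name")).alias("word"))
product_words.show()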
Using regular expressions to clean data.
from pyspark.sql.functions import col, regexp_extract
words_clean = words_lower.select(regexp_extract(col("word_lower"), "[a-z]+", 0).alias("word"))
Filtering Rows
Removing rows with empty string values:
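For example, keeping only the rows whose word column is not an empty string (filter() is an alias for where()):
from pyspark.sql.functions import col

words_nonull = words_clean.where(col("word") != "")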
3. Submitting and scaling your first PySpark program.
Table of Contents
- Summarizing data using groupby and simple aggregate functions.
- Ordering results for display.
- Writing data from a data frame.
- Using spark-submit to launch your program in batch mode.
- Simplifying PySpark code using method chaining.
- Scaling your program to multiple files at once.
Grouping records - Counting word frequencies
Counting records uses the GroupedData object: we group the records and then perform an aggregation function, counting the items in each group.
We count the number of each word by creating groups : one for each word.
Once those groups are formed, we can perform an aggregation function on each one of them.
The easiest way to count record occurrences is to use the groupby() method, passing the columns we wish to group on as a parameter. The groupby() method returns a GroupedData object and awaits further instructions.
Once we apply the count() method, we get back a data frame containing the grouping column word, as well as the count column containing the number of occurrences for each word.
groups = words_nonull.groupby(col("word"))
print(groups)
results = words_nonull.groupby(col("word")).count()
print(results)
results.show()
Ordering the results on the screen using orderBy
The orderBy() method can be used to order the results.
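For example, showing the ten most frequent words from the results data frame built above:
results.orderBy("count", ascending=False).show(10)
# equivalent, with an explicit column expression:
# results.orderBy(col("count").desc()).show(10)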
Writing Data from a data frame.
Writing is done through the data frame's write attribute. To reduce the number of partitions, we apply the coalesce() method.
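For example, writing the results as a single CSV file (the output path is a placeholder):
results.coalesce(1).write.csv("word_count_output")
# add a mode if the path may already exist:
# results.coalesce(1).write.mode("overwrite").csv("word_count_output")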
PySpark Program
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lower, regexp_extract, split
spark = SparkSession.builder.appName("analysis").getOrCreate()
book = spark.read.text("filename.txt")
lines = book.select(split(book.value, " ").alias("line"))
words = lines.select(explode(col("line")).alias("word"))
words_lower = words.select(lower(col("word")).alias("word"))
words_clean = words_lower.select(regexp_extract(col("word"), "[a-z]*", 0).alias("word"))
words_nonull = words_clean.where(col("word") != "")
results = words_nonull.groupby(col("word")).count()
results.orderBy("count", ascending=False).show(10)
results.coalesce(1).write.csv("filename.csv")
Simplifying dependencies with PySpark import conventions.
from pyspark.sql.functions import col, explode, lower, regexp_extract, split
import pyspark.sql.functions as F
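With the F alias, the transformations from the program above can be rewritten without importing each function by name, for example:
import pyspark.sql.functions as F

words = lines.select(F.explode(F.col("line")).alias("word"))
words_lower = words.select(F.lower(F.col("word")).alias("word"))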
Analyzing tabular data with pyspark.sql
Contents
- Reading delimited data into a PySpark data frame.
- Understanding how PySpark represents tabular data in a data frame.
- Ingesting and exploring tabular or relational data.
- Selecting, manipulating, renaming and deleting columns in a data frame.
- Summarizing data frames for quick exploration.
Creating our SparkSession object to start using PySpark.