Description: In this post we are going to learn about the use of some tools to apply on Big Data, that differs a bit from a normal pandas-csv methodology. Working with Big Data usually demands more sophisticated software that can deal with the upload and storage of Giga files. This was a great module I had from “Stack Academy - Big Data for Data Scientist” that, for its massive importance and content, I decided to bring to my portfolio.
Without further ado, some tools we are going to use are:
DataBricks: Azure Databricks is a fully managed service which provides powerful ETL (Extract, Transform and Load), analytics, and machine learning capabilities. It gives a simple collaborative environment to run interactive, and scheduled data analysis workloads.
PySpark: PySpark is a Python-based API for utilizing the Spark framework in combination with Python. While Spark is a Big Data computational engine, Python is a programming language.
SQL: SQL, in full structured query language, computer language designed for eliciting information from databases.
So, let’s start with small .json files
# Reading the data file
arquivo = "/FileStore/tables/shit/2015_summary.json"
inferSchema = True.
A column can be of type String, Double, Long, etc. Using inferSchema=false (default option) will give a dataframe where all columns are strings (StringType). Depending on what you want to do, strings may not work.
For example, if you want to add numbers from different columns, then those columns should be of some numeric type.
header = True.
This will use the first row in the csv file as the dataframe’s column names. Setting header=false (default option) will result in a dataframe with default column names: _c0, _c1, _c2, etc.
flightData2015 = spark\
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)
# print the datatypes from the df columns
flightData2015.printSchema()
root -- DEST_COUNTRY_NAME: string (nullable = true) -- ORIGIN_COUNTRY_NAME: string (nullable = true) -- count: integer (nullable = true)
# print the type of the feature
type(flightData2015)
pyspark.sql.dataframe.DataFrame
# return the first 5 lines in array format
flightData2015.take(5)
[Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Romania', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Croatia', count=1), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='Ireland', count=344), Row(DEST_COUNTRY_NAME='Egypt', ORIGIN_COUNTRY_NAME='United States', count=15), Row(DEST_COUNTRY_NAME='United States', ORIGIN_COUNTRY_NAME='India', count=62)]
display(flightData2015.show(3))
DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
United States| Romania| 15|
United States| Croatia| 1|
United States| Ireland| 344|
+-----------------+-------------------+-----+
only showing top 3 rows
flightData2015.count()
256
# turning off the inferSchema, we can spot the difference on the loading time
flightData2015 = spark\
.read\
.option("inferSchema", "False")\
.option("header", "True")\
.json(arquivo)
# reading the json files from a new dir created on databricks (I'm sorry about the name, I just couldn't change when already made)
df = spark\
.read\
.option("inferSchema", "True")\
.option("header", "True")\
.json("/FileStore/tables/shit/*.json")
df.show(10)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 15|
| United States| Croatia| 1|
| United States| Ireland| 344|
| Egypt| United States| 15|
| United States| India| 62|
| United States| Singapore| 1|
| United States| Grenada| 62|
| Costa Rica| United States| 588|
| Senegal| United States| 40|
| Moldova| United States| 1|
+-----------------+-------------------+-----+
only showing top 10 rows
df.count()
1502
display(df.head(10))
DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME count
United States Romania 1
United States Ireland 264
United States India 69
Egypt United States 24
Equatorial Guinea United States 1
United States Singapore 25
United States Grenada 54
Costa Rica United States 477
Senegal United States 29
United States Marshall Islands 44
%sql
DROP TABLE IF EXISTS all_files;
# erase the table all_files because I'm going to create a table named all_files on the next block,
# that is the reason why we need to delete an existing one
%sql
CREATE TABLE all_files
USING json
OPTIONS (path "/FileStore/tables/shit/*.json", header "true")
# the data from the "shit" folder are going to fill the table "all_files"
%sql
SELECT * FROM all_files;
DEST_COUNTRY_NAME ORIGIN_COUNTRY_NAME count
United States Romania 15
United States Croatia 1
United States Ireland 344
Egypt United States 15
United States India 62
United States Singapore 1
United States Grenada 62
Costa Rica United States 588
Senegal United States 40
%sql
SELECT count(*) FROM all_files;
%sql
-- selecting the avg of the countries
SELECT DEST_COUNTRY_NAME
,avg(count) AS Quantidade_Paises
FROM all_files
GROUP BY DEST_COUNTRY_NAME
ORDER BY DEST_COUNTRY_NAME;
from pyspark.sql.functions import max
df.select(max("count")).take(1)
Out[97]: [Row(max(count)=370002)]
# Filtering lines with filter
df.filter("count < 2").show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Croatia| 1|
| United States| Singapore| 1|
+-----------------+-------------------+-----+
only showing top 2 rows
# Where, an alias for the filter method
df.where("count < 2").show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Croatia| 1|
| United States| Singapore| 1|
+-----------------+-------------------+-----+
only showing top 2 rows
df.sort("count").show(5)
+--------------------+-------------------+-----+
| DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+--------------------+-------------------+-----+
|Saint Vincent and...| United States| 1|
| Malawi| United States| 1|
| Slovakia| United States| 1|
| Kazakhstan| United States| 1|
| United States| Saint Martin| 1|
+--------------------+-------------------+-----+
only showing top 5 rows
from pyspark.sql.functions import desc, asc, expr
# sorting in desc
df.orderBy(expr("count desc")).show(10)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| Suriname| United States| 1|
| United States| Cyprus| 1|
| United States| Gibraltar| 1|
| Cyprus| United States| 1|
| Moldova| United States| 1|
| Burkina Faso| United States| 1|
| United States| Croatia| 1|
| Djibouti| United States| 1|
| Zambia| United States| 1|
| United States| Estonia| 1|
+-----------------+-------------------+-----+
only showing top 10 rows
# descritive statistics
df.describe().show()
+-------+-----------------+-------------------+------------------+
|summary|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-------+-----------------+-------------------+------------------+
| count| 1502| 1502| 1502|
| mean| null| null|1718.3189081225032|
| stddev| null| null|22300.368619668894|
| min| Afghanistan| Afghanistan| 1|
| max| Zimbabwe| Zimbabwe| 370002|
+-------+-----------------+-------------------+------------------+
from pyspark.sql.functions import lower, upper, col
df.select(col("DEST_COUNTRY_NAME"),lower(col("DEST_COUNTRY_NAME")),upper(lower(col("DEST_COUNTRY_NAME")))).show(10)
+-----------------+------------------------+-------------------------------+
|DEST_COUNTRY_NAME|lower(DEST_COUNTRY_NAME)|upper(lower(DEST_COUNTRY_NAME))|
+-----------------+------------------------+-------------------------------+
| United States| united states| UNITED STATES|
| United States| united states| UNITED STATES|
| United States| united states| UNITED STATES|
| Egypt| egypt| EGYPT|
| United States| united states| UNITED STATES|
| United States| united states| UNITED STATES|
| United States| united states| UNITED STATES|
| Costa Rica| costa rica| COSTA RICA|
| Senegal| senegal| SENEGAL|
| Moldova| moldova| MOLDOVA|
+-----------------+------------------------+-------------------------------+
only showing top 10 rows
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| Azerbaijan| United States| 1|
| Belarus| United States| 1|
| Belarus| United States| 1|
| Brunei| United States| 1|
| Bulgaria| United States| 1|
+-----------------+-------------------+-----+
only showing top 5 rows
from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
| United States| Romania| 1|
|Equatorial Guinea| United States| 1|
+-----------------+-------------------+-----+
only showing top 2 rows
+-----------------+-------------------+------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME| count|
+-----------------+-------------------+------+
| United States| United States|370002|
| United States| United States|358354|
+-----------------+-------------------+------+
only showing top 2 rows
permissive: Set all fields to NULL when it finds records and situations all records in a column called _corrupt_record (default).
dropMalformed: Deletes a corrupted or unreadable line.
failFast: Fails immediately when it finds a row it doesn’t recognize.
# Reading csv
spark.read.format("csv")
.option("mode", "permissive")
.option("inferSchema", "true")
.option("path", "path/to/file(s)")
.schema(someSchema)
.load()
df = spark.read.format("csv")\
.option("mode", "permissive")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")
display(df.head(10))
df.printSchema()
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
# Using the StructType object can make you define the type of each variable. Invoice was a string and now I want pyspark to change to int
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, TimestampType
schema_df = StructType([
StructField("InvoiceNo", IntegerType()),
StructField("StockCode", IntegerType()),
StructField("Description", StringType()),
StructField("Quantity", IntegerType()),
StructField("InvoiceDate", TimestampType()),
StructField("UnitPrice", DoubleType()),
StructField("CustomerID", DoubleType()),
StructField("Country", StringType())
])
# checking the schema_df type
type(schema_df)
Out[114]: pyspark.sql.types.StructType
# using the schema() parameter
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load("/FileStore/tables/bronze/2010_12_01.csv")
df.printSchema()
root
|-- InvoiceNo: integer (nullable = true)
|-- StockCode: integer (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
display(df.head(10))
# We defined the StockCode to be integer, but some values have int AND strings mixed, so it will not be able to transform these values into int
# The mixed values are going to be presented AS NULL by the permissive mode.
# Switching to failfast instead of permissive
df = spark.read.format("csv")\
.option("header", "True")\
.schema(schema_df)\
.option("mode","failfast")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load("/FileStore/tables/bronze/2010_12_01.csv")
# StockCode - int
df.printSchema()
root
|-- InvoiceNo: integer (nullable = true)
|-- StockCode: integer (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
display(df.collect()) # Reading error, failfast does not allow the error that permisssive does
df_json = spark.read.format("json")\
.option("mode", "FAILFAST")\
.option("inferSchema", "true")\
.load("/FileStore/tables/shit/2010_summary.json")
df_json.printSchema()
root
|-- DEST_COUNTRY_NAME: string (nullable = true)
|-- ORIGIN_COUNTRY_NAME: string (nullable = true)
|-- count: long (nullable = true)
display(df_json.head(10))
# writing csv
df.write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")
file = "/FileStore/tables/bronze/saida_2010_12_01.csv/part-00000-tid-513137111285552141-fa5fcb38-55a1-4a12-ac99-df3fa327627c-83-1-c000.csv"
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema", "True")\
.option("timestampFormat",'yyyy-/MM/DD hh:mm:ss')\
.load(file)
df.show(10)
# slicing the csv
# it created a new dir on the bronze dir
df.repartition(5).write.format("csv")\
.mode("overwrite") \
.option("sep", ",") \
.save("/FileStore/tables/bronze/saida_2010_12_01.csv")
Converting .csv to .parquet
# Reading the csv files (>4GB)
df = spark.read.format("csv")\
.option("header", "True")\
.option("inferSchema","True")\
.load("/FileStore/tables/bigdata/*.csv")
display(df.head(10))
df.printSchema()
root
|-- practice: string (nullable = true)
|-- bnf_code: string (nullable = true)
|-- bnf_name: string (nullable = true)
|-- items: string (nullable = true)
|-- nic: string (nullable = true)
|-- act_cost: string (nullable = true)
|-- quantity: string (nullable = true)
df.count()
Out[5]: 131020089
# writing in parquet file
# try to pay attention on the spark version, always work on the version you converted
df.write.format("parquet")\
.mode("overwrite")\
.save("/FileStore/tables/bronze/df-parquet-file.parquet")
%fs
ls /FileStore/tables/bronze/df-parquet-file.parquet
# reading parquet, way faster than csv and json
# csv took 5min, parquet 96s
df_parquet = spark.read.format("parquet")\
.load("/FileStore/tables/bronze/df-parquet-file.parquet")
# csv took 2min, parquet 10s
df_parquet.count()
Out[7]: 131020089
display(df_parquet.head(10))
# checking files size
display(dbutils.fs.ls("/FileStore/tables/bronze/df-parquet-file.parquet"))
%scala
// script to get size in Gyga
val path="/FileStore/tables/bronze/df-parquet-file.parquet"
val filelist=dbutils.fs.ls(path)
val df_temp = filelist.toDF()
df_temp.createOrReplaceTempView("adlsSize")
%sql
-- query the created view
-- csv 4GB / parquet 1.3k
select round(sum(size)/(1024*1024*1024),3) as sizeInGB from adlsSize
# It is similar to: select * from pg_catalog.pg_tables
# jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/{your_database}?user=stack_user@pgserver-1&password={your_password}&sslmode=require
# We also need to Put our server to receive remote access at the azure server
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Newdata2021!&sslmode=require")\
.option("dbtable", "pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "Newdata2021!").load()
#dbtable= means that I query the 'pg_catalog.pg_tables' and call the function 'select * from pg_catalog.pg_tables'
#infos to access (user and password)
# printing all the lines
display(pgDF.collect())
# query from schemaname
pgDF.select("schemaname").distinct().show()
+------------------+
| schemaname|
+------------------+
|information_schema|
| public|
| pg_catalog|
+------------------+
# Specific query
# Useful to avoid "select * from."
pgDF = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Newdata2021!&sslmode=require")\
.option("query", "select schemaname,tablename from pg_catalog.pg_tables")\
.option("user", "stack_user").option("password", "Newdata2021!").load()
# query = specific consulting restricting data and selecting columns, avoiding a broad query
display(pgDF.collect())
# creating the "produtos" table from the df data
pgDF.write.mode("overwrite")\
.format("jdbc")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Newdata2021!&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user")\
.option("password", "Newdata2021!")\
.save()
# creating the dataframe df_produtos from the created table
df_produtos = spark.read.format("jdbc")\
.option("driver", "org.postgresql.Driver")\
.option("url", "jdbc:postgresql://pgserver-1.postgres.database.azure.com:5432/postgres?user=stack_user@pgserver-1&password=Newdata2021!&sslmode=require")\
.option("dbtable", "produtos")\
.option("user", "stack_user").option("password", "Newdata2021!").load()
# printing all the rows
display(df_produtos.collect())
df.show(10)
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00| 2.55| 17850.0|United Kingdom|
| 536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
| 536365| 84406B|CREAM CUPID HEART...| 8|2010-12-01 08:26:00| 2.75| 17850.0|United Kingdom|
| 536365| 84029G|KNITTED UNION FLA...| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|2010-12-01 08:26:00| 7.65| 17850.0|United Kingdom|
| 536365| 21730|GLASS STAR FROSTE...| 6|2010-12-01 08:26:00| 4.25| 17850.0|United Kingdom|
| 536366| 22633|HAND WARMER UNION...| 6|2010-12-01 08:28:00| 1.85| 17850.0|United Kingdom|
| 536366| 22632|HAND WARMER RED P...| 6|2010-12-01 08:28:00| 1.85| 17850.0|United Kingdom|
| 536367| 84879|ASSORTED COLOUR B...| 32|2010-12-01 08:34:00| 1.69| 13047.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 10 rows
# I sum the unit price for each country
df.groupBy("Country").sum("UnitPrice").show()
+--------------+------------------+
| Country| sum(UnitPrice)|
+--------------+------------------+
| Germany| 93.82000000000002|
| France| 55.29|
| EIRE|133.64000000000001|
| Norway| 102.67|
| Australia| 73.9|
|United Kingdom|12428.080000000024|
| Netherlands| 16.85|
+--------------+------------------+
# Showing how much data each country has
df.groupBy("Country").count().show()
+--------------+-----+
| Country|count|
+--------------+-----+
| Germany| 29|
| France| 20|
| EIRE| 21|
| Norway| 73|
| Australia| 14|
|United Kingdom| 2949|
| Netherlands| 2|
+--------------+-----+
# Now we get the min value by country
df.groupBy("Country").min("UnitPrice").show()
+--------------+--------------+
| Country|min(UnitPrice)|
+--------------+--------------+
| Germany| 0.42|
| France| 0.42|
| EIRE| 0.65|
| Norway| 0.29|
| Australia| 0.85|
|United Kingdom| 0.0|
| Netherlands| 1.85|
+--------------+--------------+
# the max price by country
df.groupBy("Country").max("UnitPrice").show()
+--------------+--------------+
| Country|max(UnitPrice)|
+--------------+--------------+
| Germany| 18.0|
| France| 18.0|
| EIRE| 50.0|
| Norway| 7.95|
| Australia| 8.5|
|United Kingdom| 607.49|
| Netherlands| 15.0|
+--------------+--------------+
# the average unit price by country
df.groupBy("Country").avg("UnitPrice").show()
+--------------+------------------+
| Country| avg(UnitPrice)|
+--------------+------------------+
| Germany| 3.235172413793104|
| France| 2.7645|
| EIRE|6.3638095238095245|
| Norway|1.4064383561643836|
| Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
| Netherlands| 8.425|
+--------------+------------------+
df.groupBy("Country").mean("UnitPrice").show()
+--------------+------------------+
| Country| avg(UnitPrice)|
+--------------+------------------+
| Germany| 3.235172413793104|
| France| 2.7645|
| EIRE|6.3638095238095245|
| Norway|1.4064383561643836|
| Australia| 5.278571428571429|
|United Kingdom|4.2143370634113335|
| Netherlands| 8.425|
+--------------+------------------+
# GroupBy multiple columns
df.groupBy("Country","CustomerID") \
.sum("UnitPrice") \
.show()
+--------------+----------+------------------+
| Country|CustomerID| sum(UnitPrice)|
+--------------+----------+------------------+
|United Kingdom| 17420.0| 38.99999999999999|
|United Kingdom| 15922.0| 48.5|
|United Kingdom| 16250.0| 47.27|
|United Kingdom| 13065.0| 73.11000000000001|
|United Kingdom| 18074.0|62.150000000000006|
|United Kingdom| 16048.0|12.969999999999999|
| Germany| 12472.0| 49.45|
|United Kingdom| 18085.0| 34.6|
|United Kingdom| 17905.0|109.90000000000003|
|United Kingdom| 17841.0|254.63999999999982|
|United Kingdom| 15291.0| 6.0|
|United Kingdom| 17951.0|22.000000000000004|
|United Kingdom| 13255.0|27.299999999999997|
|United Kingdom| 17690.0| 34.8|
|United Kingdom| 18229.0| 48.65|
|United Kingdom| 15605.0| 58.20000000000002|
|United Kingdom| 18011.0| 66.10999999999999|
|United Kingdom| 17809.0| 1.45|
|United Kingdom| 14307.0|115.35000000000004|
|United Kingdom| 13705.0|183.98999999999998|
+--------------+----------+------------------+
only showing top 20 rows
# Bringing the df from bronze and printing it
df = spark.read.format("csv")\
.option("mode", "permissive")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/FileStore/tables/bronze/2010_12_01.csv")
df.show()
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
|InvoiceNo|StockCode| Description|Quantity| InvoiceDate|UnitPrice|CustomerID| Country|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
| 536365| 85123A|WHITE HANGING HEA...| 6|2010-12-01 08:26:00| 2.55| 17850.0|United Kingdom|
| 536365| 71053| WHITE METAL LANTERN| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
| 536365| 84406B|CREAM CUPID HEART...| 8|2010-12-01 08:26:00| 2.75| 17850.0|United Kingdom|
| 536365| 84029G|KNITTED UNION FLA...| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
| 536365| 84029E|RED WOOLLY HOTTIE...| 6|2010-12-01 08:26:00| 3.39| 17850.0|United Kingdom|
| 536365| 22752|SET 7 BABUSHKA NE...| 2|2010-12-01 08:26:00| 7.65| 17850.0|United Kingdom|
| 536365| 21730|GLASS STAR FROSTE...| 6|2010-12-01 08:26:00| 4.25| 17850.0|United Kingdom|
| 536366| 22633|HAND WARMER UNION...| 6|2010-12-01 08:28:00| 1.85| 17850.0|United Kingdom|
| 536366| 22632|HAND WARMER RED P...| 6|2010-12-01 08:28:00| 1.85| 17850.0|United Kingdom|
| 536367| 84879|ASSORTED COLOUR B...| 32|2010-12-01 08:34:00| 1.69| 13047.0|United Kingdom|
| 536367| 22745|POPPY'S PLAYHOUSE...| 6|2010-12-01 08:34:00| 2.1| 13047.0|United Kingdom|
| 536367| 22748|POPPY'S PLAYHOUSE...| 6|2010-12-01 08:34:00| 2.1| 13047.0|United Kingdom|
| 536367| 22749|FELTCRAFT PRINCES...| 8|2010-12-01 08:34:00| 3.75| 13047.0|United Kingdom|
| 536367| 22310|IVORY KNITTED MUG...| 6|2010-12-01 08:34:00| 1.65| 13047.0|United Kingdom|
| 536367| 84969|BOX OF 6 ASSORTED...| 6|2010-12-01 08:34:00| 4.25| 13047.0|United Kingdom|
| 536367| 22623|BOX OF VINTAGE JI...| 3|2010-12-01 08:34:00| 4.95| 13047.0|United Kingdom|
| 536367| 22622|BOX OF VINTAGE AL...| 2|2010-12-01 08:34:00| 9.95| 13047.0|United Kingdom|
| 536367| 21754|HOME BUILDING BLO...| 3|2010-12-01 08:34:00| 5.95| 13047.0|United Kingdom|
| 536367| 21755|LOVE BUILDING BLO...| 3|2010-12-01 08:34:00| 5.95| 13047.0|United Kingdom|
| 536367| 21777|RECIPE BOX WITH M...| 4|2010-12-01 08:34:00| 7.95| 13047.0|United Kingdom|
+---------+---------+--------------------+--------+-------------------+---------+----------+--------------+
only showing top 20 rows
df.printSchema()
# We can notice that the InvoiceDate wasn't recognize as a date, but string
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: string (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
from pyspark.sql.functions import *
# importing the date funcions
# current_date()
df.select(current_date().alias("current_date")).show(1) #selecting a column and creating an alias for it
+------------+
|current_date|
+------------+
| 2021-12-06|
+------------+
only showing top 1 row
#date_format()
df.select(col("InvoiceDate"), \
date_format(col("InvoiceDate"), "dd-MM-yyyy hh:mm:ss")\
.alias("date_format")).show()
#date_format : I call the column I want to format and the formatting type I want my data
+-------------------+-------------------+
| InvoiceDate| date_format|
+-------------------+-------------------+
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:26:00|01-12-2010 08:26:00|
|2010-12-01 08:28:00|01-12-2010 08:28:00|
|2010-12-01 08:28:00|01-12-2010 08:28:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
|2010-12-01 08:34:00|01-12-2010 08:34:00|
+-------------------+-------------------+
only showing top 20 rows
# datediff: return the difference between dates
# So I want to compare the current date (days) with my df date
df.select(col("InvoiceDate"),
datediff(current_date(),col("InvoiceDate")).alias("datediff")
).show()
+-------------------+--------+
| InvoiceDate|datediff|
+-------------------+--------+
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:26:00| 4023|
|2010-12-01 08:28:00| 4023|
|2010-12-01 08:28:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
|2010-12-01 08:34:00| 4023|
+-------------------+--------+
only showing top 20 rows
#months_between()
df.select(col("InvoiceDate"),
months_between(current_date(),col("InvoiceDate")).alias("months_between")
).show()
+-------------------+--------------+
| InvoiceDate|months_between|
+-------------------+--------------+
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:26:00| 132.1499552|
|2010-12-01 08:28:00| 132.14991039|
|2010-12-01 08:28:00| 132.14991039|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
|2010-12-01 08:34:00| 132.14977599|
+-------------------+--------------+
only showing top 20 rows
# Extracting year, month, next day, week day
df.select(col("InvoiceDate"),
year(col("InvoiceDate")).alias("year"),
month(col("InvoiceDate")).alias("month"),
next_day(col("InvoiceDate"),"Sunday").alias("next_day"),
weekofyear(col("InvoiceDate")).alias("weekofyear")
).show(10)
+-------------------+----+-----+----------+----------+
| InvoiceDate|year|month| next_day|weekofyear|
+-------------------+----+-----+----------+----------+
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:26:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:28:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:28:00|2010| 12|2010-12-05| 48|
|2010-12-01 08:34:00|2010| 12|2010-12-05| 48|
+-------------------+----+-----+----------+----------+
only showing top 10 rows
# Day of the week, day of the month, day of the year
df.select(col("InvoiceDate"),
dayofweek(col("InvoiceDate")).alias("dayofweek"),
dayofmonth(col("InvoiceDate")).alias("dayofmonth"),
dayofyear(col("InvoiceDate")).alias("dayofyear"),
).show()
+-------------------+---------+----------+---------+
| InvoiceDate|dayofweek|dayofmonth|dayofyear|
+-------------------+---------+----------+---------+
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:26:00| 4| 1| 335|
|2010-12-01 08:28:00| 4| 1| 335|
|2010-12-01 08:28:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
|2010-12-01 08:34:00| 4| 1| 335|
+-------------------+---------+----------+---------+
only showing top 20 rows
# printing current timestamp, it brings the cluster fuse
df.select(current_timestamp().alias("current_timestamp")
).show(1,truncate=False)
+-----------------------+
|current_timestamp |
+-----------------------+
|2021-12-06 13:09:37.916|
+-----------------------+
only showing top 1 row
# returning hour, min, second
df.select(col("InvoiceDate"),
hour(col("InvoiceDate")).alias("hour"),
minute(col("InvoiceDate")).alias("minute"),
second(col("InvoiceDate")).alias("second")
).show()
+-------------------+----+------+------+
| InvoiceDate|hour|minute|second|
+-------------------+----+------+------+
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:26:00| 8| 26| 0|
|2010-12-01 08:28:00| 8| 28| 0|
|2010-12-01 08:28:00| 8| 28| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
|2010-12-01 08:34:00| 8| 34| 0|
+-------------------+----+------+------+
only showing top 20 rows
# So we are going to bring some examples of data with missing values
display(dbutils.fs.ls("/databricks-datasets"))
# inferSchema = True
# header = True
arquivo = "dbfs:/databricks-datasets/flights/"
df = spark \
.read.format("csv")\
.option("inferSchema", "True")\
.option("header", "True")\
.csv(arquivo)
df.show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
|01061215| -6| 602| ABE| ATL|
|01061725| 69| 602| ABE| ATL|
|01061230| 0| 369| ABE| DTW|
|01060625| -3| 602| ABE| ATL|
|01070600| 0| 369| ABE| DTW|
|01071725| 0| 602| ABE| ATL|
|01071230| 0| 369| ABE| DTW|
|01070625| 0| 602| ABE| ATL|
|01071219| 0| 569| ABE| ORD|
|01080600| 0| 369| ABE| DTW|
+--------+-----+--------+------+-----------+
only showing top 20 rows
# Filtering the missing values inside my dataset
df.filter("delay is NULL").show()
+--------------------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------------------+-----+--------+------+-----------+
|Abbotsford BC Can...| null| null| null| null|
| Aberdeen SD USA ABR| null| null| null| null|
| Abilene TX USA ABI| null| null| null| null|
| Akron OH USA CAK| null| null| null| null|
| Alamosa CO USA ALS| null| null| null| null|
| Albany GA USA ABY| null| null| null| null|
| Albany NY USA ALB| null| null| null| null|
|Albuquerque NM US...| null| null| null| null|
|Alexandria LA USA...| null| null| null| null|
|Allentown PA USA ABE| null| null| null| null|
| Alliance NE USA AIA| null| null| null| null|
| Alpena MI USA APN| null| null| null| null|
| Altoona PA USA AOO| null| null| null| null|
| Amarillo TX USA AMA| null| null| null| null|
|Anahim Lake BC Ca...| null| null| null| null|
|Anchorage AK USA ANC| null| null| null| null|
| Appleton WI USA ATW| null| null| null| null|
|Arviat NWT Canada...| null| null| null| null|
|Asheville NC USA AVL| null| null| null| null|
| Aspen CO USA ASE| null| null| null| null|
+--------------------+-----+--------+------+-----------+
only showing top 20 rows
# similar function
df.filter(df.delay.isNull()).show(10)
# Filling NULL with 0 value
df.na.fill(value=0).show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
|01061215| -6| 602| ABE| ATL|
|01061725| 69| 602| ABE| ATL|
|01061230| 0| 369| ABE| DTW|
|01060625| -3| 602| ABE| ATL|
|01070600| 0| 369| ABE| DTW|
|01071725| 0| 602| ABE| ATL|
|01071230| 0| 369| ABE| DTW|
|01070625| 0| 602| ABE| ATL|
|01071219| 0| 569| ABE| ORD|
|01080600| 0| 369| ABE| DTW|
|01081230| 33| 369| ABE| DTW|
|01080625| 1| 602| ABE| ATL|
|01080607| 5| 569| ABE| ORD|
|01081219| 54| 569| ABE| ORD|
|01091215| 43| 602| ABE| ATL|
|01090600| 151| 369| ABE| DTW|
|01091725| 0| 602| ABE| ATL|
|01091230| -4| 369| ABE| DTW|
|01090625| 8| 602| ABE| ATL|
|01091219| 83| 569| ABE| ORD|
+--------+-----+--------+------+-----------+
only showing top 30 rows
# Filling NULL with 0 value (applied just on delay column)
df.na.fill(value=0, subset=['delay']).show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
|01061215| -6| 602| ABE| ATL|
|01061725| 69| 602| ABE| ATL|
|01061230| 0| 369| ABE| DTW|
|01060625| -3| 602| ABE| ATL|
|01070600| 0| 369| ABE| DTW|
|01071725| 0| 602| ABE| ATL|
|01071230| 0| 369| ABE| DTW|
|01070625| 0| 602| ABE| ATL|
|01071219| 0| 569| ABE| ORD|
|01080600| 0| 369| ABE| DTW|
+--------+-----+--------+------+-----------+
only showing top 20 rows
# All the NUll will be filled with empty string
df.na.fill("").show(100)
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
|01061215| -6| 602| ABE| ATL|
|01061725| 69| 602| ABE| ATL|
|01061230| 0| 369| ABE| DTW|
|01060625| -3| 602| ABE| ATL|
|01070600| 0| 369| ABE| DTW|
|01071725| 0| 602| ABE| ATL|
|01071230| 0| 369| ABE| DTW|
|01070625| 0| 602| ABE| ATL|
|01071219| 0| 569| ABE| ORD|
|01080600| 0| 369| ABE| DTW|
|01081230| 33| 369| ABE| DTW|
|01080625| 1| 602| ABE| ATL|
|01080607| 5| 569| ABE| ORD|
|01081219| 54| 569| ABE| ORD|
|01091215| 43| 602| ABE| ATL|
|01090600| 151| 369| ABE| DTW|
|01091725| 0| 602| ABE| ATL|
|01091230| -4| 369| ABE| DTW|
|01090625| 8| 602| ABE| ATL|
|01091219| 83| 569| ABE| ORD|
|01101215| -5| 602| ABE| ATL|
|01100600| -5| 369| ABE| DTW|
|01101725| 7| 602| ABE| ATL|
|01101230| -8| 369| ABE| DTW|
|01100625| 52| 602| ABE| ATL|
|01101219| 0| 569| ABE| ORD|
|01111215| 127| 602| ABE| ATL|
|01110600| -9| 369| ABE| DTW|
|01110625| -4| 602| ABE| ATL|
|01121215| -5| 602| ABE| ATL|
|01121725| -1| 602| ABE| ATL|
|01131215| 14| 602| ABE| ATL|
|01130600| -7| 369| ABE| DTW|
|01131725| -6| 602| ABE| ATL|
|01131230| -13| 369| ABE| DTW|
|01130625| 29| 602| ABE| ATL|
|01131219| -8| 569| ABE| ORD|
|01140600| -9| 369| ABE| DTW|
|01141725| -9| 602| ABE| ATL|
|01141230| -8| 369| ABE| DTW|
|01140625| -5| 602| ABE| ATL|
|01141219| -10| 569| ABE| ORD|
|01150600| 0| 369| ABE| DTW|
|01151725| -6| 602| ABE| ATL|
|01151230| 0| 369| ABE| DTW|
|01150625| 0| 602| ABE| ATL|
|01150607| 0| 569| ABE| ORD|
|01151219| 0| 569| ABE| ORD|
|01161215| -10| 602| ABE| ATL|
|01160600| -1| 369| ABE| DTW|
|01161725| -6| 602| ABE| ATL|
|01161230| -7| 369| ABE| DTW|
|01160625| -4| 602| ABE| ATL|
|01161219| 68| 569| ABE| ORD|
|01171215| -8| 602| ABE| ATL|
|01170600| -5| 369| ABE| DTW|
|01171725| 5| 602| ABE| ATL|
|01171230| -10| 369| ABE| DTW|
|01170625| -6| 602| ABE| ATL|
|01171219| -10| 569| ABE| ORD|
|01181215| -12| 602| ABE| ATL|
|01180600| -13| 369| ABE| DTW|
|01180625| 0| 602| ABE| ATL|
|01191215| -16| 602| ABE| ATL|
|01191725| -5| 602| ABE| ATL|
|01201215| -8| 602| ABE| ATL|
|01201725| -5| 602| ABE| ATL|
|01201230| -11| 369| ABE| DTW|
|01200625| -7| 602| ABE| ATL|
|01201219| -6| 569| ABE| ORD|
|01210600| 89| 369| ABE| DTW|
|01211725| 0| 602| ABE| ATL|
|01211230| 44| 369| ABE| DTW|
|01210625| -6| 602| ABE| ATL|
|01211219| 9| 569| ABE| ORD|
|01220600| 80| 369| ABE| DTW|
|01221230| -5| 369| ABE| DTW|
|01220625| 333| 602| ABE| ATL|
|01220607| 219| 569| ABE| ORD|
|01221219| 15| 569| ABE| ORD|
|01231215| -12| 602| ABE| ATL|
|01230600| -3| 369| ABE| DTW|
|01231725| 180| 602| ABE| ATL|
|01231230| -5| 369| ABE| DTW|
|01230625| -8| 602| ABE| ATL|
|01231219| -13| 569| ABE| ORD|
|01241215| -11| 602| ABE| ATL|
|01240600| -3| 369| ABE| DTW|
|01241725| 2| 602| ABE| ATL|
|01241230| -5| 369| ABE| DTW|
+--------+-----+--------+------+-----------+
only showing top 100 rows
df.filter("delay is NULL").show()
# Removing null rows, may not be the best practice since you remove the fully row
df.na.drop().show()
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
|01061215| -6| 602| ABE| ATL|
|01061725| 69| 602| ABE| ATL|
|01061230| 0| 369| ABE| DTW|
|01060625| -3| 602| ABE| ATL|
|01070600| 0| 369| ABE| DTW|
|01071725| 0| 602| ABE| ATL|
|01071230| 0| 369| ABE| DTW|
|01070625| 0| 602| ABE| ATL|
|01071219| 0| 569| ABE| ORD|
|01080600| 0| 369| ABE| DTW|
+--------+-----+--------+------+-----------+
only showing top 20 rows
# Adding a column to a df
df = df.withColumn('Nova Coluna', df['delay']+2) # Calling the name of my new column, which are going to present delay +2
df.show(10)
+--------+-----+--------+------+-----------+-----------+
| date|delay|distance|origin|destination|Nova Coluna|
+--------+-----+--------+------+-----------+-----------+
|01011245| 6| 602| ABE| ATL| 8.0|
|01020600| -8| 369| ABE| DTW| -6.0|
|01021245| -2| 602| ABE| ATL| 0.0|
|01020605| -4| 602| ABE| ATL| -2.0|
|01031245| -4| 602| ABE| ATL| -2.0|
|01030605| 0| 602| ABE| ATL| 2.0|
|01041243| 10| 602| ABE| ATL| 12.0|
|01040605| 28| 602| ABE| ATL| 30.0|
|01051245| 88| 602| ABE| ATL| 90.0|
|01050605| 9| 602| ABE| ATL| 11.0|
+--------+-----+--------+------+-----------+-----------+
only showing top 10 rows
# Renaming a column
df.withColumnRenamed('Nova Coluna','Delay_2').show()
+--------+-----+--------+------+-----------+-------+
| date|delay|distance|origin|destination|Delay_2|
+--------+-----+--------+------+-----------+-------+
|01011245| 6| 602| ABE| ATL| 8.0|
|01020600| -8| 369| ABE| DTW| -6.0|
|01021245| -2| 602| ABE| ATL| 0.0|
|01020605| -4| 602| ABE| ATL| -2.0|
|01031245| -4| 602| ABE| ATL| -2.0|
|01030605| 0| 602| ABE| ATL| 2.0|
|01041243| 10| 602| ABE| ATL| 12.0|
|01040605| 28| 602| ABE| ATL| 30.0|
|01051245| 88| 602| ABE| ATL| 90.0|
|01050605| 9| 602| ABE| ATL| 11.0|
|01061215| -6| 602| ABE| ATL| -4.0|
|01061725| 69| 602| ABE| ATL| 71.0|
|01061230| 0| 369| ABE| DTW| 2.0|
|01060625| -3| 602| ABE| ATL| -1.0|
|01070600| 0| 369| ABE| DTW| 2.0|
|01071725| 0| 602| ABE| ATL| 2.0|
|01071230| 0| 369| ABE| DTW| 2.0|
|01070625| 0| 602| ABE| ATL| 2.0|
|01071219| 0| 569| ABE| ORD| 2.0|
|01080600| 0| 369| ABE| DTW| 2.0|
+--------+-----+--------+------+-----------+-------+
only showing top 20 rows
# Removing the new column
df = df.drop('Nova Coluna')
df.show(10)
+--------+-----+--------+------+-----------+
| date|delay|distance|origin|destination|
+--------+-----+--------+------+-----------+
|01011245| 6| 602| ABE| ATL|
|01020600| -8| 369| ABE| DTW|
|01021245| -2| 602| ABE| ATL|
|01020605| -4| 602| ABE| ATL|
|01031245| -4| 602| ABE| ATL|
|01030605| 0| 602| ABE| ATL|
|01041243| 10| 602| ABE| ATL|
|01040605| 28| 602| ABE| ATL|
|01051245| 88| 602| ABE| ATL|
|01050605| 9| 602| ABE| ATL|
+--------+-----+--------+------+-----------+
only showing top 10 rows
from pyspark.sql.types import LongType
def quadrado(s):
return s * s
# registering in spark database and adjusting the returning type
from pyspark.sql.types import LongType
spark.udf.register("Func_Py_Quadrado", quadrado, LongType())
# naming my registered function("Func_PY..."), then calling the function quadrado and the LongType imported
Out[57]: <function __main__.quadrado(s)>
# generating random values
spark.range(1, 20).show()
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
# create a view "View_temp" that was created from the random values we made
spark.range(1, 20).createOrReplaceTempView("View_temp")
%sql
-- Using a python function mixed with SQL code
select id, Func_Py_Quadrado(id) as id_ao_quadrado
from View_temp
-- calling the function (Fun_Py...)and bringing the id value (the square) as id_ao_quadrado
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
Func_Py_Quadrado = udf(quadrado, LongType())
# obviously the square function already exists, but we are going to register as an object type UDF
# Creating a dataframe from my view
df = spark.table("View_temp")
df.show()
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 8|
| 9|
| 10|
| 11|
| 12|
| 13|
| 14|
| 15|
| 16|
| 17|
| 18|
| 19|
+---+
display(df.select("id", Func_Py_Quadrado("id").alias("id_quadrado")))
# Calling the Func_... on the "id" feature and naming the result as "id_quadrado"
# So a python function was created and then used inside a df
import numpy as np
import pandas as pd
import databricks.koalas as ks
# As we can see koalas is a project from databricks
pdf = pd.DataFrame({'A': np.random.rand(5),
'B': np.random.rand(5)})
type(pdf)
Out[65]: pandas.core.frame.DataFrame
# Creating a koala df
kdf = ks.DataFrame({'A': np.random.rand(5),
'B': np.random.rand(5)})
type(kdf)
Out[67]: databricks.koalas.frame.DataFrame
# Creating a koala df from a pd df
kdf = ks.DataFrame(pdf)
type(kdf)
Out[68]: databricks.koalas.frame.DataFrame
# other ways to transform
kdf = ks.from_pandas(pdf)
type(kdf)
pdf.head()
A B
0 0.813516 0.527949
1 0.683850 0.468637
2 0.215054 0.058884
3 0.576319 0.378034
4 0.313028 0.762469
kdf.head()
A B
0 0.599634 0.706762
1 0.534563 0.055601
2 0.704126 0.035550
3 0.161522 0.768715
4 0.304619 0.921843
kdf.describe()
A B
count 5.000000 5.000000
mean 0.460893 0.497694
std 0.222420 0.420145
min 0.161522 0.035550
25% 0.304619 0.055601
50% 0.534563 0.706762
75% 0.599634 0.768715
max 0.704126 0.921843
# sorting a df
kdf.sort_values(by='B')
A B
2 0.704126 0.035550
1 0.534563 0.055601
0 0.599634 0.706762
3 0.161522 0.768715
4 0.304619 0.921843
# cell layout setting
from databricks.koalas.config import set_option, get_option
ks.get_option('compute.max_rows')
ks.set_option('compute.max_rows', 2000)
# slice
kdf[['A', 'B']]
A B
0 0.813516 0.527949
1 0.683850 0.468637
2 0.215054 0.058884
3 0.576319 0.378034
4 0.313028 0.762469
# loc
kdf.loc[1:2]
A B
1 0.683850 0.468637
2 0.215054 0.058884
# iloc
kdf.iloc[:3, 1:2]
B
0 0.527949
1 0.468637
2 0.058884
** Using python functions with koala**
def quadrado(x):
return x ** 2
# enabling frame and series data
from databricks.koalas.config import set_option, reset_option
set_option("compute.ops_on_diff_frames", True)
# creating a column from a python function, and to do that we call apply
kdf['C'] = kdf.A.apply(quadrado)
/databricks/spark/python/pyspark/sql/pandas/functions.py:386: UserWarning: In Python 3.6+ and Spark 3.0+, it is preferred to specify type hints for pandas UDF instead of specifying pandas UDF type which will be deprecated in the future releases. See SPARK-28264 for more details.
warnings.warn(
kdf.head()
A B C
0 0.813516 0.527949 0.661809
1 0.683850 0.468637 0.467650
2 0.215054 0.058884 0.046248
3 0.576319 0.378034 0.332144
4 0.313028 0.762469 0.097987
# grouping data
kdf.groupby('A').sum()
B C
A
0.813516 0.527949 0.661809
0.683850 0.468637 0.467650
0.215054 0.058884 0.046248
0.576319 0.378034 0.332144
0.313028 0.762469 0.097987
# grouping multiple columns
kdf.groupby(['A', 'B']).sum()
C
A B
0.813516 0.527949 0.661809
0.683850 0.468637 0.467650
0.215054 0.058884 0.046248
0.576319 0.378034 0.332144
0.313028 0.762469 0.097987
Using SQL on Koalas
# koala df
kdf = ks.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
'pig': [20, 18, 489, 675, 1776],
'horse': [4, 25, 281, 600, 1900]})
# query on koala df
ks.sql("SELECT * FROM {kdf} WHERE pig > 100")
year pig horse
0 2003 489 281
1 2009 675 600
2 2014 1776 1900
# creates a pd df
pdf = pd.DataFrame({'year': [1990, 1997, 2003, 2009, 2014],
'sheep': [22, 50, 121, 445, 791],
'chicken': [250, 326, 589, 1241, 2118]})
# Query with inner join between pd and koala df
ks.sql('''
SELECT ks.pig, pd.chicken
FROM {kdf} ks INNER JOIN {pdf} pd
ON ks.year = pd.year
ORDER BY ks.pig, pd.chicken''')
# select the features pig from koala and chicken from pd
# calling koala df to inner join the pd df
# where year = year in both df
# ordering by both tables
pig chicken
0 18 326
1 20 250
2 489 589
3 675 1241
4 1776 2118
# converting koalas df to Pyspark df
kdf = ks.DataFrame({'A': [1, 2, 3, 4, 5], 'B': [10, 20, 30, 40, 50]})
pydf = kdf.to_spark()
So now we arrived to the end of our exercise. This was a great module that taught me multiple things about BigData and eased the fear I had about trying to learn this new step of Data Science, and with a bit of study and discipline it may not be a big monster. I hope you enjoyed 👾!