Skip to content

Latest commit

 

History

History
2269 lines (1984 loc) · 68.3 KB

pyspark_cookbook.org

File metadata and controls

2269 lines (1984 loc) · 68.3 KB

PySpark Cookbook

Functions

Introduction

To create an empty dataframe

import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = []
df = spark.createDataFrame(schema=schema, data=data)
df.show()
AB

To create a dataframe with columns key and value from a dictionary

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df.show()
keyvalue
key1value1
key2value2

To duplicate a column

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
df = df.withColumn("value_dup", F.col("value"))
df.show()
keyvaluevalue_dup
key1value1value1
key2value2value2

To rename a column using .withColumnRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnRenamed("key", "new_key") \
        .withColumnRenamed("value","new_value")
print("Modified dataframe:")
df.show()
keyvalue
key1value1
key2value2

Modified dataframe:

new_keynew_value
key1value1
key2value2

To rename a column using .withColumnsRenamed()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]

df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()
keyvalue
key1value1
key2value2

Modified dataframe:

new_keynew_value
key1value1
key2value2

To rename a column using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select(F.col("key").alias("new_key"), F.col("value").alias("new_value"))
print("Modified dataframe:")
df.show()
keyvalue
key1value1
key2value2

Modified dataframe:

new_keynew_value
key1value1
key2value2

To rename columns by adding a prefix

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),]
df = spark.createDataFrame(schema=schema, data=data)
print("Original dataframe:")
df.show()
print("Dataframe with renamed columns:")
df = df.select(*[F.col(k).alias(f"prefix_{k}") for k in df.columns])
df.show()
indexvalue
1Home
2School
3Home

Dataframe with renamed columns:

prefix_indexprefix_value
1Home
2School
3Home

To drop columns from a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)

df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()
keyvalueconst
key1value11
key2value21

Modified dataframe:

key
key1
key2

To subset columns of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
df = df.withColumn("const", F.lit(1))
print("Original dataframe:")
df.show()
print("Subset 'key', 'value' columns:")
df["key", "value"].show()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()
keyvalueconst
key1value11
key2value21

Subset ‘key’, ‘value’ columns:

keyvalue
key1value1
key2value2

Subset ‘key’, ‘const’ columns:

keyconst
key11
key21

To add a column with a constant value using F.lit()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.withColumn("const_integer", F.lit(1))
df = df.withColumn("const_string", F.lit("string"))
print("Modified dataframe:")
df.show()
keyvalue
key1value1
key2value2

Modified dataframe:

keyvalueconst_integerconst_string
key1value11string
key2value21string

To add a column with a constant value using .select()

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
values = [(k, v) for k, v in dict_a.items()]
columns = ["key", "value"]
df = spark.createDataFrame(values, columns)
print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()
keyvalue
key1value1
key2value2

Modified dataframe:

keyvalueconstant_value
key1value1const_str
key2value2const_str

To create a dataframe from a list of tuples

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
integercharacters
1[A, B]
2[C, D]
3[E, F]

To get the number of rows of a dataframe

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
num_rows = df.count()
print(f"df has {num_rows} rows")
integercharacters
1[A, B]
2[C, D]
3[E, F]

df has 3 rows

To select first N rows

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
print("These are first 2 rows:")
df.limit(2).show()
integercharacters
1[A, B]
2[C, D]
3[E, F]

These are first 2 rows:

integercharacters
1[A, B]
2[C, D]

To deduplicate rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("key", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
        T.StructField("comment", T.StringType(), True),
    ]
)
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house"),]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
df = df.dropDuplicates(subset=["key", "value"])
df.show()
keyvaluecomment
1Homea house
1Homea house
2Schoola building
2Schoola house
3Homea house

Dataframe with distinct rows:

keyvaluecomment
2Schoola house
3Homea house
2Schoola building
1Homea house

Dataframe with dropped duplicate rows:

keyvaluecomment
2Schoola house
3Homea house
2Schoola building
1Homea house

Dataframe with dropped duplicates in columns ‘key’ and ‘value’:

keyvaluecomment
1Homea house
2Schoola building
3Homea house

To convert a column to a list using a lambda function

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
print(f"Column \"integer\" has values: <<tld>>{lst}<<tld>>")
integercharacters
1[A, B]
2[C, D]
3[E, F]

Column “integer” has values: [1, 2, 3]

To convert a dataframe to a list of dictionaries corresponding to every row

import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print(f"Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)<<txtblk("txt")>>
integercharacters
1[A, B]
2[C, D]
3[E, F]

Dataframe is represented as:

[{'characters': ['A', 'B'], 'integer': 1},
 {'characters': ['C', 'D'], 'integer': 2},
 {'characters': ['E', 'F'], 'integer': 3}]

To convert a column to a list using list comprehension

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
print(f"Column \"integer\" has values: <<tld>>{lst}<<tld>>")
integercharacters
1[A, B]
2[C, D]
3[E, F]

Column “integer” has values: [1, 2, 3]

To convert a column to a list using Pandas

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]

df = spark.createDataFrame(values, columns)
df.show()
lst = df.select("integer").toPandas()["integer"].tolist()
print(f"Column \"integer\" has values: <<tld>>{lst}<<tld>>")
integercharacters
1[A, B]
2[C, D]
3[E, F]

Column “integer” has values: [1, 2, 3]

To display full width of a column (do not truncate)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("sentence", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

print("Truncated output (default behavior):")
<<prettify_table("df.show()")>>df.show()

print("Truncated to 15 characters output:")
<<prettify_table("df.show(truncate\07515)")>>df.show(truncate=15)

print("Non-truncated output (show all):")
<<prettify_table("df.show(truncate\u003dFalse)")>>df.show(truncate=False)
sentence
[A, very, long, s…
[with, many, word…

Truncated to 15 characters output:

sentence
[A, very, lo…
[with, many,…

Non-truncated output (show all):

sentence
[A, very, long, sentence]
[with, many, words, .]

Filtering rows

To filter based on values of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print('Filter: <<tld>>F.col("Location" == "Home")<<tld>>')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: <<tld>>F.col("Quantity").isNull()<<tld>>')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: <<tld>>F.col("Quantity").isNotNull()<<tld>>')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: <<tld>>(F.col("Location") == "Home") & (F.col("Product") == "Laptop"))<<tld>>')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: <<tld>>(F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))<<tld>>')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: <<tld>>(F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))<<tld>>')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: <<tld>>F.col("Product").isin(["Laptop", "Mouse"])<<tld>>')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()
LocationProductQuantity
HomeLaptop12
HomeMonitornull
HomeKeyboard9
OfficeLaptopnull
OfficeMonitor10
OfficeMouse9

Filter: F.col("Location" == "Home")

LocationProductQuantity
HomeLaptop12
HomeMonitornull
HomeKeyboard9

Filter: F.col("Quantity").isNull()

LocationProductQuantity
HomeMonitornull
OfficeLaptopnull

Filter: F.col("Quantity").isNotNull()

LocationProductQuantity
HomeLaptop12
HomeKeyboard9
OfficeMonitor10
OfficeMouse9

Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop"))

LocationProductQuantity
HomeLaptop12

Filter: (F.col("Location") == "Home") & !(F.col("Product") == "Laptop"))

LocationProductQuantity
HomeMonitornull
HomeKeyboard9

Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))

LocationProductQuantity
HomeLaptop12
OfficeLaptopnull
OfficeMouse9

Filter: F.col("Product").isin(["Laptop", "Mouse"])

LocationProductQuantity
HomeLaptop12
OfficeLaptopnull
OfficeMouse9

Array operations

To create arrays of different lengths

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B")
dft.show()
AB
[1, 2][2, 3, 4, 5]
[4, 5, 6][2, 3, 4, 5]

To calculate set difference

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B",
          F.array_except("A", "B").alias("A\B"),
          F.array_except("B", "A").alias("B\A"))
dft.show()
ABA\BB\A
[b, a, c][c, d, a, f][b][d, f]

To calculate set union

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B",
          F.array_union("A", "B").alias("A U B"))
dft.show()
ABA U B
[b, a, c][c, d, a, f][b, a, c, d, f]

To calculate set intersection

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.StringType()), True),
        T.StructField("B", T.ArrayType(T.StringType()), True),
    ]
)
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
ABA ∩ B
[b, a, c][c, d, a, f][a, c]

To pad arrays with value

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
n = 4
fill_value = 0
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df1.select("A", "A_padding", "A_padded")
dft.show()

df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
dft = df2.select("A", "A_padding", "A_padded")
dft.show()
AA_paddingA_padded
[1, 2][0, 0][1, 2, 0, 0]
[4, 5, 6][0][4, 5, 6, 0]
AA_paddingA_padded
[1, 2][0, 0][1, 2, 0, 0]
[4, 5, 6][0][4, 5, 6, 0]

To sum two arrays elementwise using F.element_at()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
AA_paddedBAB_sum
[1, 2][1, 2, 0, 0][2, 3, 4, 5][3, 5, 4, 5]
[4, 5, 6][4, 5, 6, 0][2, 3, 4, 5][6, 8, 10, 5]

To sum two arrays using F.arrays_zip()

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
        T.StructField("B", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
AA_paddedBAB_sum
[1, 2][1, 2, 0, 0][2, 3, 4, 5][3, 5, 4, 5]
[4, 5, 6][4, 5, 6, 0][2, 3, 4, 5][6, 8, 10, 5]

To find mode of an array (most common element)

from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf
def udf_mode(x):
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
<<print_schema("dft")>>dft.printSchema()
dft.show()

Amode
[1, 2, 2, 4]2
[4, 5, 6, 7]4
[1, 1, 2, 2]1

To calculate difference of two consecutive elements in an array

import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("id", T.StringType(), True),
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [("A", [4, 1, 0, 2]),
        ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
<<print_schema("df")>>df.printSchema()
idvaluesdiff
A[4, 1, 0, 2][-3, -1, 2]
B[1, 0, 3, 1][-1, 3, -2]

Schema of df is:

root
 |-- id: string (nullable = true)
 |-- values: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- diff: array (nullable = true)
 |    |-- element: integer (containsNull = true)

To apply a function to every element of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen_10", "note_11", "bottle_12"],), (["apple_13", "orange_14", "lemon_15"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("words", F.transform("words_with_suffixes", lambda x: F.split(x, "_").getItem(0)))
df.show(truncate=False)
words_with_suffixeswords
[pen_10, note_11, bottle_12][pen, note, bottle]
[apple_13, orange_14, lemon_15][apple, orange, lemon]

To deduplicate elements in an array (find unique/distinct elements)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("words", T.ArrayType(T.StringType()), True)
    ]
)
data = [(["pen", "note", "pen"],), (["apple", "apple", "lemon"],),]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
wordsunique_words
[pen, note, pen][pen, note]
[apple, apple, lemon][apple, lemon]

To create a map (dictionary) from two arrays (one with keys, one with values)

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("keys", T.ArrayType(T.IntegerType()), True),
        T.StructField("values", T.ArrayType(T.StringType()), True),
    ]
)
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keysvaluesmap_kv
[1, 2, 3][A, B, C]{1 -> A, 2 -> B, 3 -> C}

To calculate mean of an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("mean", F.aggregate(
          "values",                                  # column
          F.lit(0),                                  # initialValue
          lambda acc, x: acc + x,                    # merge operation
          lambda acc: acc / F.size(F.col("values")), # finish
      ))
df.show()
valuesmean
[1, 2]1.5
[4, 5, 6]5.0

To find out whether an array has any negative elements

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("any_negative", F.exists("values", lambda x: x < 0))
df.show()
valuesany_negative
[1, -2]true
[4, 5, 6]false

To convert elements of an array to columns

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("A", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("first", F.col("A").getItem(0))
dft = df.select("A", "first", *[F.col("A").getItem(k).alias(f"element_{k+1}") for k in range(1,4)])
dft.show()
Afirstelement_2element_3element_4
[1, 2, 3, 4]1234
[5, 6, 7]567null

To find location of the first occurence of an element in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
valuesposition
[1, 7, 5]2
[7, 4, 7]1

To calculate moving difference of two consecutive elements in an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    return x.apply(lambda x: (x[1:] - x[:-1]))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
valuesdiff2eediff1d
[1, 2, 5][1, 3][1, 3]
[4, 4, 6][0, 2][0, 2]

To slice an array

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
valuesvalues[1:3]
[1, 7, 5, 2][7, 5]
[6, 4, 7, 3][4, 7]

To slice an array dynamically

import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("values", T.ArrayType(T.IntegerType()), True),
    ]
)
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)
start_idx = 2
df = df.withColumn("values[1:]", F.slice("values", start=2, length=(F.size("values") - F.lit(start_idx - 1))))
df.show()
valuesvalues[1:]
[1, 7, 5][7, 5]
[6, 4, 7, 3][4, 7, 3]

Text processing

To remove prefix from a string using a UDF

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("text", T.StringType(), True),
    ]
)
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)
remove_prefix = F.udf(lambda x: x[3:] if x[:3] == "id_" else x, T.StringType())
df = df.withColumn("no_prefix", remove_prefix(F.col("text")))
df.show()
textno_prefix
id_orangeorange
appleapple

To split a string into letters (characters) using regex

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
dft = df.select('String', F.split('String', '(?!$)').alias("Characters"))
dft.show(truncate=False)
StringCharacters
This is[T, h, i, s, , i, s]

To concatenate columns with strings using a separator

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str_Concat", F.concat_ws( "_", "Str1", "Str2"))
df.show()
Str1Str2Str_Concat
This isa stringThis is_a string
on adifferent rowon a_different row

To split a string into letters (characters) using split function

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)
fsplit = F.expr("split(String, '')")
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
StringCharacters
This is[T, h, i, s, , i, s]

To split a string into letters (characters) and remove last character

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("String", T.StringType(), True)
    ]
)
data = [["This is_"]]
df = spark.createDataFrame(schema=schema, data=data)
print("Using split function and remove last character:")
fsplit = "split(String, '')"
fsplit = F.expr(f'slice({fsplit}, 1, size({fsplit}) - 1)')
dft = df.select('String', fsplit.alias("Characters"))
dft.show(truncate=False)
StringCharacters
This is_[T, h, i, s, , i, s]

To append a string to all values in a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True)
    ]
)
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
dft = df.select("Str1", "Str1_with_prefix")
dft.show()
Str1Str1_with_prefix
This isPrefix_This is
on aPrefix_on a

Time operations

To calculate cumulative sum of a column

import pandas as pd
from pyspark.sql import Window
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                   'value': [False, False, True, False, True, True]})

df = spark.createDataFrame(df)
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(Window.orderBy(F.col("time").asc())))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(Window.orderBy(F.col("time").asc())))
df.show()
timevaluecml_n_truecml_n_false
0false01
1false02
2true12
3false13
4true23
5true33

To convert Unix time stamp to human readable format

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("timestamp", T.LongType(), True),
    ]
)
data = [(1703224755,),
        (1703285602,)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp")))
df.show()
timestamptime_stamp_hrf
17032247552023-12-22 06:59:15
17032856022023-12-22 23:53:22

Numerical operations

To find percentage of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy())*100, 2))
dft = df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity"))
dft.show()
ProductQuantity%
Laptop1233.33
Keyboard925.0
Mouse822.22
Monitor719.44

To find percentage of a column within a group using a window

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("%", F.round(F.col("Quantity")/F.sum("Quantity").over(Window.partitionBy("Location"))*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
LocationProductQuantity%
OfficeLaptop2354.76
OfficeMonitor1023.81
OfficeMouse921.43
HomeLaptop1233.33
HomeKeyboard925.0
HomeMouse822.22
HomeMonitor719.44

To find percentage of a column within a group using .groupBy() and a join

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

df_sum = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = df.join(df_sum, on="Location").withColumn("%", F.round(F.col("Quantity")/F.col("Total_Quantity")*100, 2))
dft = df.select("Location", "Product", "Quantity", "%").orderBy(F.desc("Location"), F.desc("Quantity"))
dft.show()
LocationProductQuantity%
OfficeLaptop2354.76
OfficeMonitor1023.81
OfficeMouse921.43
HomeLaptop1233.33
HomeKeyboard925.0
HomeMouse822.22
HomeMonitor719.44

To find maximum value of a column

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df.show()
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
LocationProductQuantity
HomeLaptop12
HomeMonitor7
HomeMouse8
HomeKeyboard9
OfficeLaptop23
OfficeMonitor10
OfficeMouse9

Maximum value of Quantity: 23

To add a column with count of elements per group

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(Window.partitionBy(F.col("Location"))))
df.show()
LocationProductQuantitycount_per_group
HomeLaptop124
HomeMonitor74
HomeMouse84
HomeKeyboard94
OfficeLaptop233
OfficeMonitor103
OfficeMouse93

Dataframe join operations

To perform a full, outer, left, right join operations

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()

schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        T.StructField("Age", T.StringType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()

df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()

df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()

df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()

df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()
NameScore
Alice10
Bob11

Table B:

NameSurnameAge
AliceDoe12
AliceSmith30
JaneCarter7

Full join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
Bob11nullnull
JanenullCarter7

Outer join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
Bob11nullnull
JanenullCarter7

Join on ‘Name’ on equal condition:

NameScoreNameSurnameAge
Alice10AliceDoe12
Alice10AliceSmith30

Inner join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30

Left join on ‘Name’:

NameScoreSurnameAge
Bob11nullnull
Alice10Smith30
Alice10Doe12

Left-outer join on ‘Name’:

NameScoreSurnameAge
Bob11nullnull
Alice10Smith30
Alice10Doe12

Left-anti join on ‘Name’:

NameScore
Bob11

Left-semi join on ‘Name’:

NameScore
Alice10

Right join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
JanenullCarter7

Right-outer join on ‘Name’:

NameScoreSurnameAge
Alice10Doe12
Alice10Smith30
JanenullCarter7

To drop one of the duplicate columns after join

from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
  Row(id=1, value="A1"),
  Row(id=1, value="B1"),
  Row(id=1, value="C1"),
  Row(id=2, value="A1"),
  Row(id=2, value="X1"),
  Row(id=2, value="Y1")]
)
print("Dataframe <<tld>>df_a<<tld>>:")
df_1.show()

df_b = spark.createDataFrame([
  Row(id=1, updated="A2"),
  Row(id=1, updated="B1"),
  Row(id=1, updated="C1"),
  Row(id=2, updated="A1"),
  Row(id=2, updated="X1"),
  Row(id=2, updated="Y1")]
)
print("Dataframe <<tld>>df_b<<tld>>:")
df_2.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on <<tld>>df_a['value'] == df_b['updated']<<tld>>:")
df.show()

df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on <<tld>>df_a['value'] == df_b['updated']<<tld>> with dropped <<tld>>df_b['id']<<tld>> column:")
df.show()
idvalue
1A1
1B1
1C1
2A1
2X1
2Y1

Dataframe df_b:

idupdated
1A2
1B1
1C1
2A1
2X1
2Y1

Full join on df_a['value'] == df_b['updated']:

idvalueidupdated
1.0A1nullnull
nullnull1.0A2
1.0B11.0B1
1.0C11.0C1
2.0A12.0A1
2.0X12.0X1
2.0Y12.0Y1

Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:

idvalueupdated
1.0A1null
nullnullA2
1.0B1B1
1.0C1C1
2.0A1A1
2.0X1X1
2.0Y1Y1

Aggregation and maps

To group by and aggregate into a map using F.map_from_entries()

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show(truncate=False)
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
          F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
<<print_schema("dft")>>dft.printSchema()
idkeyvalue
1aA1
1bB1
1cC1
2aA1
2xX1
2yY1

Dataframe with key -> value mapping

idkey_value
1{a -> A1, b -> B1, c -> C1}
2{a -> A1, x -> X1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = false)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

To group by and aggregate into a map using UDF

import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
  Row(id=1, key='a', value="A1"),
  Row(id=1, key='b', value="B1"),
  Row(id=1, key='c', value="C1"),
  Row(id=2, key='a', value="A1"),
  Row(id=2, key='x', value="X1"),
  Row(id=2, key='y', value="Y1")]
)

print("Dataframe with keys and values:")
df.show()

@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
   .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
   .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
<<print_schema("dft")>>dft.printSchema()
idkeyvalue
1aA1
1bB1
1cC1
2aA1
2xX1
2yY1

Dataframe with keys and values:

idkey_value
1{a -> A1, b -> B1, c -> C1}
2{x -> X1, a -> A1, y -> Y1}

Schema of dft is:

root
 |-- id: long (nullable = true)
 |-- key_value: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)

To agregate over multiple columns and sum values of dictionaries

import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([T.StructField('clid', T.StringType(), True),
                        T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
                        T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False)])
df_data = [["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
           ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
           ]
spark = SparkSession.builder\
        .appName("Parse DataFrame Schema")\
        .getOrCreate()
df = spark.createDataFrame(data=df_data, schema=df_schema)

df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2,4):
    df = df.withColumn("coef_total", F.map_zip_with("coef_total", f"coef_{i}",
                      lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1) + F.when(v2.isNull(), 0).otherwise(v2)))
df.show(truncate=False)
clidcoef_1coef_2coef_3coef_total
X{B -> 0.4, C -> 0.4}{B -> 0.33, C -> 0.5}{A -> 0.5, C -> 0.33}{B -> 0.73, C -> 1.23, A -> 0.5}
Y{B -> 0.67, C -> 0.33}{B -> 0.85}{A -> 0.4, C -> 0.57}{B -> 1.52, C -> 0.8999999999999999, A -> 0.4}

Sampling rows

To sample rows

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
data = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=data).repartition(3)
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()

print("Sampled dataframe:")
dft = df.sample(fraction=0.5, seed=1).orderBy("index")
dft.show()
indexvaluepartition
1Home1
2School0
3Home0
4Home2
5Office2
6Office2
7Office1
8Mall0
9Mall1
10School0

Sampled dataframe:

indexvaluepartition
3Home0
4Home2
7Office1
8Mall0
9Mall1

UUID generation

To generate a UUID for every row

import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    def _replace_byte(value: int, byte: int):
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)

    uuid_ = uuid_gen.generate()
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    def __init__(self, seed):
        self.rnd = random.Random(seed)

    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())

seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())

print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)
NameUUID_random_distributedUUID_seed_based
John4e9a3bb1-a189-a25e-8389-7f8382635b09cd613e30-d8f1-aadf-91b7-584a2265b1f5
Bon16cd1549-0c74-a483-9bbe-707e59e0796f1e2feb89-414c-a43c-9027-c4d1c386bbc4
Cecileb8b05619-6004-aa75-b98b-7e1c83c9f301cd613e30-d8f1-aadf-91b7-584a2265b1f5
Aliceb1f1a9fb-feb9-a946-9171-3e7cb577fdaa1e2feb89-414c-a43c-9027-c4d1c386bbc4