To create an empty dataframe
# Build an empty dataframe with an explicit schema (two array<string> columns).
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

empty_schema = T.StructType([
    T.StructField("A", T.ArrayType(T.StringType()), True),
    T.StructField("B", T.ArrayType(T.StringType()), True),
])
# No rows at all -- only the schema defines the dataframe's shape.
df = spark.createDataFrame(data=[], schema=empty_schema)
df.show()
To create a dataframe with columns key and value from a dictionary
# Build a two-column (key, value) dataframe from a Python dict.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())  # [(key, value), ...]
df = spark.createDataFrame(rows, ["key", "value"])
df.show()
key value
key1 value1
key2 value2
# Duplicate an existing column under a new name with .withColumn() + F.col().
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])
df = df.withColumn("value_dup", F.col("value"))  # copy of "value"
df.show()
key value value_dup
key1 value1 value1
key2 value2 value2
To rename a column using .withColumnRenamed()
# Rename columns one at a time with .withColumnRenamed().
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])

print("Original dataframe:")
df.show()

# Calls chain: each returns a new dataframe with one column renamed.
df = (df.withColumnRenamed("key", "new_key")
        .withColumnRenamed("value", "new_value"))
print("Modified dataframe:")
df.show()
key value
key1 value1
key2 value2
Modified dataframe:
new_key new_value
key1 value1
key2 value2
To rename a column using .withColumnsRenamed()
# Rename several columns at once with .withColumnsRenamed() (Spark >= 3.4).
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])

print("Original dataframe:")
df.show()

# One dict maps every old name to its new name.
df = df.withColumnsRenamed({"key": "new_key", "value": "new_value"})
print("Modified dataframe:")
df.show()
key value
key1 value1
key2 value2
Modified dataframe:
new_key new_value
key1 value1
key2 value2
To rename a column using .select()
# Rename columns by selecting them under aliases.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])

print("Original dataframe:")
df.show()

renamed = [F.col("key").alias("new_key"), F.col("value").alias("new_value")]
df = df.select(*renamed)
print("Modified dataframe:")
df.show()
key value
key1 value1
key2 value2
Modified dataframe:
new_key new_value
key1 value1
key2 value2
To rename columns by adding a prefix
# Add a common prefix to every column name via a select of aliased columns.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("index", T.IntegerType(), True),
    T.StructField("value", T.StringType(), True),
])
data = [(1, "Home"),
        (2, "School"),
        (3, "Home")]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with renamed columns:")
# One alias per existing column; df.columns gives the current names.
df = df.select(*[F.col(name).alias(f"prefix_{name}") for name in df.columns])
df.show()
index value
1 Home
2 School
3 Home
Dataframe with renamed columns:
prefix_index prefix_value
1 Home
2 School
3 Home
To drop columns from a dataframe
# Drop one or more columns with .drop() (accepts several names at once).
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])
df = df.withColumn("const", F.lit(1))  # extra column to drop later

print("Original dataframe:")
df.show()

df = df.drop("value", "const")
print("Modified dataframe:")
df.show()
key value const
key1 value1 1
key2 value2 1
Modified dataframe:
To subset columns of a dataframe
# Two equivalent ways to select a subset of columns:
# bracket indexing df[...] and the explicit .select().
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])
df = df.withColumn("const", F.lit(1))

print("Original dataframe:")
df.show()

print("Subset 'key', 'value' columns:")
df["key", "value"].show()          # bracket syntax delegates to .select()
print("Subset 'key', 'const' columns:")
df.select("key", "const").show()   # explicit form
key value const
key1 value1 1
key2 value2 1
Subset ‘key’, ‘value’ columns:
key value
key1 value1
key2 value2
Subset ‘key’, ‘const’ columns:
To add a column with a constant value using F.lit()
# Add constant-valued columns with F.lit(); works for any literal type.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])

print("Original dataframe:")
df.show()

df = (df.withColumn("const_integer", F.lit(1))
        .withColumn("const_string", F.lit("string")))
print("Modified dataframe:")
df.show()
key value
key1 value1
key2 value2
Modified dataframe:
key value const_integer const_string
key1 value1 1 string
key2 value2 1 string
To add a column with a constant value using .select()
# Add a constant column inside a .select() using F.lit() with an alias.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

dict_a = {"key1": "value1", "key2": "value2"}
rows = list(dict_a.items())
df = spark.createDataFrame(rows, ["key", "value"])

print("Original dataframe:")
df.show()

df = df.select("key", "value", F.lit("const_str").alias("constant_value"))
print("Modified dataframe:")
df.show()
key value
key1 value1
key2 value2
Modified dataframe:
key value constant_value
key1 value1 const_str
key2 value2 const_str
To create a dataframe from a list of tuples
# Build a dataframe straight from a list of tuples plus column names.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

rows = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
df = spark.createDataFrame(rows, ["integer", "characters"])
df.show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
To get the number of rows of a dataframe
# Count the rows of a dataframe with .count() (triggers a job).
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

rows = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
df = spark.createDataFrame(rows, ["integer", "characters"])
df.show()

num_rows = df.count()
print(f"df has {num_rows} rows")
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
df has 3 rows
# Take the first N rows with .limit() (returns a new dataframe, lazily).
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

rows = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
df = spark.createDataFrame(rows, ["integer", "characters"])
df.show()

print("These are first 2 rows:")
df.limit(2).show()
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
These are first 2 rows:
integer characters
1 [A, B]
2 [C, D]
# Deduplicate rows: .distinct() and .dropDuplicates() are equivalent on all
# columns; .dropDuplicates(subset=...) restricts the comparison to a subset.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("key", T.IntegerType(), True),
    T.StructField("value", T.StringType(), True),
    T.StructField("comment", T.StringType(), True),
])
data = [(1, "Home", "a house"),
        (1, "Home", "a house"),
        (2, "School", "a building"),
        (2, "School", "a house"),
        (3, "Home", "a house")]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

print("Dataframe with distinct rows:")
df.distinct().show()

print("Dataframe with dropped duplicate rows:")
df.dropDuplicates().show()

print("Dataframe with dropped duplicates in columns 'key' and 'value':")
# Which duplicate row survives within each (key, value) group is arbitrary.
df = df.dropDuplicates(subset=["key", "value"])
df.show()
key value comment
1 Home a house
1 Home a house
2 School a building
2 School a house
3 Home a house
Dataframe with distinct rows:
key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house
Dataframe with dropped duplicate rows:
key value comment
2 School a house
3 Home a house
2 School a building
1 Home a house
Dataframe with dropped duplicates in columns ‘key’ and ‘value’:
key value comment
1 Home a house
2 School a building
3 Home a house
To convert a column to a list using a lambda function
# Collect a single column into a Python list via the RDD API and a lambda.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()

# Each RDD element is a Row; r[0] extracts the single selected field.
lst = df.select("integer").rdd.map(lambda r: r[0]).collect()
# Fix: removed '<<tld>>' extraction artifacts from the output message.
print(f'Column "integer" has values: {lst}')
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
Column “integer” has values: [1, 2, 3]
To convert a dataframe to a list of dictionaries corresponding to every row
# Convert every row to a dict (column name -> value) and pretty-print the list.
import pprint
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()

# Row.asDict() maps each row to a plain dict; collect() brings them to driver.
lst_dict = df.rdd.map(lambda row: row.asDict()).collect()
print("Dataframe is represented as:\n")
txt = pprint.pformat(lst_dict)
# Fix: the '<< txtblk >>' template artifact dropped the actual print call.
print(txt)
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
Dataframe is represented as:
[{'characters': ['A', 'B'], 'integer': 1},
{'characters': ['C', 'D'], 'integer': 2},
{'characters': ['E', 'F'], 'integer': 3}]
To convert a column to a list using list comprehension
# Collect a single column into a Python list with a list comprehension
# over the collected Row objects.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()

# Rows support named access: k["integer"] reads the selected field.
lst = [k["integer"] for k in df.select("integer").rdd.collect()]
# Fix: removed '<<tld>>' extraction artifacts from the output message.
print(f'Column "integer" has values: {lst}')
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
Column “integer” has values: [1, 2, 3]
To convert a column to a list using Pandas
# Collect a single column into a Python list by round-tripping through pandas.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

values = [(1, ["A", "B"]), (2, ["C", "D"]), (3, ["E", "F"])]
columns = ["integer", "characters"]
df = spark.createDataFrame(values, columns)
df.show()

# toPandas() materializes on the driver; fine for small dataframes only.
lst = df.select("integer").toPandas()["integer"].tolist()
# Fix: removed '<<tld>>' extraction artifacts from the output message.
print(f'Column "integer" has values: {lst}')
integer characters
1 [A, B]
2 [C, D]
3 [E, F]
Column “integer” has values: [1, 2, 3]
To display full width of a column (do not truncate)
# Control column truncation in df.show(): default truncates to 20 chars,
# truncate=N truncates to N chars, truncate=False shows full width.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("sentence", T.ArrayType(T.StringType()), True),
])
data = [(["A", "very", "long", "sentence"],),
        (["with", "many", "words", "."],)]
df = spark.createDataFrame(schema=schema, data=data)

# Fix: removed '<< prettify_table(...) >>' template artifacts, which were
# not valid Python and broke every df.show() statement below.
print("Truncated output (default behavior):")
df.show()
print("Truncated to 15 characters output:")
df.show(truncate=15)
print("Non-truncated output (show all):")
df.show(truncate=False)
sentence
[A, very, long, s…
[with, many, word…
Truncated to 15 characters output:
sentence
[A, very, lo…
[with, many,…
Non-truncated output (show all):
sentence
[A, very, long, sentence]
[with, many, words, .]
To filter based on values of a column
# Catalogue of common .filter() predicates: equality, null checks,
# AND / NOT / OR combinations, and membership with .isin().
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("Location", T.StringType(), True),
    T.StructField("Product", T.StringType(), True),
    T.StructField("Quantity", T.IntegerType(), True),
])
data = [("Home", "Laptop", 12),
        ("Home", "Monitor", None),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", None),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=data)

print("Original dataframe:")
df.show()

# Fix: the descriptive print strings below were wrong or garbled:
# misplaced parenthesis in F.col("Location" == "Home"), unbalanced '))',
# '!' where the code uses '~', and '<<tld>>' extraction artifacts.
print('Filter: F.col("Location") == "Home"')
dft = df.filter(F.col("Location") == "Home")
dft.show()

print('Filter: F.col("Quantity").isNull()')
dft = df.filter(F.col("Quantity").isNull())
dft.show()

print('Filter: F.col("Quantity").isNotNull()')
dft = df.filter(F.col("Quantity").isNotNull())
dft.show()

print('Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop")')
dft = df.filter((F.col("Location") == "Home") & (F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Location") == "Home") & ~(F.col("Product") == "Laptop")')
dft = df.filter((F.col("Location") == "Home") & ~(F.col("Product") == "Laptop"))
dft.show()

print('Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse")')
dft = df.filter((F.col("Product") == "Laptop") | (F.col("Product") == "Mouse"))
dft.show()

print('Filter: F.col("Product").isin(["Laptop", "Mouse"])')
dft = df.filter(F.col("Product").isin(["Laptop", "Mouse"]))
dft.show()
Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9
Office Laptop null
Office Monitor 10
Office Mouse 9
Filter: F.col("Location") == "Home"
Location Product Quantity
Home Laptop 12
Home Monitor null
Home Keyboard 9
Filter: F.col("Quantity").isNull()
Location Product Quantity
Home Monitor null
Office Laptop null
Filter: F.col("Quantity").isNotNull()
Location Product Quantity
Home Laptop 12
Home Keyboard 9
Office Monitor 10
Office Mouse 9
Filter: (F.col("Location") == "Home") & (F.col("Product") == "Laptop")
Location Product Quantity
Home Laptop 12
Filter: (F.col("Location") == "Home") & ~(F.col("Product") == "Laptop")
Location Product Quantity
Home Monitor null
Home Keyboard 9
Filter: (F.col("Product") == "Laptop") | (F.col("Product") == "Mouse")
Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9
Filter: F.col("Product").isin(["Laptop", "Mouse"])
Location Product Quantity
Home Laptop 12
Office Laptop null
Office Mouse 9
To create arrays of different lengths
# Array columns may hold arrays of different lengths on different rows.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.IntegerType()), True),
    T.StructField("B", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

dft = df.select("A", "B")
dft.show()
A B
[1, 2] [2, 3, 4, 5]
[4, 5, 6] [2, 3, 4, 5]
To calculate set difference
# Set difference of two array columns with F.array_except():
# A\B keeps elements of A not in B, and vice versa.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.StringType()), True),
    T.StructField("B", T.ArrayType(T.StringType()), True),
])
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

# Fix: "A\B" / "B\A" contained invalid escape sequences (\B, \A), which are
# a SyntaxWarning in modern Python; escape the backslash explicitly.
dft = df.select("A", "B",
                F.array_except("A", "B").alias("A\\B"),
                F.array_except("B", "A").alias("B\\A"))
dft.show()
A B A\B B\A
[b, a, c] [c, d, a, f] [b] [d, f]
# Set union of two array columns with F.array_union() (duplicates removed).
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.StringType()), True),
    T.StructField("B", T.ArrayType(T.StringType()), True),
])
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

union_col = F.array_union("A", "B").alias("A U B")
dft = df.select("A", "B", union_col)
dft.show()
A B A U B
[b, a, c] [c, d, a, f] [b, a, c, d, f]
To calculate set intersection
# Set intersection of two array columns with F.array_intersect().
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.StringType()), True),
    T.StructField("B", T.ArrayType(T.StringType()), True),
])
data = [(["b", "a", "c"], ["c", "d", "a", "f"])]
df = spark.createDataFrame(schema=schema, data=data)

# "\u2229" is the set-intersection symbol used in the output column name.
dft = df.select("A", "B", F.array_intersect("A", "B").alias("A \u2229 B"))
dft.show()
A B A ∩ B
[b, a, c] [c, d, a, f] [a, c]
# Pad array column "A" with a fill value up to length n, shown two ways:
# a SQL expression string and the equivalent pyspark.sql.functions calls.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.IntegerType()), True),
    T.StructField("B", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

n = 4           # target length after padding
fill_value = 0  # padding element

# Variant 1: SQL expression (array_repeat builds the padding tail).
df1 = df.withColumn("A_padding", F.expr(f"array_repeat({fill_value}, {n} - size(A))"))
df1 = df1.withColumn("A_padded", F.concat("A", "A_padding"))
df1.select("A", "A_padding", "A_padded").show()

# Variant 2: same thing with column functions instead of a SQL string.
df2 = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df2 = df2.withColumn("A_padded", F.concat("A", "A_padding"))
df2.select("A", "A_padding", "A_padded").show()
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]
A A_padding A_padded
[1, 2] [0, 0] [1, 2, 0, 0]
[4, 5, 6] [0] [4, 5, 6, 0]
To sum two arrays elementwise using F.element_at()
# Sum two array columns elementwise with transform() + element_at(),
# after zero-padding the shorter array "A" to the length of "B".
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.IntegerType()), True),
    T.StructField("B", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

# Fix: n and fill_value were used below but never defined in this snippet,
# which raised NameError when run standalone.
n = 4           # target length after padding (length of B)
fill_value = 0  # padding element

df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
# transform() indexes from 0 while element_at() indexes from 1, hence index + 1.
df = df.withColumn("AB_sum", F.expr('transform(A_padded, (element, index) -> element + element_at(B, index + 1))'))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]
To sum two arrays using F.arrays_zip()
# Sum two array columns elementwise with arrays_zip() + transform(),
# after zero-padding the shorter array "A" to the length of "B".
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.IntegerType()), True),
    T.StructField("B", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2], [2, 3, 4, 5]),
        ([4, 5, 6], [2, 3, 4, 5])]
df = spark.createDataFrame(schema=schema, data=data)

# Fix: n and fill_value were used below but never defined in this snippet,
# which raised NameError when run standalone.
n = 4           # target length after padding (length of B)
fill_value = 0  # padding element

df = df.withColumn("A_padding", F.array_repeat(F.lit(fill_value), F.lit(n) - F.size("A")))
df = df.withColumn("A_padded", F.concat("A", "A_padding"))
# arrays_zip pairs elements positionally into structs; x.A_padded + x.B sums each pair.
df = df.withColumn("AB_sum", F.expr("transform(arrays_zip(A_padded, B), x -> x.A_padded + x.B)"))
dft = df.select("A", "A_padded", "B", "AB_sum")
dft.show()
A A_padded B AB_sum
[1, 2] [1, 2, 0, 0] [2, 3, 4, 5] [3, 5, 4, 5]
[4, 5, 6] [4, 5, 6, 0] [2, 3, 4, 5] [6, 8, 10, 5]
To find mode of an array (most common element)
# Most common element (mode) of an array column via a Python UDF + Counter.
from collections import Counter
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2, 2, 4],),
        ([4, 5, 6, 7],),
        ([1, 1, 2, 2],)]
df = spark.createDataFrame(schema=schema, data=data)

# NOTE: @F.udf without returnType defaults to StringType, so "mode" is a
# string column in the schema printed below.
@F.udf
def udf_mode(x):
    # most_common(1) -> [(element, count)]; keep just the element.
    return Counter(x).most_common(1)[0][0]

dft = df.withColumn("mode", udf_mode("A"))
# Fix: removed the '<< print_schema(...) >>' template artifact, which was
# not valid Python and broke this statement.
dft.printSchema()
dft.show()
A mode
[1, 2, 2, 4] 2
[4, 5, 6, 7] 4
[1, 1, 2, 2] 1
To calculate difference of two consecutive elements in an array
# Differences of consecutive array elements via a UDF around numpy.ediff1d.
import numpy as np
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("id", T.StringType(), True),
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [("A", [4, 1, 0, 2]),
        ("B", [1, 0, 3, 1])]
df = spark.createDataFrame(schema=schema, data=data)

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    # ediff1d returns x[1:] - x[:-1]; convert back to a plain list for Spark.
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff", diff_of_two_consecutive_elements(F.col("values")))
df.show()
# Fix: removed the '<< print_schema(...) >>' template artifact, which was
# not valid Python and broke this statement.
df.printSchema()
id values diff
A [4, 1, 0, 2] [-3, -1, 2]
B [1, 0, 3, 1] [-1, 3, -2]
Schema of df
is:
root
|-- id: string (nullable = true)
|-- values: array (nullable = true)
| |-- element: integer (containsNull = true)
|-- diff: array (nullable = true)
| |-- element: integer (containsNull = true)
To apply a function to every element of an array
# Apply a function to every element of an array column with F.transform().
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("words_with_suffixes", T.ArrayType(T.StringType()), True),
])
data = [(["pen_10", "note_11", "bottle_12"],),
        (["apple_13", "orange_14", "lemon_15"],)]
df = spark.createDataFrame(schema=schema, data=data)

# For each element, split on "_" and keep the part before the suffix.
strip_suffix = lambda elem: F.split(elem, "_").getItem(0)
df = df.withColumn("words", F.transform("words_with_suffixes", strip_suffix))
df.show(truncate=False)
words_with_suffixes words
[pen_10, note_11, bottle_12] [pen, note, bottle]
[apple_13, orange_14, lemon_15] [apple, orange, lemon]
To deduplicate elements in an array (find unique/distinct elements)
# Remove duplicate elements inside each array with F.array_distinct().
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("words", T.ArrayType(T.StringType()), True),
])
data = [(["pen", "note", "pen"],),
        (["apple", "apple", "lemon"],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("unique_words", F.array_distinct("words"))
df.show(truncate=False)
words unique_words
[pen, note, pen] [pen, note]
[apple, apple, lemon] [apple, lemon]
To create a map (dictionary) from two arrays (one with keys, one with values)
# Build a map column by zipping a keys array with a values array.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[1]").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("keys", T.ArrayType(T.IntegerType()), True),
    T.StructField("values", T.ArrayType(T.StringType()), True),
])
data = [([1, 2, 3], ["A", "B", "C"])]
df = spark.createDataFrame(schema=schema, data=data)

# Arrays are paired positionally: keys[i] -> values[i].
df = df.withColumn("map_kv", F.map_from_arrays("keys", "values"))
df.show(truncate=False)
keys values map_kv
[1, 2, 3] [A, B, C] {1 -> A, 2 -> B, 3 -> C}
To calculate mean of an array
# Mean of an array column with F.aggregate(): sum via the merge lambda,
# then divide by the array size in the finish lambda.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

mean_col = F.aggregate(
    "values",                                       # column to fold over
    F.lit(0),                                       # initial accumulator
    lambda acc, x: acc + x,                         # merge: running sum
    lambda acc: acc / F.size(F.col("values")),      # finish: sum / length
)
df = df.withColumn("mean", mean_col)
df.show()
values mean
[1, 2] 1.5
[4, 5, 6] 5.0
To find out whether an array has any negative elements
# Test whether any array element satisfies a predicate with F.exists().
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [([1, -2],),
        ([4, 5, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

is_negative = lambda elem: elem < 0
df = df.withColumn("any_negative", F.exists("values", is_negative))
df.show()
values any_negative
[1, -2] true
[4, 5, 6] false
To convert elements of an array to columns
# Spread array elements into separate columns with .getItem(i);
# missing positions (shorter arrays) become null.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("A", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2, 3, 4],),
        ([5, 6, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("first", F.col("A").getItem(0))
element_cols = [F.col("A").getItem(i).alias(f"element_{i + 1}") for i in range(1, 4)]
dft = df.select("A", "first", *element_cols)
dft.show()
A first element_2 element_3 element_4
[1, 2, 3, 4] 1 2 3 4
[5, 6, 7] 5 6 7 null
To find location of the first occurrence of an element in an array
# 1-based position of the first occurrence of a value with F.array_position()
# (returns 0 when the value is absent).
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 7, 5],),
        ([7, 4, 7],)]
df = spark.createDataFrame(schema=schema, data=data)

df = df.withColumn("position", F.array_position(F.col("values"), 7))
df.show()
values position
[1, 7, 5] 2
[7, 4, 7] 1
To calculate moving difference of two consecutive elements in an array
# Two equivalent UDF styles for consecutive-element differences:
# a pandas UDF (vectorized per column batch) and a plain Python UDF via numpy.
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 2, 5],),
        ([4, 4, 6],)]
df = spark.createDataFrame(schema=schema, data=data)

@F.pandas_udf(T.ArrayType(T.IntegerType()))
def diff2e(x: pd.Series) -> pd.Series:
    # Each series element is an array; subtract shifted slices.
    return x.apply(lambda arr: (arr[1:] - arr[:-1]))

@F.udf(returnType=T.ArrayType(T.IntegerType()))
def diff_of_two_consecutive_elements(x):
    return np.ediff1d(np.array(x)).tolist()

df = df.withColumn("diff2e", diff2e(F.col("values")))
df = df.withColumn("ediff1d", diff_of_two_consecutive_elements(F.col("values")))
df.show()
values diff2e ediff1d
[1, 2, 5] [1, 3] [1, 3]
[4, 4, 6] [0, 2] [0, 2]
# Take a fixed sub-array with F.slice() (start is 1-based).
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 7, 5, 2],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

# SQL slice(values, 2, 2) == Python values[1:3].
df = df.withColumn("values[1:3]", F.slice("values", start=2, length=2))
df.show()
values values[1:3]
[1, 7, 5, 2] [7, 5]
[6, 4, 7, 3] [4, 7]
To slice an array dynamically
# Slice an array from a given position to its (per-row) end: the length
# argument of F.slice() may be a column expression.
import pyspark.sql.functions as F
import pyspark.sql.types as T
import pandas as pd
from pyspark.sql import SparkSession
import numpy as np

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("values", T.ArrayType(T.IntegerType()), True),
])
data = [([1, 7, 5],),
        ([6, 4, 7, 3],)]
df = spark.createDataFrame(schema=schema, data=data)

# 1-based start index of the slice; the remaining length is computed per row.
start_idx = 2
# Fix: start was hardcoded to 2 even though start_idx was defined for it;
# using the variable keeps start and length consistent if start_idx changes.
df = df.withColumn(
    "values[1:]",
    F.slice("values", start=start_idx, length=(F.size("values") - F.lit(start_idx - 1))),
)
df.show()
values values[1:]
[1, 7, 5] [7, 5]
[6, 4, 7, 3] [4, 7, 3]
To remove prefix from a string using a UDF
# Strip a fixed "id_" prefix from a string column with a small UDF.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("text", T.StringType(), True),
])
data = [("id_orange",),
        ("apple",)]
df = spark.createDataFrame(schema=schema, data=data)

def _strip_id_prefix(s):
    # Drop the leading "id_" when present; leave other strings untouched.
    if s[:3] == "id_":
        return s[3:]
    return s

remove_prefix = F.udf(_strip_id_prefix, T.StringType())
df = df.withColumn("no_prefix", remove_prefix(F.col("text")))
df.show()
text no_prefix
id_orange orange
apple apple
To split a string into letters (characters) using regex
# Split a string into individual characters using a regex lookahead:
# '(?!$)' matches between every character but not at the end of the string.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("String", T.StringType(), True),
])
data = [["This is"]]
df = spark.createDataFrame(schema=schema, data=data)

chars = F.split('String', '(?!$)').alias("Characters")
dft = df.select('String', chars)
dft.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]
To concatenate columns with strings using a separator
# Concatenate string columns with a separator using F.concat_ws().
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()

schema = T.StructType([
    T.StructField("Str1", T.StringType(), True),
    T.StructField("Str2", T.StringType(), True),
])
data = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=data)

# First argument is the separator, the rest are the columns to join.
df = df.withColumn("Str_Concat", F.concat_ws("_", "Str1", "Str2"))
df.show()
Str1 Str2 Str_Concat
This is a string This is_a string
on a different row on a_different row
To split a string into letters (characters) using split function
# Split a string into characters using SQL split() with an empty pattern.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([T.StructField("String", T.StringType(), True)])
df = spark.createDataFrame(schema=schema, data=[["This is"]])
# split with pattern '' yields one array element per character.
splitter = F.expr("split(String, '')")
result = df.select("String", splitter.alias("Characters"))
result.show(truncate=False)
String Characters
This is [T, h, i, s, , i, s]
To split a string into letters (characters) and remove last character
# Split a string into characters, then drop the final element of the array.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([T.StructField("String", T.StringType(), True)])
df = spark.createDataFrame(schema=schema, data=[["This is_"]])
print("Using split function and remove last character:")
split_expr = "split(String, '')"
# slice(arr, 1, size(arr) - 1) keeps everything except the last character.
trimmed = F.expr(f"slice({split_expr}, 1, size({split_expr}) - 1)")
result = df.select("String", trimmed.alias("Characters"))
result.show(truncate=False)
String Characters
This is_ [T, h, i, s, , i, s]
To append a string to all values in a column
# Prepend a literal string to every value in a column.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Str1", T.StringType(), True),
        T.StructField("Str2", T.StringType(), True),
    ]
)
rows = [("This is", "a string"),
        ("on a", "different row")]
df = spark.createDataFrame(schema=schema, data=rows)
# F.lit wraps the constant so concat treats it as a column expression.
df = df.withColumn("Str1_with_prefix", F.concat(F.lit("Prefix_"), "Str1"))
df.select("Str1", "Str1_with_prefix").show()
Str1 Str1_with_prefix
This is Prefix_This is
on a Prefix_on a
To calculate cumulative sum of a column
# Running counts of True and False values, ordered by time.
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession, Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
pdf = pd.DataFrame({'time': [0, 1, 2, 3, 4, 5],
                    'value': [False, False, True, False, True, True]})
df = spark.createDataFrame(pdf)
# A window with only ORDER BY defaults to a running aggregate
# (unbounded preceding .. current row).
running = Window.orderBy(F.col("time").asc())
df = df.withColumn("cml_n_true", F.sum((F.col("value") == True).cast("int")).over(running))
df = df.withColumn("cml_n_false", F.sum((F.col("value") == False).cast("int")).over(running))
df.show()
time value cml_n_true cml_n_false
0 false 0 1
1 false 0 2
2 true 1 2
3 false 1 3
4 true 2 3
5 true 3 3
To convert Unix time stamp to human readable format
# Render Unix epoch seconds as a human-readable timestamp string.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType([T.StructField("timestamp", T.LongType(), True)])
epochs = [(1703224755,),
          (1703285602,)]
df = spark.createDataFrame(schema=schema, data=epochs)
# from_unixtime formats with the default 'yyyy-MM-dd HH:mm:ss' pattern
# in the session time zone.
df = df.withColumn("time_stamp_hrf", F.from_unixtime(F.col("timestamp")))
df.show()
timestamp time_stamp_hrf
1703224755 2023-12-22 06:59:15
1703285602 2023-12-22 23:53:22
To find percentage of a column
# Express each product's Quantity as a percentage of the overall total.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
rows = [("Laptop", 12),
        ("Monitor", 7),
        ("Mouse", 8),
        ("Keyboard", 9)]
df = spark.createDataFrame(schema=schema, data=rows)
# An empty partitionBy() makes the window span the whole dataframe,
# so the sum is the grand total.
grand_total = F.sum("Quantity").over(Window.partitionBy())
df = df.withColumn("%", F.round(F.col("Quantity") / grand_total * 100, 2))
df.select("Product", "Quantity", "%").orderBy(F.desc("Quantity")).show()
Product Quantity %
Laptop 12 33.33
Keyboard 9 25.0
Mouse 8 22.22
Monitor 7 19.44
To find percentage of a column within a group using a window
# Quantity as a percentage of its Location group's total, via a window.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
rows = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=rows)
# Partitioning by Location restricts the sum to each group.
group_total = F.sum("Quantity").over(Window.partitionBy("Location"))
df = df.withColumn("%", F.round(F.col("Quantity") / group_total * 100, 2))
df.select("Location", "Product", "Quantity", "%") \
  .orderBy(F.desc("Location"), F.desc("Quantity")) \
  .show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44
To find percentage of a column within a group using .groupBy()
and a join
# Same per-group percentage, computed with groupBy + join instead of a window.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
rows = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=rows)
# Aggregate per-location totals, then join them back onto every row.
totals = df.groupBy("Location").agg(F.sum("Quantity").alias("Total_Quantity"))
df = (df.join(totals, on="Location")
        .withColumn("%", F.round(F.col("Quantity") / F.col("Total_Quantity") * 100, 2)))
df.select("Location", "Product", "Quantity", "%") \
  .orderBy(F.desc("Location"), F.desc("Quantity")) \
  .show()
Location Product Quantity %
Office Laptop 23 54.76
Office Monitor 10 23.81
Office Mouse 9 21.43
Home Laptop 12 33.33
Home Keyboard 9 25.0
Home Mouse 8 22.22
Home Monitor 7 19.44
To find maximum value of a column
# Compute the maximum of the Quantity column through the RDD API.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
rows = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=rows)
df.show()
# Single-column Rows compare like tuples, so rdd.max() yields the Row with
# the largest Quantity; indexing [0] unwraps the scalar.
max_val = df.select("Quantity").rdd.max()[0]
print(f"Maximum value of Quantity: {max_val}")
Location Product Quantity
Home Laptop 12
Home Monitor 7
Home Mouse 8
Home Keyboard 9
Office Laptop 23
Office Monitor 10
Office Mouse 9
Maximum value of Quantity: 23
To add a column with count of elements per group
# Attach the size of each Location group to every row.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
from pyspark.sql.window import Window

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Location", T.StringType(), True),
        T.StructField("Product", T.StringType(), True),
        T.StructField("Quantity", T.IntegerType(), True),
    ]
)
rows = [("Home", "Laptop", 12),
        ("Home", "Monitor", 7),
        ("Home", "Mouse", 8),
        ("Home", "Keyboard", 9),
        ("Office", "Laptop", 23),
        ("Office", "Monitor", 10),
        ("Office", "Mouse", 9)]
df = spark.createDataFrame(schema=schema, data=rows)
# count(lit(1)) over the partition counts every row of the Location group.
per_group = Window.partitionBy(F.col("Location"))
df = df.withColumn("count_per_group", F.count(F.lit(1)).over(per_group))
df.show()
Location Product Quantity count_per_group
Home Laptop 12 4
Home Monitor 7 4
Home Mouse 8 4
Home Keyboard 9 4
Office Laptop 23 3
Office Monitor 10 3
Office Mouse 9 3
Dataframe join operations
To perform a full, outer, left, right join operations
# Demonstrate the join types Spark supports, using a scores table (df_a)
# and a details table (df_b) that overlap on the name "Alice".
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Score", T.IntegerType(), True),
    ]
)
data = [("Alice", 10),
        ("Bob", 11)
        ]
df_a = spark.createDataFrame(schema=schema, data=data)
print("Table A:")
df_a.show()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
        T.StructField("Surname", T.StringType(), True),
        # Fix: Age values are ints; the original declared StringType, which
        # makes createDataFrame fail schema verification (and the expected
        # output shows integer Age values).
        T.StructField("Age", T.IntegerType(), True),
    ]
)
data = [("Alice", "Doe", 12),
        ("Alice", "Smith", 30),
        ("Jane", "Carter", 7),
        ]
df_b = spark.createDataFrame(schema=schema, data=data)
print("Table B:")
df_b.show()
# "full" and "outer" are synonyms: keep unmatched rows from both sides.
df = df_a.join(df_b, on="Name", how="full")
print("Full join on 'Name':")
df.show()
df = df_a.join(df_b, on="Name", how="outer")
print("Outer join on 'Name':")
df.show()
# Joining on a boolean expression (rather than a column name) keeps
# both Name columns in the result.
df = df_a.join(df_b, df_a["Name"] == df_b["Name"])
print("Join on 'Name' on equal condition:")
df.show()
df = df_a.join(df_b, on="Name", how="inner")
print("Inner join on 'Name':")
df.show()
# "left" / "left_outer" keep all rows of df_a.
df = df_a.join(df_b, on="Name", how="left")
print("Left join on 'Name':")
df.show()
df = df_a.join(df_b, on="Name", how="left_outer")
print("Left-outer join on 'Name':")
df.show()
# left_anti: df_a rows with NO match; left_semi: df_a rows WITH a match
# (neither brings in columns from df_b).
df = df_a.join(df_b, on="Name", how="left_anti")
print("Left-anti join on 'Name':")
df.show()
df = df_a.join(df_b, on="Name", how="left_semi")
print("Left-semi join on 'Name':")
df.show()
# "right" / "right_outer" keep all rows of df_b.
df = df_a.join(df_b, on="Name", how="right")
print("Right join on 'Name':")
df.show()
df = df_a.join(df_b, on="Name", how="right_outer")
print("Right-outer join on 'Name':")
df.show()
Table B:
Name Surname Age
Alice Doe 12
Alice Smith 30
Jane Carter 7
Full join on ‘Name’:
Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7
Outer join on ‘Name’:
Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Bob 11 null null
Jane null Carter 7
Join on ‘Name’ on equal condition:
Name Score Name Surname Age
Alice 10 Alice Doe 12
Alice 10 Alice Smith 30
Inner join on ‘Name’:
Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Left join on ‘Name’:
Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12
Left-outer join on ‘Name’:
Name Score Surname Age
Bob 11 null null
Alice 10 Smith 30
Alice 10 Doe 12
Left-anti join on ‘Name’:
Left-semi join on ‘Name’:
Right join on ‘Name’:
Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7
Right-outer join on ‘Name’:
Name Score Surname Age
Alice 10 Doe 12
Alice 10 Smith 30
Jane null Carter 7
To drop one of the duplicate columns after join
# Full join two dataframes on (id, value == updated), then drop the
# duplicated id column coming from the right-hand side.
from pyspark.sql import Row
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df_a = spark.createDataFrame([
    Row(id=1, value="A1"),
    Row(id=1, value="B1"),
    Row(id=1, value="C1"),
    Row(id=2, value="A1"),
    Row(id=2, value="X1"),
    Row(id=2, value="Y1")]
)
print("Dataframe `df_a`:")
# Fix: the original called df_1.show() — an undefined name (NameError).
df_a.show()
df_b = spark.createDataFrame([
    Row(id=1, updated="A2"),
    Row(id=1, updated="B1"),
    Row(id=1, updated="C1"),
    Row(id=2, updated="A1"),
    Row(id=2, updated="X1"),
    Row(id=2, updated="Y1")]
)
print("Dataframe `df_b`:")
# Fix: df_2 was likewise undefined.
df_b.show()
df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full")
print("Full join on `df_a['value'] == df_b['updated']`:")
df.show()
# Dropping by the specific reference df_b["id"] removes only the right-hand
# copy of the ambiguous "id" column.
df = df_a.join(df_b, on=[df_a["id"] == df_b["id"], df_a["value"] == df_b["updated"]], how="full").drop(df_b["id"])
print("Full join on `df_a['value'] == df_b['updated']` with dropped `df_b['id']` column:")
df.show()
id value
1 A1
1 B1
1 C1
2 A1
2 X1
2 Y1
Dataframe df_b:
id updated
1 A2
1 B1
1 C1
2 A1
2 X1
2 Y1
Full join on df_a['value'] == df_b['updated']:
id value id updated
1.0 A1 null null
null null 1.0 A2
1.0 B1 1.0 B1
1.0 C1 1.0 C1
2.0 A1 2.0 A1
2.0 X1 2.0 X1
2.0 Y1 2.0 Y1
Full join on df_a['value'] == df_b['updated'] with dropped df_b['id'] column:
id value updated
1.0 A1 null
null null A2
1.0 B1 B1
1.0 C1 C1
2.0 A1 A1
2.0 X1 X1
2.0 Y1 Y1
To group by and aggregate into a map using F.map_from_entries()
# Group rows by id and collapse their (key, value) pairs into one map column.
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql.window import Window
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
    Row(id=1, key='a', value="A1"),
    Row(id=1, key='b', value="B1"),
    Row(id=1, key='c', value="C1"),
    Row(id=2, key='a', value="A1"),
    Row(id=2, key='x', value="X1"),
    Row(id=2, key='y', value="Y1")]
)
print("Dataframe with keys and values:")
df.show(truncate=False)
# collect_list gathers (key, value) structs per id; map_from_entries turns
# that array of structs into a map.
dft = df.groupBy("id").agg(F.map_from_entries(F.collect_list(
    F.struct("key", "value"))).alias("key_value")
)
print("Dataframe with key -> value mapping")
dft.show(truncate=False)
# Fix: this line was garbled in the source ('<< print_schema ("dft")>>');
# reconstructed from the shown output ("Schema of dft is:" followed by root).
print("Schema of `dft` is:")
dft.printSchema()
id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1
Dataframe with key -> value mapping
id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {a -> A1, x -> X1, y -> Y1}
Schema of dft is:
root
|-- id: long (nullable = true)
|-- key_value: map (nullable = false)
| |-- key: string
| |-- value: string (valueContainsNull = true)
To group by and aggregate into a map using UDF
# Group by id and build a key -> value map with a Python UDF applied to the
# collected (key, value) structs.
import pyspark.sql.functions as F
from pyspark.sql import Row
from pyspark.sql import SparkSession
import pyspark.sql.types as T

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
df = spark.createDataFrame([
    Row(id=1, key='a', value="A1"),
    Row(id=1, key='b', value="B1"),
    Row(id=1, key='c', value="C1"),
    Row(id=2, key='a', value="A1"),
    Row(id=2, key='x', value="X1"),
    Row(id=2, key='y', value="Y1")]
)
print("Dataframe with keys and values:")
df.show()

# dict() over an iterable of (key, value) structs produces the map.
@F.udf(returnType=T.MapType(T.StringType(), T.StringType()))
def map_array(column):
    return dict(column)

dft = (df.groupBy("id")
       .agg(F.collect_list(F.struct("key", "value")).alias("key_value"))
       .withColumn('key_value', map_array('key_value')))
print("Dataframe with keys and values:")
dft.show(truncate=False)
# Fix: garbled source line ('<< print_schema ("dft")>>') reconstructed
# from the shown output ("Schema of dft is:" followed by the schema tree).
print("Schema of `dft` is:")
dft.printSchema()
id key value
1 a A1
1 b B1
1 c C1
2 a A1
2 x X1
2 y Y1
Dataframe with keys and values:
id key_value
1 {a -> A1, b -> B1, c -> C1}
2 {x -> X1, a -> A1, y -> Y1}
Schema of dft is:
root
|-- id: long (nullable = true)
|-- key_value: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
To aggregate over multiple columns and sum values of dictionaries
# Sum per-key values across several map columns into one total map.
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

df_schema = T.StructType([
    T.StructField('clid', T.StringType(), True),
    T.StructField('coef_1', T.MapType(T.StringType(), T.DoubleType(), True), False),
    T.StructField('coef_2', T.MapType(T.StringType(), T.DoubleType(), True), False),
    T.StructField('coef_3', T.MapType(T.StringType(), T.DoubleType(), True), False),
])
df_data = [
    ["X", {'B': 0.4, 'C': 0.4}, {'B': 0.33, 'C': 0.5}, {'A': 0.5, 'C': 0.33}],
    ["Y", {'B': 0.67, 'C': 0.33}, {'B': 0.85}, {'A': 0.4, 'C': 0.57}],
]
spark = (SparkSession.builder
         .appName("Parse DataFrame Schema")
         .getOrCreate())
df = spark.createDataFrame(data=df_data, schema=df_schema)
# Seed the accumulator with coef_1, then fold the remaining maps into it.
df = df.withColumn("coef_total", F.col("coef_1"))
for i in range(2, 4):
    # map_zip_with pairs values by key; a key present in only one map gets
    # a null on the other side, which is treated as 0 here.
    df = df.withColumn(
        "coef_total",
        F.map_zip_with(
            "coef_total",
            f"coef_{i}",
            lambda k, v1, v2: F.when(v1.isNull(), 0).otherwise(v1)
            + F.when(v2.isNull(), 0).otherwise(v2),
        ),
    )
df.show(truncate=False)
clid coef_1 coef_2 coef_3 coef_total
X {B -> 0.4, C -> 0.4} {B -> 0.33, C -> 0.5} {A -> 0.5, C -> 0.33} {B -> 0.73, C -> 1.23, A -> 0.5}
Y {B -> 0.67, C -> 0.33} {B -> 0.85} {A -> 0.4, C -> 0.57} {B -> 1.52, C -> 0.8999999999999999, A -> 0.4}
# Sample roughly half the rows of a dataframe spread over three partitions.
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("index", T.IntegerType(), True),
        T.StructField("value", T.StringType(), True),
    ]
)
rows = [(1, "Home"),
        (2, "School"),
        (3, "Home"),
        (4, "Home"),
        (5, "Office"),
        (6, "Office"),
        (7, "Office"),
        (8, "Mall"),
        (9, "Mall"),
        (10, "School")]
df = spark.createDataFrame(schema=schema, data=rows).repartition(3)
# Record which partition each row landed in, for illustration.
df = df.withColumn("partition", F.spark_partition_id()).orderBy("index")
print("Original dataframe:")
df.show()
print("Sampled dataframe:")
# fraction is a per-row probability, not an exact share; the seed makes the
# sample reproducible.
sampled = df.sample(fraction=0.5, seed=1).orderBy("index")
sampled.show()
index value partition
1 Home 1
2 School 0
3 Home 0
4 Home 2
5 Office 2
6 Office 2
7 Office 1
8 Mall 0
9 Mall 1
10 School 0
Sampled dataframe:
index value partition
3 Home 0
4 Home 2
7 Office 1
8 Mall 0
9 Mall 1
To generate a UUID for every row
# Add two UUID columns to a dataframe: one from a per-call random generator
# and one from a seed-based generator. With the data in two partitions, the
# seed-based generator restarts from the same seed on each partition, so its
# values repeat across partitions (as the final print explains).
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import random
import uuid

spark = SparkSession.builder.master("local").appName("test-app").getOrCreate()
schema = T.StructType(
    [
        T.StructField("Name", T.StringType(), True),
    ]
)
data = [["Alice"],
        ["Bon"],
        ["John"],
        ["Cecile"]
        ]
# Two partitions so the per-partition repetition of the seeded generator shows.
df = spark.createDataFrame(schema=schema, data=data).repartition(2)

def _generate_uuid(uuid_gen, v=10):
    # Draw a UUID from `uuid_gen` and overwrite its version nibble with `v`.
    def _replace_byte(value: int, byte: int):
        # Replace the 4 bits at positions 76-79 of the 128-bit UUID integer
        # (the RFC 4122 version field) with the low nibble of `byte`.
        byte = byte & 0xF
        bit_shift = 76
        mask = ~(0xF << bit_shift)
        return value & mask | (byte << bit_shift)
    uuid_ = uuid_gen.generate()
    # NOTE(review): v=10 writes 'a' into the version position (visible in the
    # sample output), which is not a standard UUID version — presumably a
    # deliberate marker; confirm intent.
    return uuid.UUID(int=(_replace_byte(uuid_.int, v)))

class RandomDistributedUUIDGenerator:
    # Wraps uuid4(): every call draws fresh randomness.
    def generate(self):
        return uuid.uuid4()

class SeedBasedUUIDGenerator:
    # Derives UUIDs from a seeded PRNG, so the sequence is reproducible.
    def __init__(self, seed):
        self.rnd = random.Random(seed)
    def generate(self):
        return uuid.UUID(int=self.rnd.getrandbits(128), version=4)

gen = RandomDistributedUUIDGenerator()
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_random_distributed", udf_generate_uuid())
seed_for_rng = 1
gen = SeedBasedUUIDGenerator(seed_for_rng)
udf_generate_uuid = F.udf(lambda: _generate_uuid(gen).__str__(), T.StringType())
df = df.withColumn("UUID_seed_based", udf_generate_uuid())
print("The dataframe resides in two partitions. Seed-based random UUID generator uses the same seed on both partitions, yielding identical values.")
df.show(truncate=False)
Name UUID_random_distributed UUID_seed_based
John 4e9a3bb1-a189-a25e-8389-7f8382635b09 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Bon 16cd1549-0c74-a483-9bbe-707e59e0796f 1e2feb89-414c-a43c-9027-c4d1c386bbc4
Cecile b8b05619-6004-aa75-b98b-7e1c83c9f301 cd613e30-d8f1-aadf-91b7-584a2265b1f5
Alice b1f1a9fb-feb9-a946-9171-3e7cb577fdaa 1e2feb89-414c-a43c-9027-c4d1c386bbc4