Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUGFIX] fix reusing a sparkSession using the force_reuse_spark_context keyword #3245

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
7201b77
[MAINTENANCE] Adds spark_config and force_reuse_spark_context fields…
Jul 28, 2021
e16462d
Merge branch 'develop' into maintenance/reuse-spark-context-emr
pasmavie Jul 28, 2021
a4e4ff8
Merge branch 'develop' into maintenance/reuse-spark-context-emr
pasmavie Aug 6, 2021
d2c80fa
Adds changes bullet point to docs_rtd/changelog.rst
pasmavie Aug 6, 2021
e40de89
Merge remote-tracking branch 'origin/develop' into maintenance/reuse-…
mbakunze Aug 13, 2021
1ce2ca1
Merge branch 'develop' into maintenance/reuse-spark-context-emr
mbakunze Aug 13, 2021
01e5e51
remove spark_config from DatasourceConfigSchema
mbakunze Aug 13, 2021
851cd04
allow None for force_reuse_spark_context
mbakunze Aug 13, 2021
9ec3735
fix linting
mbakunze Aug 13, 2021
c170c9b
remove missing entry for force_reuse_spark_context
mbakunze Aug 13, 2021
c5a6c66
adding back spark_config
mbakunze Aug 14, 2021
d5ca8d5
Merge branch 'develop' into maintenance/reuse-spark-context-emr
mbakunze Aug 15, 2021
10aca2a
adding unittests for `force_reuse_spark_context`
mbakunze Aug 16, 2021
44cd676
Merge remote-tracking branch 'origin/maintenance/reuse-spark-context-…
mbakunze Aug 16, 2021
1adde14
Merge branch 'develop' into maintenance/reuse-spark-context-emr
mbakunze Aug 16, 2021
cb39f1b
adding unittest for `spark_config` pass through
mbakunze Aug 16, 2021
bbc4ec0
Merge remote-tracking branch 'origin/maintenance/reuse-spark-context-…
mbakunze Aug 16, 2021
cfb8c98
adding unittest for `spark_config` pass through + run black
mbakunze Aug 16, 2021
82abefe
add docstrings to unittests to clarify intent
mbakunze Aug 16, 2021
d336103
add isort:skip for spark imports
mbakunze Aug 16, 2021
3ae3805
rename test to be more meaningful
mbakunze Aug 16, 2021
16f1c71
fix TypeError in unittest
mbakunze Aug 16, 2021
0a4ff1d
adding a non-default config in order to hopefully see the Py4jError i…
mbakunze Aug 16, 2021
6bc4ac7
remove explicit P4jError test
mbakunze Aug 16, 2021
014140c
Merge branch 'develop' into maintenance/reuse-spark-context-emr
mbakunze Aug 17, 2021
e0a128e
add explicit test for sparkdf kwargs pass-through
mbakunze Aug 17, 2021
c634a28
Merge remote-tracking branch 'origin/maintenance/reuse-spark-context-…
mbakunze Aug 17, 2021
ff93e0b
update changelog entry to reflect this is a BUGFIX
mbakunze Aug 17, 2021
3ca3fb1
fix linting
mbakunze Aug 17, 2021
2daf0eb
Merge branch 'develop' into maintenance/reuse-spark-context-emr
mbakunze Aug 19, 2021
1ac14a7
Merge branch 'develop' into maintenance/reuse-spark-context-emr
mbakunze Aug 19, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions docs_rtd/changelog.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ Changelog

develop
-----------------

* [BUGFIX] Add force_reuse_spark_context and spark_config fields to DatasourceConfigSchema so an existing SparkSession can be reused (#3245)
* [FEATURE] Implement V3 expect_column_pair_values_to_be_equal expectation for SQL Alchemy execution engine (#3267)
* [FEATURE] Implement V3 expect_column_pair_values_to_be_equal expectation for Pandas execution engine (#3252)
* [FEATURE] Expectations tests for BigQuery backend (#3219) (Thanks @jdimatteo)
* [BUGFIX] Fix deprecation warning for importing from collections (#3228)
* [DOCS] Document BigQuery test dataset configuration (#3273) (Thanks @jdimatteo)


0.13.28
-----------------
* [FEATURE] Implement ColumnPairValuesInSet metric for PandasExecutionEngine
Expand Down
5 changes: 4 additions & 1 deletion great_expectations/data_context/types/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -788,7 +788,10 @@ class Meta:

class_name = fields.String(missing="Datasource")
module_name = fields.String(missing="great_expectations.datasource")

force_reuse_spark_context = fields.Bool(required=False, allow_none=True)
spark_config = fields.Dict(
keys=fields.Str(), values=fields.Str(), required=False, allow_none=True
)
execution_engine = fields.Nested(
ExecutionEngineConfigSchema, required=False, allow_none=True
)
Expand Down
88 changes: 88 additions & 0 deletions tests/datasource/test_sparkdf_datasource.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,94 @@ def test_sparkdf_datasource_custom_data_asset(
assert res.success is True


def test_force_reuse_spark_context(
    data_context_parameterized_expectation_suite, tmp_path_factory, test_backends
):
    """
    Ensure that an externally created SparkSession can be used by specifying
    the force_reuse_spark_context argument.

    The test writes a small DataFrame to parquet with its own SparkSession,
    registers a SparkDFDatasource with ``force_reuse_spark_context=True``,
    reads the parquet file back through the same session and runs an
    expectation against the resulting batch.
    """
    if "SparkDFDataset" not in test_backends:
        pytest.skip("No spark backend selected.")
    from pyspark.sql import SparkSession  # isort:skip

    dataset_name = "test_spark_dataset"

    spark = SparkSession.builder.appName("local").master("local[1]").getOrCreate()
    try:
        data = {"col1": [0, 1, 2], "col2": ["a", "b", "c"]}

        spark_df = spark.createDataFrame(pd.DataFrame(data))
        tmp_parquet_filename = os.path.join(
            tmp_path_factory.mktemp(dataset_name).as_posix(), dataset_name
        )
        spark_df.write.format("parquet").save(tmp_parquet_filename)

        data_context_parameterized_expectation_suite.add_datasource(
            dataset_name,
            class_name="SparkDFDatasource",
            force_reuse_spark_context=True,
            module_name="great_expectations.datasource",
            batch_kwargs_generators={},
        )

        # Load the data with the *external* session; the batch below is built
        # on top of this DataFrame.
        df = spark.read.format("parquet").load(tmp_parquet_filename)
        batch_kwargs = {"dataset": df, "datasource": dataset_name}
        _ = data_context_parameterized_expectation_suite.create_expectation_suite(
            dataset_name
        )
        batch = data_context_parameterized_expectation_suite.get_batch(
            batch_kwargs=batch_kwargs, expectation_suite_name=dataset_name
        )
        results = batch.expect_column_max_to_be_between(
            "col1", min_value=1, max_value=100
        )
        assert results.success, "Failed to use external SparkSession"
    finally:
        # Stop the session even when an assertion or Spark call above fails,
        # so a failing run does not leave the session behind.
        spark.stop()


def test_spark_kwargs_are_passed_through(
    data_context_parameterized_expectation_suite,
    tmp_path_factory,
    test_backends,
    spark_session,
):
    """
    Ensure that the ``spark_config`` and ``force_reuse_spark_context`` keyword
    arguments given to ``add_datasource`` are passed through to the config of
    the resulting SparkDFDatasource.

    Two datasources are registered: one whose ``spark_config`` matches the
    configuration of the ``spark_session`` fixture (with reuse disabled), and
    one with an empty ``spark_config`` (with reuse enabled).
    """
    if "SparkDFDataset" not in test_backends:
        pytest.skip("No spark backend selected.")

    dataset_name = "test_spark_dataset"
    data_context_parameterized_expectation_suite.add_datasource(
        dataset_name,
        class_name="SparkDFDatasource",
        spark_config=dict(spark_session.sparkContext.getConf().getAll()),
        force_reuse_spark_context=False,
        module_name="great_expectations.datasource",
        batch_kwargs_generators={},
    )
    datasource_config = data_context_parameterized_expectation_suite.get_datasource(
        dataset_name
    ).config
    # The session's conf is deliberately read again here (not cached from the
    # call above) so the comparison is made against the live session.
    assert datasource_config["spark_config"] == dict(
        spark_session.sparkContext.getConf().getAll()
    )
    # Identity check: the flag must come back as the boolean False, not
    # merely something that compares equal to it.
    assert datasource_config["force_reuse_spark_context"] is False

    dataset_name = "test_spark_dataset_2"
    data_context_parameterized_expectation_suite.add_datasource(
        dataset_name,
        class_name="SparkDFDatasource",
        spark_config={},
        force_reuse_spark_context=True,
        module_name="great_expectations.datasource",
        batch_kwargs_generators={},
    )
    datasource_config = data_context_parameterized_expectation_suite.get_datasource(
        dataset_name
    ).config
    assert datasource_config["spark_config"] == {}
    assert datasource_config["force_reuse_spark_context"] is True


def test_create_sparkdf_datasource(
data_context_parameterized_expectation_suite, tmp_path_factory, test_backends
):
Expand Down