assess_jobs fails with “PySparkValueError: [CANNOT_BE_NONE] Argument obj can not be None.” #297
Comments
@tamilselvanveeramani This is a public repo, don't post links to internal workspaces, remove them immediately.
@tamilselvanveeramani Can you figure out whether it's the value that is None or a column that is None? If it's the value, add skipping in the CrawlerBase; otherwise make the column nullable.
Removed the workspace URL.
Linking issue #346.
This is just filtering records for None.
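For context, here is a minimal sketch of what "filtering records for None" could look like before rows reach `save_table`. The helper name and logging are illustrative, not the actual CrawlerBase code:

```python
import dataclasses
import logging

logger = logging.getLogger(__name__)


def drop_unusable_records(full_name: str, items: list) -> list:
    """Illustrative only: drop records that are None (or whose fields are all None)
    so a non-nullable Spark schema never sees a missing value at the row level."""
    kept = [
        item
        for item in items
        if item is not None
        and any(v is not None for v in dataclasses.asdict(item).values())
    ]
    skipped = len(items) - len(kept)
    if skipped:
        logger.warning(f"{full_name}: skipped {skipped} record(s) with no usable data")
    return kept
```

If the failure is instead a single None field in a non-nullable column (which is what the `verify_struct` frame in the trace below points at), the alternative from the earlier comment applies: declare that dataclass field as `Optional[...]` so the generated schema marks the column nullable.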
In the Azure demo workspace, assess_jobs fails with “PySparkValueError: [CANNOT_BE_NONE] Argument `obj` can not be None.” Here is the error trace:

PySparkValueError: [CANNOT_BE_NONE] Argument `obj` can not be None.

PySparkValueError                          Traceback (most recent call last)
File ~/.ipykernel/1386/command--1-1316133125:18
15 entry = [ep for ep in metadata.distribution("databricks_labs_ucx").entry_points if ep.name == "runtime"]
16 if entry:
17 # Load and execute the entrypoint, assumes no parameters
---> 18 entry[0].load()()
19 else:
20 import databricks_labs_ucx
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/runtime.py:211, in main()
210 def main():
--> 211 trigger(*sys.argv)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/framework/tasks.py:91, in trigger(*argv)
88 cfg = WorkspaceConfig.from_file(Path(args["config"]))
89 logging.getLogger("databricks").setLevel(cfg.log_level)
---> 91 current_task.fn(cfg)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/runtime.py:104, in assess_jobs(cfg)
102 ws = WorkspaceClient(config=cfg.to_databricks_config())
103 crawler = JobsCrawler(ws, RuntimeBackend(), cfg.inventory_database)
--> 104 crawler.snapshot()
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/assessment/crawlers.py:146, in JobsCrawler.snapshot(self)
145 def snapshot(self) -> list[ClusterInfo]:
--> 146 return self._snapshot(self._try_fetch, self._crawl)
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/framework/crawlers.py:215, in CrawlerBase._snapshot(self, fetcher, loader)
213 logger.debug(f"[{self._full_name}] crawling new batch for {self._table}")
214 loaded_records = list(loader())
--> 215 self._append_records(loaded_records)
216 return loaded_records
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/framework/crawlers.py:222, in CrawlerBase._append_records(self, items)
220 return
221 logger.debug(f"[{self._full_name}] found {len(items)} new records for {self._table}")
--> 222 self._backend.save_table(self._full_name, items, mode="append")
File /local_disk0/.ephemeral_nfs/cluster_libraries/python/lib/python3.10/site-packages/databricks/labs/ucx/framework/crawlers.py:121, in RuntimeBackend.save_table(self, full_name, rows, mode)
119 return
120 # pyspark deals well with lists of dataclass instances, as long as schema is provided
--> 121 df = self._spark.createDataFrame(rows, self._schema_for(rows[0]))
122 df.write.saveAsTable(full_name, mode=mode)
File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
51 )
52 return res
File /databricks/spark/python/pyspark/sql/session.py:1427, in SparkSession.createDataFrame(self, data, schema, samplingRatio, verifySchema)
1422 if has_pandas and isinstance(data, pd.DataFrame):
1423 # Create a DataFrame from pandas DataFrame.
1424 return super(SparkSession, self).createDataFrame( # type: ignore[call-overload]
1425 data, schema, samplingRatio, verifySchema
1426 )
-> 1427 return self._create_dataframe(
1428 data, schema, samplingRatio, verifySchema # type: ignore[arg-type]
1429 )
File /databricks/spark/python/pyspark/sql/session.py:1477, in SparkSession._create_dataframe(self, data, schema, samplingRatio, verifySchema)
1475 rdd, struct = self._createFromRDD(data.map(prepare), schema, samplingRatio)
1476 else:
-> 1477 rdd, struct = self._createFromLocal(map(prepare, data), schema)
1478 jrdd = self._jvm.SerDeUtil.toJavaArray(rdd._to_java_object_rdd())
1479 jdf = self._jsparkSession.applySchemaToPythonRDD(jrdd.rdd(), struct.json())
File /databricks/spark/python/pyspark/sql/session.py:1079, in SparkSession._createFromLocal(self, data, schema)
1071 def _createFromLocal(
1072 self, data: Iterable[Any], schema: Optional[Union[DataType, List[str]]]
1073 ) -> Tuple["RDD[Tuple]", StructType]:
1074 """
1075 Create an RDD for DataFrame from a list or pandas.DataFrame, returns the RDD and schema.
1076 This would be broken with table acl enabled as user process does not have permission to
1077 write temp files.
1078 """
-> 1079 internal_data, struct = self._wrap_data_schema(data, schema)
1080 return self._sc.parallelize(internal_data), struct
File /databricks/spark/python/pyspark/sql/session.py:1043, in SparkSession._wrap_data_schema(self, data, schema)
1038 def _wrap_data_schema(
1039 self, data: Iterable[Any], schema: Optional[Union[DataType, List[str]]]
1040 ) -> Tuple[Iterable[Tuple], StructType]:
1041 # make sure data could consumed multiple times
1042 if not isinstance(data, list):
-> 1043 data = list(data)
1045 if schema is None or isinstance(schema, (list, tuple)):
1046 struct = self._inferSchemaFromList(data, names=schema)
File /databricks/spark/python/pyspark/sql/session.py:1443, in SparkSession._create_dataframe.<locals>.prepare(obj)
1441 @no_type_check
1442 def prepare(obj):
-> 1443 verify_func(obj)
1444 return obj
File /databricks/spark/python/pyspark/sql/types.py:2187, in _make_type_verifier.<locals>.verify(obj)
2185 def verify(obj: Any) -> None:
2186 if not verify_nullability(obj):
-> 2187 verify_value(obj)
File /databricks/spark/python/pyspark/sql/types.py:2164, in _make_type_verifier.<locals>.verify_struct(obj)
2162 d = obj.__dict__
2163 for f, verifier in verifiers:
-> 2164 verifier(d.get(f))
2165 else:
2166 raise PySparkTypeError(
2167 error_class="CANNOT_ACCEPT_OBJECT_IN_TYPE",
2168 message_parameters={
(...)
2172 },
2173 )
File /databricks/spark/python/pyspark/sql/types.py:2186, in _make_type_verifier.<locals>.verify(obj)
2185 def verify(obj: Any) -> None:
-> 2186 if not verify_nullability(obj):
2187 verify_value(obj)
File /databricks/spark/python/pyspark/sql/types.py:1989, in _make_type_verifier.<locals>.verify_nullability(obj)
1987 return True
1988 else:
-> 1989 raise PySparkValueError(
1990 error_class="CANNOT_BE_NONE",
1991 message_parameters={"arg_name": "obj"},
1992 )
1993 else:
1994 return False
PySparkValueError: [CANNOT_BE_NONE] Argument `obj` can not be None.
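The bottom frames (`verify_struct` → `verify_nullability`) indicate that a field inside one of the crawled records is None while the corresponding `StructField` is declared `nullable=False`. Here is a standalone sketch that reproduces the same error outside UCX; the `JobInfo` dataclass and schemas are made up for illustration, and `pyspark.errors` assumes PySpark 3.4+:

```python
from dataclasses import dataclass
from typing import Optional

from pyspark.errors import PySparkValueError
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType

spark = SparkSession.builder.getOrCreate()


@dataclass
class JobInfo:  # hypothetical record, not the real UCX dataclass
    job_id: str
    creator: Optional[str]  # field that can legitimately come back empty


rows = [JobInfo(job_id="123", creator=None)]

# With 'creator' declared non-nullable, createDataFrame raises the same error
strict = StructType([
    StructField("job_id", StringType(), nullable=False),
    StructField("creator", StringType(), nullable=False),
])
try:
    spark.createDataFrame(rows, strict)
except PySparkValueError as e:
    print(f"strict schema rejected the row: {e}")

# Marking the column nullable (or filtering such rows out beforehand) avoids it
relaxed = StructType([
    StructField("job_id", StringType(), nullable=False),
    StructField("creator", StringType(), nullable=True),
])
spark.createDataFrame(rows, relaxed).show()
```

As the comment in the traceback notes, PySpark accepts lists of dataclass instances as long as a schema is provided, so the choice is between relaxing the schema's nullability and skipping the offending records before `save_table` is called.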