Remove snakebite-py3 based HDFS hooks and sensors #31262

Merged · 1 commit · May 15, 2023
6 changes: 0 additions & 6 deletions airflow/contrib/sensors/__init__.py
@@ -114,12 +114,6 @@
'airflow.providers.google.cloud.sensors.gcs.GCSUploadSessionCompleteSensor'
),
},
'hdfs_sensor': {
'HdfsFolderSensor': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsFolderSensor',
'HdfsRegexSensor': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsRegexSensor',
'HdfsSensorFolder': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsFolderSensor',
'HdfsSensorRegex': 'airflow.providers.apache.hdfs.sensors.hdfs.HdfsRegexSensor',
},
'imap_attachment_sensor': {
'ImapAttachmentSensor': 'airflow.providers.imap.sensors.imap_attachment.ImapAttachmentSensor',
},
4 changes: 0 additions & 4 deletions airflow/hooks/__init__.py
@@ -39,10 +39,6 @@
"DruidDbApiHook": "airflow.providers.apache.druid.hooks.druid.DruidDbApiHook",
"DruidHook": "airflow.providers.apache.druid.hooks.druid.DruidHook",
},
"hdfs_hook": {
"HDFSHook": "airflow.providers.apache.hdfs.hooks.hdfs.HDFSHook",
"HDFSHookException": "airflow.providers.apache.hdfs.hooks.hdfs.HDFSHookException",
},
"hive_hooks": {
"HIVE_QUEUE_PRIORITIES": "airflow.providers.apache.hive.hooks.hive.HIVE_QUEUE_PRIORITIES",
"HiveCliHook": "airflow.providers.apache.hive.hooks.hive.HiveCliHook",
18 changes: 18 additions & 0 deletions airflow/providers/apache/hdfs/CHANGELOG.rst
@@ -24,6 +24,24 @@
Changelog
---------

4.0.0
-----

Breaking changes
~~~~~~~~~~~~~~~~

The original HDFS hook and sensors have been removed. They used the old snakebite-py3 library, which
has not been updated in years, and the protobuf version it depends on has reached its end of life.

The ``HDFSHook``, ``HDFSSensor``, ``HdfsFolderSensor`` and ``HdfsRegexSensor`` classes have been
removed from this provider and are no longer available. If you need to keep using them, you can use
the 3.* version of the provider, but the recommendation is to switch to the new ``WebHDFSHook`` and
``WebHdfsSensor``, which use the ``WebHDFS`` API.

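For users migrating, a minimal sketch of the WebHDFS-based replacement for a snakebite path check (the helper name is illustrative; it assumes the 4.0.0 provider is installed and a ``webhdfs_default`` connection is configured):

```python
def check_file_landed(path: str) -> bool:
    """Return True if ``path`` exists in HDFS, via the WebHDFS REST API."""
    # Imported lazily so the sketch stays importable without Airflow installed.
    from airflow.providers.apache.hdfs.hooks.webhdfs import WebHDFSHook

    hook = WebHDFSHook(webhdfs_conn_id="webhdfs_default")
    return hook.check_for_path(hdfs_path=path)
```

Unlike the removed snakebite client, ``WebHDFSHook`` speaks HTTP to the NameNode's WebHDFS endpoint, so no protobuf RPC dependency is needed.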

3.2.1
.....

105 changes: 22 additions & 83 deletions airflow/providers/apache/hdfs/hooks/hdfs.py
@@ -1,4 +1,3 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
@@ -15,98 +14,38 @@
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
"""Hook for HDFS operations"""
from __future__ import annotations

from typing import Any

from airflow.configuration import conf
from airflow.exceptions import AirflowException
from airflow import AirflowException
from airflow.hooks.base import BaseHook

try:
from snakebite.client import AutoConfigClient, Client, HAClient, Namenode

snakebite_loaded = True
except ImportError:
snakebite_loaded = False
_EXCEPTION_MESSAGE = """The old HDFS hooks have been removed in the 4.0.0 version of the apache.hdfs provider.
Please convert your DAGs to use the WebHdfsHook, or downgrade the provider to below 4.*
if you want to continue using them.
To use the earlier provider, downgrade to the latest released 3.* version
using `pip install apache-airflow-providers-apache-hdfs==3.2.1` (no constraints).
"""


class HDFSHookException(AirflowException):
"""Exception specific for HDFS"""


class HDFSHook(BaseHook):
"""
Interact with HDFS. This class is a wrapper around the snakebite library.

:param hdfs_conn_id: Connection id to fetch connection info
:param proxy_user: effective user for HDFS operations
:param autoconfig: use snakebite's automatically configured client
This exception has been removed and is not functional. Please convert your DAGs to use the
WebHdfsHook or downgrade the provider to below 4.* if you want to continue using it.
To use the earlier provider, downgrade to the latest released 3.* version
using `pip install apache-airflow-providers-apache-hdfs==3.2.1` (no constraints).
"""

conn_name_attr = "hdfs_conn_id"
default_conn_name = "hdfs_default"
conn_type = "hdfs"
hook_name = "HDFS"
def __init__(self, *args, **kwargs):
raise Exception(_EXCEPTION_MESSAGE)

def __init__(
self,
hdfs_conn_id: str | set[str] = "hdfs_default",
proxy_user: str | None = None,
autoconfig: bool = False,
):
super().__init__()
if not snakebite_loaded:
raise ImportError(
"This HDFSHook implementation requires snakebite, but "
"snakebite is not compatible with Python 3 "
"(as of August 2015). Please help by submitting a PR!"
)
self.hdfs_conn_id = {hdfs_conn_id} if isinstance(hdfs_conn_id, str) else hdfs_conn_id
self.proxy_user = proxy_user
self.autoconfig = autoconfig

def get_conn(self) -> Any:
"""Returns a snakebite HDFSClient object."""
# When using HAClient, proxy_user must be the same, so is ok to always
# take the first.
effective_user = self.proxy_user
autoconfig = self.autoconfig
use_sasl = conf.get("core", "security") == "kerberos"

try:
connections = [self.get_connection(i) for i in self.hdfs_conn_id]

if not effective_user:
effective_user = connections[0].login
if not autoconfig:
autoconfig = connections[0].extra_dejson.get("autoconfig", False)
hdfs_namenode_principal = connections[0].extra_dejson.get("hdfs_namenode_principal")
except AirflowException:
if not autoconfig:
raise

if autoconfig:
# will read config info from $HADOOP_HOME conf files
client = AutoConfigClient(effective_user=effective_user, use_sasl=use_sasl)
elif len(connections) == 1:
client = Client(
connections[0].host,
connections[0].port,
effective_user=effective_user,
use_sasl=use_sasl,
hdfs_namenode_principal=hdfs_namenode_principal,
)
elif len(connections) > 1:
name_node = [Namenode(conn.host, conn.port) for conn in connections]
client = HAClient(
name_node,
effective_user=effective_user,
use_sasl=use_sasl,
hdfs_namenode_principal=hdfs_namenode_principal,
)
else:
raise HDFSHookException("conn_id doesn't exist in the repository and autoconfig is not specified")
class HDFSHook(BaseHook):
"""
This hook has been removed and is not functional. Please convert your DAGs to use the
WebHdfsHook or downgrade the provider to below 4.* if you want to continue using it.
To use the earlier provider, downgrade to the latest released 3.* version
using `pip install apache-airflow-providers-apache-hdfs==3.2.1` (no constraints).
"""

return client
def __init__(self, *args, **kwargs):
raise Exception(_EXCEPTION_MESSAGE)
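The replaced module keeps the removed class names importable while failing loudly at instantiation. A standalone sketch of the same tombstone pattern (the message text here is abbreviated):

```python
_EXCEPTION_MESSAGE = (
    "The old HDFS hooks have been removed in the 4.0.0 version of the "
    "apache.hdfs provider. Please convert your DAGs to use the WebHdfsHook "
    "or downgrade the provider to below 4.*."
)


class HDFSHook:
    """Removed hook, kept only so that old imports fail with a clear message."""

    def __init__(self, *args, **kwargs):
        # Fail at instantiation time rather than import time, so deprecation
        # shims that merely import this module keep working.
        raise Exception(_EXCEPTION_MESSAGE)
```

Raising from ``__init__`` rather than deleting the module outright means users hitting the removal get an actionable message instead of an opaque ``ImportError``.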
15 changes: 1 addition & 14 deletions airflow/providers/apache/hdfs/provider.yaml
@@ -24,6 +24,7 @@ description: |

suspended: false
versions:
- 4.0.0
- 3.2.1
- 3.2.0
- 3.1.0
@@ -41,14 +42,11 @@ versions:

dependencies:
- apache-airflow>=2.4.0
- snakebite-py3
- hdfs[avro,dataframe,kerberos]>=2.0.4

integrations:
- integration-name: Hadoop Distributed File System (HDFS)
external-doc-url: https://hadoop.apache.org/docs/r1.2.1/hdfs_design.html
how-to-guide:
- /docs/apache-airflow-providers-apache-hdfs/operators/hdfs.rst
logo: /integration-logos/apache/hadoop.png
tags: [apache]
- integration-name: WebHDFS
@@ -59,22 +57,11 @@ integrations:
tags: [apache]

sensors:
- integration-name: Hadoop Distributed File System (HDFS)
python-modules:
- airflow.providers.apache.hdfs.sensors.hdfs
- integration-name: WebHDFS
python-modules:
- airflow.providers.apache.hdfs.sensors.web_hdfs

hooks:
- integration-name: Hadoop Distributed File System (HDFS)
python-modules:
- airflow.providers.apache.hdfs.hooks.hdfs
- integration-name: WebHDFS
python-modules:
- airflow.providers.apache.hdfs.hooks.webhdfs


connection-types:
- hook-class-name: airflow.providers.apache.hdfs.hooks.hdfs.HDFSHook
connection-type: hdfs