
[ADAP-670] [Regression] communication error in 1.5 #518

Closed
falonso-alo opened this issue Jul 5, 2023 · 21 comments · Fixed by #519 or #523
Labels: type:bug (Something isn't working), type:regression

Comments


falonso-alo commented Jul 5, 2023

Is this a regression in a recent version of dbt-redshift?

  • I believe this is a regression in dbt-redshift functionality
  • I have searched the existing issues, and I could not find an existing issue for this regression

Current Behavior

dbt compile fails with a "communication error"/OSError using dbt 1.5.

(screenshot: dbt compile failing with the communication error)

Expected/Previous Behavior

dbt compile succeeds using dbt 1.4.

(screenshot: dbt compile succeeding)

Steps To Reproduce

  1. Install dbt 1.5
  2. Run dbt compile

Relevant log output

No response

Environment

- OS: MacOS 13.3.1 (a) and Ubuntu
- Python: 3.10.5
- dbt-core (working version): 1.4.6
- dbt-redshift (working version): 1.4.6
- dbt-core (regression version): 1.5.2
- dbt-redshift (regression version): 1.5.0
pip packages (working)
Package                  Version
------------------------ --------
agate                    1.7.0
annotated-types          0.5.0
appdirs                  1.4.4
asn1crypto               1.5.1
attrs                    23.1.0
Babel                    2.12.1
beautifulsoup4           4.12.2
betterproto              1.2.5
black                    23.3.0
boto3                    1.26.165
botocore                 1.29.165
certifi                  2023.5.7
cffi                     1.15.1
chardet                  5.1.0
charset-normalizer       3.1.0
click                    8.1.3
click-config-file        0.6.0
colorama                 0.4.6
configobj                5.0.8
dataclasses-json         0.5.4
dbt-core                 1.4.6
dbt-extractor            0.4.1
dbt-postgres             1.4.6
dbt-redshift             1.4.0
decorator                5.1.1
diff-cover               7.6.0
exceptiongroup           1.1.2
future                   0.18.3
graphql-core             3.2.3
grpclib                  0.4.5
gunicorn                 20.1.0
h2                       4.1.0
hologram                 0.0.16
hpack                    4.0.0
hyperframe               6.0.1
idna                     3.4
iniconfig                2.0.0
isodate                  0.6.1
Jinja2                   3.1.2
jinja2-simple-tags       0.5.0
jmespath                 1.0.1
jsonschema               4.17.3
leather                  0.3.4
Logbook                  1.5.3
lxml                     4.9.3
markdown-it-py           3.0.0
MarkupSafe               2.0.1
marshmallow              3.19.0
marshmallow-enum         1.5.1
mashumaro                3.3.1
mdurl                    0.1.2
minimal-snowplow-tracker 0.0.2
montecarlodata           0.40.2
msgpack                  1.0.5
multidict                6.0.4
mypy-extensions          1.0.0
networkx                 2.8.8
packaging                23.1
parsedatetime            2.4
pathspec                 0.10.3
pip                      22.0.4
platformdirs             3.8.0
pluggy                   1.2.0
protobuf                 4.23.3
psycopg2-binary          2.9.6
py                       1.11.0
pycarlo                  0.7.1
pycparser                2.21
pydantic                 2.0.2
pydantic_core            2.1.2
Pygments                 2.15.1
pyrsistent               0.19.3
pytest                   7.4.0
python-box               7.0.1
python-dateutil          2.8.2
python-slugify           8.0.1
pytimeparse              1.1.8
pytz                     2023.3
PyYAML                   6.0
redshift-connector       2.0.912
regex                    2023.6.3
requests                 2.31.0
retry                    0.9.2
rich                     13.4.2
ruamel.yaml              0.17.32
ruamel.yaml.clib         0.2.7
s3transfer               0.6.1
scramp                   1.4.4
setuptools               58.1.0
sgqlc                    14.1
shandy-sqlfmt            0.19.0
six                      1.16.0
soupsieve                2.4.1
sqlfluff                 2.1.2
sqlfluff-templater-dbt   2.1.2
sqlparse                 0.4.3
stringcase               1.2.0
tabulate                 0.9.0
tblib                    2.0.0
text-unidecode           1.3
toml                     0.10.2
tomli                    2.0.1
tqdm                     4.65.0
typing_extensions        4.7.1
typing-inspect           0.9.0
urllib3                  1.26.16
Werkzeug                 2.1.2
pip packages (regression)
Package                  Version
------------------------ --------
agate                    1.7.0
annotated-types          0.5.0
appdirs                  1.4.4
asn1crypto               1.5.1
attrs                    23.1.0
Babel                    2.12.1
beautifulsoup4           4.12.2
betterproto              1.2.5
black                    23.3.0
boto3                    1.26.165
botocore                 1.29.165
certifi                  2023.5.7
cffi                     1.15.1
chardet                  5.1.0
charset-normalizer       3.1.0
click                    8.1.3
click-config-file        0.6.0
colorama                 0.4.6
configobj                5.0.8
dataclasses-json         0.5.4
dbt-core                 1.5.2
dbt-extractor            0.4.1
dbt-postgres             1.5.2
dbt-redshift             1.5.0
decorator                5.1.1
diff-cover               7.6.0
exceptiongroup           1.1.2
future                   0.18.3
graphql-core             3.2.3
grpclib                  0.4.5
gunicorn                 20.1.0
h2                       4.1.0
hologram                 0.0.16
hpack                    4.0.0
hyperframe               6.0.1
idna                     3.4
iniconfig                2.0.0
isodate                  0.6.1
Jinja2                   3.1.2
jinja2-simple-tags       0.5.0
jmespath                 1.0.1
jsonschema               4.17.3
leather                  0.3.4
Logbook                  1.5.3
lxml                     4.9.3
markdown-it-py           3.0.0
MarkupSafe               2.0.1
marshmallow              3.19.0
marshmallow-enum         1.5.1
mashumaro                3.6
mdurl                    0.1.2
minimal-snowplow-tracker 0.0.2
montecarlodata           0.40.2
msgpack                  1.0.5
multidict                6.0.4
mypy-extensions          1.0.0
networkx                 2.8.8
packaging                23.1
parsedatetime            2.4
pathspec                 0.10.3
pip                      22.0.4
platformdirs             3.8.0
pluggy                   1.2.0
protobuf                 4.23.3
psycopg2-binary          2.9.6
py                       1.11.0
pycarlo                  0.7.1
pycparser                2.21
pydantic                 2.0.2
pydantic_core            2.1.2
Pygments                 2.15.1
pyrsistent               0.19.3
pytest                   7.4.0
python-box               7.0.1
python-dateutil          2.8.2
python-slugify           8.0.1
pytimeparse              1.1.8
pytz                     2023.3
PyYAML                   6.0
redshift-connector       2.0.912
regex                    2023.6.3
requests                 2.31.0
retry                    0.9.2
rich                     13.4.2
ruamel.yaml              0.17.32
ruamel.yaml.clib         0.2.7
s3transfer               0.6.1
scramp                   1.4.4
setuptools               58.1.0
sgqlc                    14.1
shandy-sqlfmt            0.19.0
six                      1.16.0
soupsieve                2.4.1
sqlfluff                 2.1.2
sqlfluff-templater-dbt   2.1.2
sqlparse                 0.4.3
stringcase               1.2.0
tabulate                 0.9.0
tblib                    2.0.0
text-unidecode           1.3
toml                     0.10.2
tomli                    2.0.1
tqdm                     4.65.0
typing_extensions        4.7.1
typing-inspect           0.9.0
urllib3                  1.26.16
Werkzeug                 2.1.2

Additional Context

Tested in a venv.

@github-actions github-actions bot changed the title [Regression] communication error in 1.5 [ADAP-670] [Regression] communication error in 1.5 Jul 5, 2023
@fivetran-joemarkiewicz

Just wanted to chime in to mention that I am seeing this error as well, and I can confirm that downgrading to 1.4 makes the communication error go away.


fabientra commented Jul 5, 2023

Hello,
I am seeing the same issue without having upgraded anything: it was working perfectly fine on 1.5 until a few hours ago.
Taking a closer look at the installed packages, the only difference I see is that redshift-connector changed from 2.0.911 to 2.0.912.

EDIT: I confirm that downgrading to redshift-connector-2.0.911 after installing dbt-redshift==1.5.5 fixed the issue
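Until a fixed connector release is out, one way to apply this workaround is to pin both packages in a requirements file (versions taken from the comments in this thread; whether 2.0.911 satisfies dbt-redshift's declared dependency range is an assumption worth verifying in your own environment):

```
dbt-redshift==1.5.5
redshift-connector==2.0.911
```

Installing with `pip install -r requirements.txt` then keeps pip from silently picking up 2.0.912 on the next resolve.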

dataders (Contributor) commented Jul 6, 2023

@Brooke-white, do you have any idea what might have changed in the latest patch that was released today?

Quoting fabientra above: "EDIT: I confirm that downgrading to redshift-connector-2.0.911 after installing dbt-redshift==1.5.5 fixed the issue"

@david-beallor

Quoting fabientra above: "Hello, I am seeing the same issue but without upgrading to 1.5. It was working perfectly fine on 1.5 until a few hours ago. Taking a closer look to the packages installed I see that the only difference is a change of the redshift-connector from redshift-connector-2.0.911 to redshift-connector-2.0.912. EDIT: I confirm that downgrading to redshift-connector-2.0.911 after installing dbt-redshift==1.5.5 fixed the issue"

Confirming that downgrading to redshift-connector-2.0.911 worked for my team too.

@tanmay-kulkarni

This issue also exists in dbt-redshift 1.5.6. On running dbt debug, I get the following error.
(screenshot: dbt debug failing with the same error)

@jan-benisek

The exact same issue on 1.5.5 when running dbt compile, just today. I'll try to follow the advice here and downgrade the package.

@Brooke-white

Hi, if anyone experiencing this issue could enable redshift-connector's logging and provide the logs, it would be a great help to the redshift-connector team's investigation. We're working to reproduce the issue on our end and will cut a release as soon as a fix is identified. Thanks!
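For anyone unsure how to do that: redshift-connector logs through Python's standard logging module under the "redshift_connector" logger name, so a sketch like the following should capture its diagnostics (the handler and format choices here are illustrative, not prescribed by the connector):

```python
import logging

# redshift-connector emits its diagnostics on the "redshift_connector" logger.
# Raise the level to DEBUG and attach a handler before dbt opens connections.
logger = logging.getLogger("redshift_connector")
logger.setLevel(logging.DEBUG)

# A StreamHandler prints to stderr; swap in
# logging.FileHandler("redshift_connector.log") to capture to a file instead.
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s %(levelname)s %(name)s %(message)s"))
logger.addHandler(handler)
```

Running this before invoking dbt (e.g. from a small wrapper script) should surface the connector's connection-establishment logs mentioned above.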


Datarogie commented Jul 6, 2023

The issue also exists on dbt-core 1.5.2 with dbt-redshift 1.5.6, which appears to install redshift-connector==2.0.912. Confirmed that pinning redshift-connector to 2.0.911, as others mentioned above, resolves the issue.

It does seem to hang much longer at the initial connection after starting a dbt task, but once it starts running it works.
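As context for the traceback that follows: the "communication error" wraps an OSError with the message "Int or String expected", which CPython's socket.getaddrinfo raises when the port argument is none of int, str, bytes, or None, before attempting any name resolution. A minimal sketch of that underlying behavior (the float port here is a hypothetical illustration of a bad type, not necessarily what dbt passes):

```python
import socket

# getaddrinfo type-checks the port before resolving anything, so this
# raises immediately with the exact message seen in the CI traceback.
try:
    socket.getaddrinfo("localhost", 5439.0)  # float port: invalid type
except OSError as exc:
    print(exc)  # Int or String expected
```

This suggests the 2.0.912 connector started handing the port to getaddrinfo in a form the socket module rejects, which would explain why only the connector version matters.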

@mikealfare (Contributor)

@Brooke-white Here's what we saw in our CI:

self = <redshift_connector.core.Connection object at 0x7f18869284f0>
user = '***', password = '***', database = '***'
host = '***', port = ***
source_address = None, unix_sock = None, ssl = True, sslmode = 'verify-ca'
timeout = None, max_prepared_statements = 1000, tcp_keepalive = True
application_name = 'dbt.adapters.base.connections', replication = None
client_protocol_version = 2, database_metadata_current_db_only = True
credentials_provider = None, provider_name = None, web_identity_token = None
numeric_to_float = False

    def __init__(
        self: "Connection",
        user: str,
        password: str,
        database: str,
        host: str = "localhost",
        port: int = ***,
        source_address: typing.Optional[str] = None,
        unix_sock: typing.Optional[str] = None,
        ssl: bool = True,
        sslmode: str = "verify-ca",
        timeout: typing.Optional[int] = None,
        max_prepared_statements: int = 1000,
        tcp_keepalive: typing.Optional[bool] = True,
        application_name: typing.Optional[str] = None,
        replication: typing.Optional[str] = None,
        client_protocol_version: int = DEFAULT_PROTOCOL_VERSION,
        database_metadata_current_db_only: bool = True,
        credentials_provider: typing.Optional[str] = None,
        provider_name: typing.Optional[str] = None,
        web_identity_token: typing.Optional[str] = None,
        numeric_to_float: bool = False,
    ):
        """
        Creates a :class:`Connection` to an Amazon Redshift cluster. For more information on establishing a connection to an Amazon Redshift cluster using `federated API access <https://aws.amazon.com/blogs/big-data/federated-api-access-to-amazon-redshift-using-an-amazon-redshift-connector-for-python/>`_ see our examples page.
        This is the underlying :class:`Connection` constructor called from :func:`redshift_connector.connect`.
    
        Parameters
        ----------
        user : str
            The username to use for authentication with the Amazon Redshift cluster.
        password : str
            The password to use for authentication with the Amazon Redshift cluster.
        database : str
            The name of the database instance to connect to.
        host : str
            The hostname of the Amazon Redshift cluster.
        port : int
            The port number of the Amazon Redshift cluster. Default value is ***.
        source_address : Optional[str]
        unix_sock : Optional[str]
        ssl : bool
            Is SSL enabled. Default value is ``True``. SSL must be enabled when authenticating using IAM.
        sslmode : str
            The security of the connection to the Amazon Redshift cluster. 'verify-ca' and 'verify-full' are supported.
        timeout : Optional[int]
            The number of seconds before the connection to the server will timeout. By default there is no timeout.
        max_prepared_statements : int
        tcp_keepalive : Optional[bool]
            Is `TCP keepalive <https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive>`_ used. The default value is ``True``.
        application_name : Optional[str]
            Sets the application name. The default value is None.
        replication : Optional[str]
            Used to run in `streaming replication mode <https://www.postgresql.org/docs/12/protocol-replication.html>`_.
        client_protocol_version : int
            The requested server protocol version. The default value is 1 representing `EXTENDED_RESULT_METADATA`. If the requested server protocol cannot be satisfied, a warning will be displayed to the user.
        database_metadata_current_db_only : bool
            Is `datashare <https://docs.aws.amazon.com/redshift/latest/dg/datashare-overview.html>`_ disabled. Default value is True, implying datasharing will not be used.
        credentials_provider : Optional[str]
            The class-path of the IdP plugin used for authentication with Amazon Redshift.
        provider_name : Optional[str]
            The name of the Redshift Native Auth Provider.
        web_identity_token: Optional[str]
            A web identity token used for authentication via Redshift Native IDP Integration
        numeric_to_float: bool
            Specifies if NUMERIC datatype values will be converted from ``decimal.Decimal`` to ``float``. By default NUMERIC values are received as ``decimal.Decimal``.
        """
        self.merge_socket_read = True
    
        _client_encoding = "utf8"
        self._commands_with_count: typing.Tuple[bytes, ...] = (
            b"INSERT",
            b"DELETE",
            b"UPDATE",
            b"MOVE",
            b"FETCH",
            b"COPY",
            b"SELECT",
        )
        self.notifications: deque = deque(maxlen=100)
        self.notices: deque = deque(maxlen=100)
        self.parameter_statuses: deque = deque(maxlen=100)
        self.max_prepared_statements: int = int(max_prepared_statements)
        self._run_cursor: Cursor = Cursor(self, paramstyle=DbApiParamstyle.NAMED.value)
        self._client_protocol_version: int = client_protocol_version
        self._database = database
        self.py_types = deepcopy(PY_TYPES)
        self.redshift_types = deepcopy(REDSHIFT_TYPES)
        self._database_metadata_current_db_only: bool = database_metadata_current_db_only
        self.numeric_to_float: bool = numeric_to_float
    
        # based on _client_protocol_version value, we must use different conversion functions
        # for receiving some datatypes
        self._enable_protocol_based_conversion_funcs()
    
        self.web_identity_token = web_identity_token
    
        if user is None:
            raise InterfaceError("The 'user' connection parameter cannot be None")
    
        redshift_native_auth: bool = False
    
        if application_name is None or application_name == "":
    
            def get_calling_module() -> str:
                import inspect
    
                module_name: str = ""
                stack: typing.List[inspect.FrameInfo] = inspect.stack()
                try:
                    # get_calling_module  -> init -> connect -> init -> calling module
                    start: int = min(4, len(stack) - 1)
                    parent = stack[start][0]
                    calling_module = inspect.getmodule(parent)
    
                    if calling_module:
                        module_name = calling_module.__name__
                except:
                    pass
                finally:
                    del parent
                    del stack
    
                return module_name
    
            application_name = get_calling_module()
        init_params: typing.Dict[str, typing.Optional[typing.Union[str, bytes]]] = {
            "user": "",
            "database": database,
            "application_name": application_name,
            "replication": replication,
            "client_protocol_version": str(self._client_protocol_version),
            "driver_version": DriverInfo.driver_full_name(),
            "os_version": self.client_os_version,
        }
    
        if credentials_provider:
            init_params["plugin_name"] = credentials_provider
    
            if credentials_provider.split(".")[-1] in (
                "BasicJwtCredentialsProvider",
                "BrowserAzureOAuth2CredentialsProvider",
            ):
                redshift_native_auth = True
                init_params["idp_type"] = "AzureAD"
    
                if provider_name:
                    init_params["provider_name"] = provider_name
    
        if not redshift_native_auth or user:
            init_params["user"] = user
    
        _logger.debug(make_divider_block())
        _logger.debug("Establishing a connection")
        _logger.debug(init_params)
        _logger.debug(make_divider_block())
    
        for k, v in tuple(init_params.items()):
            if isinstance(v, str):
                init_params[k] = v.encode("utf8")
            elif v is None:
                del init_params[k]
            elif not isinstance(v, (bytes, bytearray)):
                raise InterfaceError("The parameter " + k + " can't be of type " + str(type(v)) + ".")
    
        if "user" in init_params:
            self.user: bytes = typing.cast(bytes, init_params["user"])
        else:
            self.user = b""
    
        if isinstance(password, str):
            self.password: bytes = password.encode("utf8")
        else:
            self.password = password
    
        self.autocommit: bool = False
        self._xid = None
    
        self._caches: typing.Dict = {}
    
        # Create the TCP/IP socket and connect to specific database
        # if there already has a socket, it will not create new connection when run connect again
        try:
            if unix_sock is None and host is not None:
                self._usock: typing.Union[socket.socket, "SSLSocket"] = socket.socket(
                    socket.AF_INET, socket.SOCK_STREAM
                )
                if source_address is not None:
                    self._usock.bind((source_address, 0))
            elif unix_sock is not None:
                if not hasattr(socket, "AF_UNIX"):
                    raise InterfaceError("attempt to connect to unix socket on unsupported " "platform")
                self._usock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            else:
                raise ProgrammingError("one of host or unix_sock must be provided")
            if timeout is not None:
                self._usock.settimeout(timeout)
    
            if unix_sock is None and host is not None:
>               hostport: typing.Tuple[str, int] = Connection.__get_host_address_info(host, port)

/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/core.py:610: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/core.py:403: in __get_host_address_info
    response = socket.getaddrinfo(host=host, port=port, family=socket.AF_INET)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

host = '***', port = ***
family = <AddressFamily.AF_INET: 2>, type = 0, proto = 0, flags = 0

    def getaddrinfo(host, port, family=0, type=0, proto=0, flags=0):
        """Resolve host and port into list of address info entries.
    
        Translate the host/port argument into a sequence of 5-tuples that contain
        all the necessary arguments for creating a socket connected to that service.
        host is a domain name, a string representation of an IPv4/v6 address or
        None. port is a string service name such as 'http', a numeric port number or
        None. By passing None as the value of host and port, you can pass NULL to
        the underlying C API.
    
        The family, type and proto arguments can be optionally specified in order to
        narrow the list of addresses returned. Passing zero as a value for each of
        these arguments selects the full range of results.
        """
        # We override this function since we want to translate the numeric family
        # and socket type values to enum constants.
        addrlist = []
>       for res in _socket.getaddrinfo(host, port, family, type, proto, flags):
E       OSError: Int or String expected

/opt/hostedtoolcache/Python/3.10.12/x64/lib/python3.10/socket.py:955: OSError

During handling of the above exception, another exception occurred:

cls = <class 'dbt.adapters.redshift.connections.RedshiftConnectionManager'>
connection = Connection(type='redshift', name='_test', state=<ConnectionState.CLOSED: 'closed'>, transaction_open=False, _handle=No...lse, connect_timeout=None, role=None, sslmode=<UserSSLMode.prefer: 'prefer'>, retries=6, region=None, autocommit=True))
connect = <function RedshiftConnectMethodFactory.get_connect_method.<locals>.connect at 0x7f1885e49240>
logger = AdapterLogger(name='Redshift')
retryable_exceptions = [<class 'redshift_connector.error.OperationalError'>, <class 'redshift_connector.error.DatabaseError'>, <class 'redshift_connector.error.DataError'>]
retry_limit = 6
retry_timeout = <function RedshiftConnectionManager.open.<locals>.exponential_backoff at 0x7f1885e49000>
_attempts = 0

    @classmethod
    def retry_connection(
        cls,
        connection: Connection,
        connect: Callable[[], AdapterHandle],
        logger: AdapterLogger,
        retryable_exceptions: Iterable[Type[Exception]],
        retry_limit: int = 1,
        retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1,
        _attempts: int = 0,
    ) -> Connection:
        """Given a Connection, set its handle by calling connect.
    
        The calls to connect will be retried up to retry_limit times to deal with transient
        connection errors. By default, one retry will be attempted if retryable_exceptions is set.
    
        :param Connection connection: An instance of a Connection that needs a handle to be set,
            usually when attempting to open it.
        :param connect: A callable that returns the appropiate connection handle for a
            given adapter. This callable will be retried retry_limit times if a subclass of any
            Exception in retryable_exceptions is raised by connect.
        :type connect: Callable[[], AdapterHandle]
        :param AdapterLogger logger: A logger to emit messages on retry attempts or errors. When
            handling expected errors, we call debug, and call warning on unexpected errors or when
            all retry attempts have been exhausted.
        :param retryable_exceptions: An iterable of exception classes that if raised by
            connect should trigger a retry.
        :type retryable_exceptions: Iterable[Type[Exception]]
        :param int retry_limit: How many times to retry the call to connect. If this limit
            is exceeded before a successful call, a FailedToConnectError will be raised.
            Must be non-negative.
        :param retry_timeout: Time to wait between attempts to connect. Can also take a
            Callable that takes the number of attempts so far, beginning at 0, and returns an int
            or float to be passed to time.sleep.
        :type retry_timeout: Union[Callable[[int], SleepTime], SleepTime] = 1
        :param int _attempts: Parameter used to keep track of the number of attempts in calling the
            connect function across recursive calls. Passed as an argument to retry_timeout if it
            is a Callable. This parameter should not be set by the initial caller.
        :raises dbt.exceptions.FailedToConnectError: Upon exhausting all retry attempts without
            successfully acquiring a handle.
        :return: The given connection with its appropriate state and handle attributes set
            depending on whether we successfully acquired a handle or not.
        """
        timeout = retry_timeout(_attempts) if callable(retry_timeout) else retry_timeout
        if timeout < 0:
            raise dbt.exceptions.FailedToConnectError(
                "retry_timeout cannot be negative or return a negative time."
            )
    
        if retry_limit < 0 or retry_limit > sys.getrecursionlimit():
            # This guard is not perfect others may add to the recursion limit (e.g. built-ins).
            connection.handle = None
            connection.state = ConnectionState.FAIL
            raise dbt.exceptions.FailedToConnectError("retry_limit cannot be negative")
    
        try:
>           connection.handle = connect()

/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/dbt/adapters/base/connections.py:241: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
/home/runner/work/dbt-redshift/dbt-redshift/dbt/adapters/redshift/connections.py:204: in connect
    c = redshift_connector.connect(
/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/__init__.py:344: in connect
    return Connection(
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

self = <redshift_connector.core.Connection object at 0x7f18869284f0>
user = '***', password = '***', database = '***'
host = '***', port = ***
source_address = None, unix_sock = None, ssl = True, sslmode = 'verify-ca'
timeout = None, max_prepared_statements = 1000, tcp_keepalive = True
application_name = 'dbt.adapters.base.connections', replication = None
client_protocol_version = 2, database_metadata_current_db_only = True
credentials_provider = None, provider_name = None, web_identity_token = None
numeric_to_float = False

    def __init__(
        self: "Connection",
        user: str,
        password: str,
        database: str,
        host: str = "localhost",
        port: int = ***,
        source_address: typing.Optional[str] = None,
        unix_sock: typing.Optional[str] = None,
        ssl: bool = True,
        sslmode: str = "verify-ca",
        timeout: typing.Optional[int] = None,
        max_prepared_statements: int = 1000,
        tcp_keepalive: typing.Optional[bool] = True,
        application_name: typing.Optional[str] = None,
        replication: typing.Optional[str] = None,
        client_protocol_version: int = DEFAULT_PROTOCOL_VERSION,
        database_metadata_current_db_only: bool = True,
        credentials_provider: typing.Optional[str] = None,
        provider_name: typing.Optional[str] = None,
        web_identity_token: typing.Optional[str] = None,
        numeric_to_float: bool = False,
    ):
        """
        Creates a :class:`Connection` to an Amazon Redshift cluster. For more information on establishing a connection to an Amazon Redshift cluster using `federated API access <https://aws.amazon.com/blogs/big-data/federated-api-access-to-amazon-redshift-using-an-amazon-redshift-connector-for-python/>`_ see our examples page.
        This is the underlying :class:`Connection` constructor called from :func:`redshift_connector.connect`.
    
        Parameters
        ----------
        user : str
            The username to use for authentication with the Amazon Redshift cluster.
        password : str
            The password to use for authentication with the Amazon Redshift cluster.
        database : str
            The name of the database instance to connect to.
        host : str
            The hostname of the Amazon Redshift cluster.
        port : int
            The port number of the Amazon Redshift cluster. Default value is ***.
        source_address : Optional[str]
        unix_sock : Optional[str]
        ssl : bool
            Is SSL enabled. Default value is ``True``. SSL must be enabled when authenticating using IAM.
        sslmode : str
            The security of the connection to the Amazon Redshift cluster. 'verify-ca' and 'verify-full' are supported.
        timeout : Optional[int]
            The number of seconds before the connection to the server will timeout. By default there is no timeout.
        max_prepared_statements : int
        tcp_keepalive : Optional[bool]
            Is `TCP keepalive <https://en.wikipedia.org/wiki/Keepalive#TCP_keepalive>`_ used. The default value is ``True``.
        application_name : Optional[str]
            Sets the application name. The default value is None.
        replication : Optional[str]
            Used to run in `streaming replication mode <https://www.postgresql.org/docs/12/protocol-replication.html>`_.
        client_protocol_version : int
            The requested server protocol version. The default value is 1 representing `EXTENDED_RESULT_METADATA`. If the requested server protocol cannot be satisfied, a warning will be displayed to the user.
        database_metadata_current_db_only : bool
            Is `datashare <https://docs.aws.amazon.com/redshift/latest/dg/datashare-overview.html>`_ disabled. Default value is True, implying datasharing will not be used.
        credentials_provider : Optional[str]
            The class-path of the IdP plugin used for authentication with Amazon Redshift.
        provider_name : Optional[str]
            The name of the Redshift Native Auth Provider.
        web_identity_token: Optional[str]
            A web identity token used for authentication via Redshift Native IDP Integration
        numeric_to_float: bool
            Specifies if NUMERIC datatype values will be converted from ``decimal.Decimal`` to ``float``. By default NUMERIC values are received as ``decimal.Decimal``.
        """
        self.merge_socket_read = True
    
        _client_encoding = "utf8"
        self._commands_with_count: typing.Tuple[bytes, ...] = (
            b"INSERT",
            b"DELETE",
            b"UPDATE",
            b"MOVE",
            b"FETCH",
            b"COPY",
            b"SELECT",
        )
        self.notifications: deque = deque(maxlen=100)
        self.notices: deque = deque(maxlen=100)
        self.parameter_statuses: deque = deque(maxlen=100)
        self.max_prepared_statements: int = int(max_prepared_statements)
        self._run_cursor: Cursor = Cursor(self, paramstyle=DbApiParamstyle.NAMED.value)
        self._client_protocol_version: int = client_protocol_version
        self._database = database
        self.py_types = deepcopy(PY_TYPES)
        self.redshift_types = deepcopy(REDSHIFT_TYPES)
        self._database_metadata_current_db_only: bool = database_metadata_current_db_only
        self.numeric_to_float: bool = numeric_to_float
    
        # based on _client_protocol_version value, we must use different conversion functions
        # for receiving some datatypes
        self._enable_protocol_based_conversion_funcs()
    
        self.web_identity_token = web_identity_token
    
        if user is None:
            raise InterfaceError("The 'user' connection parameter cannot be None")
    
        redshift_native_auth: bool = False
    
        if application_name is None or application_name == "":
    
            def get_calling_module() -> str:
                import inspect
    
                module_name: str = ""
                stack: typing.List[inspect.FrameInfo] = inspect.stack()
                try:
                    # get_calling_module  -> init -> connect -> init -> calling module
                    start: int = min(4, len(stack) - 1)
                    parent = stack[start][0]
                    calling_module = inspect.getmodule(parent)
    
                    if calling_module:
                        module_name = calling_module.__name__
                except:
                    pass
                finally:
                    del parent
                    del stack
    
                return module_name
    
            application_name = get_calling_module()
        init_params: typing.Dict[str, typing.Optional[typing.Union[str, bytes]]] = {
            "user": "",
            "database": database,
            "application_name": application_name,
            "replication": replication,
            "client_protocol_version": str(self._client_protocol_version),
            "driver_version": DriverInfo.driver_full_name(),
            "os_version": self.client_os_version,
        }
    
        if credentials_provider:
            init_params["plugin_name"] = credentials_provider
    
            if credentials_provider.split(".")[-1] in (
                "BasicJwtCredentialsProvider",
                "BrowserAzureOAuth2CredentialsProvider",
            ):
                redshift_native_auth = True
                init_params["idp_type"] = "AzureAD"
    
                if provider_name:
                    init_params["provider_name"] = provider_name
    
        if not redshift_native_auth or user:
            init_params["user"] = user
    
        _logger.debug(make_divider_block())
        _logger.debug("Establishing a connection")
        _logger.debug(init_params)
        _logger.debug(make_divider_block())
    
        for k, v in tuple(init_params.items()):
            if isinstance(v, str):
                init_params[k] = v.encode("utf8")
            elif v is None:
                del init_params[k]
            elif not isinstance(v, (bytes, bytearray)):
                raise InterfaceError("The parameter " + k + " can't be of type " + str(type(v)) + ".")
    
        if "user" in init_params:
            self.user: bytes = typing.cast(bytes, init_params["user"])
        else:
            self.user = b""
    
        if isinstance(password, str):
            self.password: bytes = password.encode("utf8")
        else:
            self.password = password
    
        self.autocommit: bool = False
        self._xid = None
    
        self._caches: typing.Dict = {}
    
        # Create the TCP/IP socket and connect to the specific database.
        # If a socket already exists, running connect again will not create a new connection.
        try:
            if unix_sock is None and host is not None:
                self._usock: typing.Union[socket.socket, "SSLSocket"] = socket.socket(
                    socket.AF_INET, socket.SOCK_STREAM
                )
                if source_address is not None:
                    self._usock.bind((source_address, 0))
            elif unix_sock is not None:
                if not hasattr(socket, "AF_UNIX"):
                    raise InterfaceError("attempt to connect to unix socket on unsupported " "platform")
                self._usock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            else:
                raise ProgrammingError("one of host or unix_sock must be provided")
            if timeout is not None:
                self._usock.settimeout(timeout)
    
            if unix_sock is None and host is not None:
                hostport: typing.Tuple[str, int] = Connection.__get_host_address_info(host, port)
                _logger.debug(
                    "Attempting to create connection socket with address {} {}".format(hostport[0], str(hostport[1]))
                )
                self._usock.connect(hostport)
            elif unix_sock is not None:
                self._usock.connect(unix_sock)
    
            # For Redshift, the default is ssl=True:
            # create an SSL connection with the Redshift CA certificates and check the hostname
            if ssl is True:
                try:
                    from ssl import CERT_REQUIRED, SSLContext
    
                    # ssl_context = ssl.create_default_context()
    
                    path = os.path.abspath(__file__)
                    if os.name == "nt":
                        path = "\\".join(path.split("\\")[:-1]) + "\\files\\redshift-ca-bundle.crt"
                    else:
                        path = "/".join(path.split("/")[:-1]) + "/files/redshift-ca-bundle.crt"
    
                    ssl_context: SSLContext = SSLContext()
                    ssl_context.verify_mode = CERT_REQUIRED
                    ssl_context.load_default_certs()
                    ssl_context.load_verify_locations(path)
    
                    # Int32(8) - Message length, including self.
                    # Int32(80877103) - The SSL request code.
                    self._usock.sendall(ii_pack(8, 80877103))
                    resp: bytes = self._usock.recv(1)
                    if resp != b"S":
                        _logger.debug(
                            "Server response code when attempting to establish ssl connection: {!r}".format(resp)
                        )
                        raise InterfaceError("Server refuses SSL")
    
                    if sslmode == "verify-ca":
                        self._usock = ssl_context.wrap_socket(self._usock)
                    elif sslmode == "verify-full":
                        ssl_context.check_hostname = True
                        self._usock = ssl_context.wrap_socket(self._usock, server_hostname=host)
    
                except ImportError:
                    raise InterfaceError("SSL required but ssl module not available in " "this python installation")
    
            self._sock: typing.Optional[typing.BinaryIO] = self._usock.makefile(mode="rwb")
            if tcp_keepalive:
                self._usock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
        except socket.error as e:
            self._usock.close()
>           raise InterfaceError("communication error", e)
E           redshift_connector.error.InterfaceError: ('communication error', OSError('Int or String expected'))

/home/runner/work/dbt-redshift/dbt-redshift/.tox/integration-redshift/lib/python3.10/site-packages/redshift_connector/core.py:661: InterfaceError

@Brooke-white

Thanks so much, that was extremely helpful :)

I reviewed the trace and was able to reproduce the issue. The root cause is this line in DBT: the type of port passed to redshift-connector is dbt.helper_types.Port rather than int. A simple fix is to change the line to apply an int cast to self.credentials.port:

            "port": int(self.credentials.port) if self.credentials.port else 5439,

@Brooke-white

for those interested, here is my repro:

from dbt.helper_types import Port
import redshift_connector

with redshift_connector.connect(
    user='***',
    host='***',
    password='***',
    database='***',
    port = Port(5439),
) as conn:
      with conn.cursor() as cursor:
        cursor.execute("select 1")
        print(cursor.fetchone())

here's my verification of fix:

from dbt.helper_types import Port
import redshift_connector

with redshift_connector.connect(
    user='***',
    host='***',
    password='***',
    database='***',
    port = int(Port(5439)),
) as conn:
      with conn.cursor() as cursor:
        cursor.execute("select 1")
        print(cursor.fetchone())

happy to open a PR if you folks are ok with this fix.

@dataders
Contributor

dataders commented Jul 6, 2023

The root cause is this line in DBT.

if you'll humor me being overly pedantic, this isn't the "root" cause per se. The root cause would be whichever change in 2.0.912 introduced the breaking behavior in dbt. This code was previously working for months.

silly example:

if I put a 60V battery into a 20V cordless drill and it shorts the drill's motor, it would be incorrect to point to the motor as the root cause; the root cause is the battery.

@Brooke-white

While passing a dbt.helper_types.Port to the connect method's port parameter worked prior to 2.0.912, it isn't something redshift-connector supports per the docs.

@falonso-alo
Author

At the end of the day, we don't really need to assign blame anywhere. It's just important that we solve the problem.

Thank you, @Brooke-white!

@mikealfare
Contributor

Agreed with @falonso-alo, I don't think either side (I already don't like that connotation) should be assigning blame. This industry is toxic enough as it is and this work is hard enough as it is. With that in mind, I did some research to understand exactly what is going on and felt like sharing that information.

The initial confusion comes from the fact that Port subclasses int; in other words, issubclass(Port, int) == True. We are in fact passing an int, we're just not passing exactly an int.

In version v2.0.911, this line was used to connect via host/port:
https://github.com/aws/amazon-redshift-python-driver/blob/2898d86de9bb6be1e3def88e15c78c9ea767ec52/redshift_connector/core.py#L596

There is no check there to see whether port is a proper int. As long as port can behave like an int, everyone's happy.

This line was updated in v2.0.912 to:
https://github.com/aws/amazon-redshift-python-driver/blob/b2dde82ec9156e2adcc801ac54c051f3cfe61e33/redshift_connector/core.py#L610-L614

with reference to this new function:
https://github.com/aws/amazon-redshift-python-driver/blob/b2dde82ec9156e2adcc801ac54c051f3cfe61e33/redshift_connector/core.py#L397-L409

The new function calls the built-in socket module, which raises the OSError('Int or String expected') exception that we're seeing. This module must be looking for exactly an int. I stopped digging at that point, as socket is implemented in C, making it a bit more difficult for me to investigate. While I didn't see the check, I'm willing to assume there is a check for an exact int in there.
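That inferred exact-type check can be demonstrated without digging into the C source. In the sketch below, Port is a hypothetical stand-in for dbt.helper_types.Port (just an int subclass): socket.getaddrinfo rejects it before any name resolution happens, and a plain-int cast makes the same call succeed.

```python
import socket

class Port(int):
    """Stand-in for dbt.helper_types.Port: a plain int subclass."""

port = Port(5439)
assert isinstance(port, int)   # a subclass passes isinstance checks...
assert issubclass(Port, int)

try:
    # ...but getaddrinfo's C implementation accepts only an *exact* int
    # (or a str) for the port/service argument, so the subclass is
    # rejected during argument parsing, before any DNS lookup.
    socket.getaddrinfo("localhost", port)
except OSError as exc:
    print(exc)  # Int or String expected

# Casting to a plain int restores the pre-2.0.912 behavior.
info = socket.getaddrinfo("localhost", int(port), family=socket.AF_INET)
print(info[0][4])  # e.g. ('127.0.0.1', 5439)
```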

Putting all of that aside, I have to imagine we're not the only group that is passing a subclass of int (or str for that matter) into the Connection class and expecting it to work (given the argument above). It might not be the primary usage pattern, but until v2.0.912 it appeared to be a valid one. And I'm sure you can appreciate the nuance that comes with sitting on top of code that you can't easily alter (such as socket in this scenario). So while we aren't passing a proper int into Connection, it might be useful for those downstream (both us and other teams) to update this line:

https://github.com/aws/amazon-redshift-python-driver/blob/b2dde82ec9156e2adcc801ac54c051f3cfe61e33/redshift_connector/core.py#L403

to this:

response = socket.getaddrinfo(host=host, port=int(port), family=socket.AF_INET)

Such is life with dynamic typing in python.
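The proposed one-line change amounts to a more permissive address-lookup helper. Here is a minimal sketch of such a helper; it is a hypothetical illustration of the suggested fix, not the actual redshift-connector implementation, and the function name is only modeled on the private helper linked above.

```python
import socket
import typing

def get_host_address_info(host: str, port: typing.Union[int, str]) -> typing.Tuple[str, int]:
    # Coerce port to a plain int so well-behaved int subclasses
    # (e.g. dbt.helper_types.Port) are accepted; getaddrinfo's C
    # implementation rejects anything that is not exactly int or str.
    response = socket.getaddrinfo(host=host, port=int(port), family=socket.AF_INET)
    # Each getaddrinfo result is (family, type, proto, canonname, sockaddr);
    # for AF_INET, sockaddr is an (address, port) pair.
    return response[0][4][0], response[0][4][1]
```

With this coercion in place, both `get_host_address_info("localhost", 5439)` and the same call with an int subclass resolve identically.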

@jan-benisek

Hi, seems like the error is back. I am on dbt-core=1.5.1 and dbt-redshift=1.5.7.

06:02:48  Encountered an error while running operation: Database Error

  ('communication error', OSError('Int or String expected'))

script returned exit code 1

I see in my docker build that redshift-connector-2.0.913 is installed. That version was released 7hrs ago.

@dataders
Contributor

@jan-benisek shoot! I've opened #531 to fix this, and we're working on a 1.5 patch release as we speak. I'll reply back here when the patch is out; my estimate is within the next four hours, but certainly within the next 24 hours.

@Datarogie

Is it normal to have a stable release that doesn't lock in its dependencies, if that's even an option? 😅
Please excuse my lack of knowledge in app/package development; I'm just generally curious, as it'd be great to be able to trust that a stable version means the package AND its dependencies all work together, and that new versions of either won't be integrated until testing confirms they're stable.
However, I can also see how this could be difficult to manage.

@dataders
Contributor

dbt-redshift 1.5.8 has now been released. I cannot stress enough:

  1. how sorry we are to have had you all suffer degraded reliability of the dbt-redshift adapter, and
  2. how grateful we are for your patience and assistance!

@jan-benisek

@dataders Thanks a lot for such a quick fix, all 🟢! And don't worry, we've all been there.
Looking at your fixes, the Python driver doesn't seem like the easiest piece of software to work with.

@mustafa0taru

hi @dataders, i'm currently facing this blocker. do i need to downgrade the dbt-redshift adapter?
