Streaming with query_row_block_stream crashes after few reads #399

Closed
keu opened this issue Sep 24, 2024 · 7 comments · Fixed by #400
Labels
bug Something isn't working

Comments

@keu

keu commented Sep 24, 2024

Describe the bug

The streaming read crashes after some time if there is any processing between reads.

Traceback (most recent call last):
  File "/usr/lib/python3.11/http/client.py", line 573, in _get_chunk_left
    chunk_left = self._read_next_chunk_size()
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 540, in _read_next_chunk_size
    return int(line, 16)
           ^^^^^^^^^^^^^
ValueError: invalid literal for int() with base 16: b'\xe9\xe9\x80\xe8\xf8\x8c\x05I"\xe7\x1bh\x9e\x0c\xa6\x06\xe3\x92\xa3?\x7f6:e\xec\x9d\xa8\x01d\xd3\x118{\xe1\xe9\xe60ta\x11T\xeb\xd0\x1d\x91\xed\xbf\x8aT5\xc8\x00\xceN\x1bA{\xe4\xf9\xeb\xd0\xc5\xba\xb
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/usr/lib/python3.11/http/client.py", line 590, in _read_chunked
    chunk_left = self._get_chunk_left()
                 ^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 575, in _get_chunk_left
    raise IncompleteRead(b'')
http.client.IncompleteRead: IncompleteRead(0 bytes read)
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 444, in _error_catcher
    yield
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 567, in read
    data = self._fp_read(amt) if not fp_closed else b""
           ^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 533, in _fp_read
    return self._fp.read(amt) if amt is not None else self._fp.read()
           ^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 467, in read
    return self._read_chunked(amt)
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/lib/python3.11/http/client.py", line 605, in _read_chunked
    raise IncompleteRead(b''.join(value)) from exc
http.client.IncompleteRead: IncompleteRead(134 bytes read)
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
  File "/progai/pipeline/airflow/dags/shared/female_names_processing.py", line 76, in execute
    for i, block in enumerate(stream):
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/common.py", line 201, in __next__
    return next(self.gen)
           ^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/query.py", line 296, in _row_block_stream
    for block in self._column_block_stream():
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/transform.py", line 75, in gen
    next_block = get_block()
                 ^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/transform.py", line 50, in get_block
    column = col_type.read_column(source, num_rows, context)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/datatypes/base.py", line 143, in read_column
    return self.read_column_data(source, num_rows, ctx)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/datatypes/base.py", line 158, in read_column_data
    column = self._read_column_binary(source, num_rows, ctx)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/datatypes/string.py", line 34, in _read_column_binary
    return source.read_str_col(num_rows, self._active_encoding(ctx))
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "clickhouse_connect/driverc/buffer.pyx", line 248, in clickhouse_connect.driverc.buffer.ResponseBuffer.read_str_col
  File "clickhouse_connect/driverc/buffer.pyx", line 134, in clickhouse_connect.driverc.buffer.ResponseBuffer._read_str_col
  File "clickhouse_connect/driverc/buffer.pyx", line 74, in clickhouse_connect.driverc.buffer.ResponseBuffer.read_bytes_c
  File "/progai/.venv/lib/python3.11/site-packages/clickhouse_connect/driver/httputil.py", line 200, in decompress
    chunk = response.read(chunk_size, decode_content=False)
            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 566, in read
    with self._error_catcher():
  File "/usr/lib/python3.11/contextlib.py", line 158, in __exit__
    self.gen.throw(typ, value, traceback)
  File "/progai/.venv/lib/python3.11/site-packages/urllib3/response.py", line 461, in _error_catcher
    raise ProtocolError("Connection broken: %r" % e, e)
urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteRead(134 bytes read)', IncompleteRead(134 bytes read))

Steps to reproduce

  1. Start a streaming read.
  2. Consume the iterator with some work or sleep between blocks.
  3. Soon it crashes with the error above.

Expected behaviour

I expect it to read the whole dataset. If I disable the processing, the dataset is read fine (40 million records).
This leads me to think the problem is not related to the actual data or response, but to something inside the implementation.

Code example

import clickhouse_connect

# host, login, password, port, process and dataset are defined elsewhere in the DAG
read_client = clickhouse_connect.get_client(
    host=host, username=login, password=password, port=port,
    connect_timeout=60 * 30, send_receive_timeout=60 * 30, client_name="airflow-read",
    settings={
        "session_timeout": 60 * 20,
    },
)
qry = 'SELECT id, name FROM dev.tbl WHERE NOT empty(name)'
with read_client.query_row_block_stream(qry) as stream:
    for block in stream:
        rows = list(process(block, dataset))

clickhouse-connect and/or ClickHouse server logs

Configuration

Environment

  • clickhouse-connect version: 0.7.19
  • Python version: 3.11.10
  • Operating system: Ubuntu 20.04

ClickHouse server

  • ClickHouse Server version: 24.7.2 revision 54468
  • ClickHouse Server non-default settings, if any:
  • CREATE TABLE statements for tables involved:
  • Sample data for these tables, use clickhouse-obfuscator if necessary
@keu keu added the bug Something isn't working label Sep 24, 2024
@genzgd
Collaborator

genzgd commented Sep 24, 2024

It's crashing because your connection is breaking: 'Connection broken: IncompleteRead(134 bytes read)'. This could be caused by something like an idle timeout anywhere in your chain. How much time is processing taking per chunk?

@keu
Author

keu commented Sep 24, 2024

The average processing time for a single block is ~10 seconds.
The connection breaks around 4-5 minutes after the read starts.
The server and the client are on the same machine.

@genzgd
Collaborator

genzgd commented Sep 24, 2024

After digging into this, I believe the problem is that the ClickHouse server times out when pushing more data because the client has not read all of the data off the socket. When trying to reproduce this, I get the following error in the ClickHouse logs:

DynamicQueryHandler: Code: 209. DB::Exception: Timeout exceeded while writing to socket ([::1]:63309, 30000 ms): While executing Native. (SOCKET_TIMEOUT)

The socket is still busy/full when ClickHouse tries to send the error:

DynamicQueryHandler: Cannot send exception to client: Code: 209. DB::NetException: Timeout exceeded while writing to socket ([::1]:63309, 30000 ms). (SOCKET_TIMEOUT)

The end result is that if reading data falls more than 30 seconds behind ClickHouse sending the data, ClickHouse will close the connection, causing the error you see.

There's not an easy fix directly in clickhouse-connect because the point of streaming is to avoid reading all of the data at once into memory -- but if your processing falls behind, the data sent from ClickHouse has no place to go. So the short-term answer to your problem is to query only as much data from ClickHouse as you can process without falling behind by more than 30 seconds. Unfortunately, the HTTP interface in ClickHouse is not "smart" enough to keep the connection open and stream the data only as it is being consumed.

However, in the next release I'm looking at adding an intermediate buffer with a configurable size to temporarily store the HTTP data until requested by the stream processing. So if your total query size is something like 50MB, and the new intermediate buffer is configured at 100MB, you should not have this issue. But there will definitely be a tradeoff between using the additional memory and ensuring that your connection isn't closed while processing.
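
As a minimal sketch of the buffering idea (not the library feature mentioned above, which is not shown in this thread), the following drains the HTTP stream into a bounded in-memory queue on a background thread while the main thread processes blocks at its own pace. The connection details, queue size and process_block are assumed placeholders, and if the queue fills up the same timeout can still occur; the queue size is exactly the memory-vs-connection tradeoff described above.

import queue
import threading
import time

import clickhouse_connect

def process_block(block):
    # Placeholder for the slow per-block work from the original example.
    time.sleep(10)

client = clickhouse_connect.get_client(host='localhost')  # assumed connection details
qry = 'SELECT id, name FROM dev.tbl WHERE NOT empty(name)'

block_queue = queue.Queue(maxsize=100)  # bounds how many blocks are buffered in memory
_SENTINEL = object()

def drain_stream():
    # Keep reading from the socket so ClickHouse is not blocked on its write;
    # if the queue fills up, this thread blocks and the timeout can still occur.
    try:
        with client.query_row_block_stream(qry) as stream:
            for block in stream:
                block_queue.put(block)
    finally:
        block_queue.put(_SENTINEL)

threading.Thread(target=drain_stream, daemon=True).start()

while (block := block_queue.get()) is not _SENTINEL:
    process_block(block)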

@keu
Author

keu commented Sep 24, 2024

@genzgd thank you for the investigation. I understand the issue more precisely now.
So the short-term fix for me would be something like this:

client = clickhouse_connect.get_client(
   ...
   settings={"max_block_size": 30 / seconds_per_row}
)

@genzgd
Collaborator

genzgd commented Sep 24, 2024

max_block_size isn't going to reduce the total amount of data you're consuming, and you'll still fall behind as ClickHouse tries to push data faster than you can process it. You either need faster processing or to read less data in the same amount of time by using queries that return less data.

@keu
Author

keu commented Sep 24, 2024

I see.
Unfortunately, I don't see an easy way to make the processing faster without going into buffering and multithreading, which IMO will probably cause more trouble in the future.
So the only option I see for now is to save the data to disk and then read from it.
AFAIK ClickHouse doesn't have an analog of Postgres server-side cursors...
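
A minimal sketch of that save-to-disk approach, reusing the query from the code example above: writing each block straight to a CSV file is cheap enough that the stream stays drained, and the slow processing then runs against the local file with no ClickHouse connection open. The connection details and file name are placeholders.

import csv

import clickhouse_connect

client = clickhouse_connect.get_client(host='localhost')  # assumed connection details
qry = 'SELECT id, name FROM dev.tbl WHERE NOT empty(name)'

# Pass 1: dump the stream to disk as fast as ClickHouse sends it.
with open('names_dump.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    with client.query_row_block_stream(qry) as stream:
        for block in stream:
            writer.writerows(block)  # cheap per-block work keeps the socket drained

# Pass 2: do the slow processing from the local file; no open connection to time out.
with open('names_dump.csv', newline='') as f:
    for row in csv.reader(f):
        pass  # slow per-row processing goes here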

@genzgd
Collaborator

genzgd commented Sep 24, 2024

Unfortunately, no, there is no server-side cursor. If you can break your query up into chunks based on the primary key, you could read each chunk into memory (using the plain client query method instead of streaming), process it, and then get the next one. Some version of that is going to be necessary unless you do something to match your processing speed with the amount of data ClickHouse is sending.
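
A rough sketch of that chunked approach, assuming id is a monotonically increasing numeric key, that each chunk fits comfortably in memory, and using server-side parameter binding for the key; process, the chunk size and the connection details are assumed placeholders.

import clickhouse_connect

def process(rows):
    # Placeholder for the per-chunk processing from the original example.
    pass

client = clickhouse_connect.get_client(host='localhost')  # assumed connection details
CHUNK = 500_000  # tune so one chunk is processed well within the timeout budget
QUERY = (
    'SELECT id, name FROM dev.tbl '
    'WHERE NOT empty(name) AND id > {last_id:UInt64} '
    'ORDER BY id LIMIT ' + str(CHUNK)
)

last_id = 0
while True:
    # client.query reads the whole chunk off the socket at once, so the connection
    # sits idle (instead of timing out) while the chunk is being processed.
    result = client.query(QUERY, parameters={'last_id': last_id})
    rows = result.result_rows
    if not rows:
        break
    last_id = rows[-1][0]
    process(rows)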

@genzgd genzgd mentioned this issue Sep 26, 2024