Feature request: "DataFrameClient" transfer from version 1.7 to 2 - Pandas DataFrame not possible to ingest into InfluxDB 2 #79
Comments
@Sutyke thanks for the feedback. I hope that we will be able to implement it pretty soon.

This would be extremely helpful!

@lifeisawavesorideit you can follow progress at #88
We just merged support for ingesting a Pandas DataFrame into InfluxDB. You could use it with something like:

import pandas as pd
from datetime import timedelta

# _write_client is assumed to be a write API obtained from InfluxDBClient
_now = pd.Timestamp.now('UTC')
_data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                           index=[_now, _now + timedelta(hours=1)],
                           columns=["location", "water_level"])

_write_client.write(bucket.name, record=_data_frame, data_frame_measurement_name='h2o_feet',
                    data_frame_tag_columns=['location'])

If you would like to test it then install the client via:

Feedback is also welcome. Regards
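For reference, the two DataFrame rows above should serialize to line protocol of roughly this shape - the measurement, tag, and field come from the arguments, while the nanosecond timestamps shown here are purely illustrative:

h2o_feet,location=coyote_creek water_level=1.0 1583020800000000000
h2o_feet,location=coyote_creek water_level=2.0 1583024400000000000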
Hello @bednar, thank you! Does this write method support batching as usual?

Hi @Anaisdg, yeah, it supports batching.
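For anyone wondering what that looks like in practice, here is a minimal sketch; the URL, token, org, and bucket values are placeholders, and batch_size/flush_interval are illustrative, not recommendations:

import pandas as pd
from influxdb_client import InfluxDBClient, WriteOptions

# placeholders - point these at your own instance
client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")

# rows are grouped into batches of batch_size and flushed at least
# every flush_interval milliseconds
write_api = client.write_api(
    write_options=WriteOptions(batch_size=5_000, flush_interval=10_000))

data_frame = pd.DataFrame(data={"water_level": [1.0, 2.0]},
                          index=pd.to_datetime(["2020-03-01", "2020-03-02"], utc=True))
write_api.write(bucket="my-bucket", record=data_frame,
                data_frame_measurement_name='h2o_feet')

write_api.close()  # flush any pending batches
client.close()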
Hello, thanks for this function, it seems to work. Below is the code I ran to do the import. Do you need to specify a batch size anywhere? How would I amend the code below to do that? Or do you simply set data_frame_object to be a big DataFrame of 10^6 rows etc. and it will auto-batch it? Re. multiprocessing - I saw that InfluxDB 2.0 would support it. This is fantastic - how do I do this with Python, and would this function automatically support that?

from influxdb_client import InfluxDBClient

write_api.write(bucket=bucket, record=data_frame_object,
                data_frame_measurement_name='measurement_name')
@Anaisdg, @bburden to clarify how the batching works: the client reads the whole DataFrame and writes the data in batches into InfluxDB. If you have a very big DataFrame, performance will be limited by memory size... If we want to store a potentially unbounded DataFrame, we could improve our implementation with streaming tweaks. What is the expected size (rows, columns) of your DataFrame?
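One way to keep memory bounded without any changes to the client is to load and write the source data in chunks. A sketch, assuming the source is a CSV with a parseable time column - the file name, column names, and sizes are made up:

import pandas as pd
from influxdb_client import InfluxDBClient, WriteOptions

client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
write_api = client.write_api(
    write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))

# chunksize makes read_csv yield DataFrames of at most 100_000 rows,
# so only one chunk is held in memory at a time
for chunk in pd.read_csv("big_history.csv", index_col="time",
                         parse_dates=["time"], chunksize=100_000):
    write_api.write(bucket="my-bucket", record=chunk,
                    data_frame_measurement_name="my_measurement")

write_api.close()
client.close()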
In this example I am doing a small historical import: 1.7 million rows, 8 columns, nothing major. Broadly, I have various flows where milliseconds matter. For (1) historical imports it is not so important, but it is still good to establish best practice - is the code below best practice? Generally the file sizes will be bigger for historical imports. Can I utilise multiprocessing somehow in the code below? Just to double check, I would amend the above code with something like this:

For (2), live, I will be importing minutely data and aspiring to 25 ms from grabbing data off websockets to importing it into the InfluxDB database. Any tips on how to crank maximum performance out of InfluxDB for (1) big historical imports and (2) smaller live imports are appreciated!
(1) Use a batching write API:

self.client.write_api(
    write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000))

and push data from separate processes:

self.write_api.write(bucket="my-bucket", record=next_record)

The most critical part is parsing the data into LineProtocol, but everything depends on your data and there is not one general approach.

(2) Use a synchronous write API:

write_api = self.client.write_api(write_options=SYNCHRONOUS)

records = "mem,host=host1 used_percent=23.43234543\n" \
          "mem,host=host1 available_percent=15.856523"

write_api.write(bucket, org, records)
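A hedged sketch of the "separate processes" part of (1): each worker builds its own client, since client instances should not be shared across processes. The URL, token, org, bucket, and the tiny sample records are all made up:

from multiprocessing import Pool
from influxdb_client import InfluxDBClient, WriteOptions

def write_partition(lines):
    # each worker process creates its own client and write API
    client = InfluxDBClient(url="http://localhost:8086", token="my-token", org="my-org")
    write_api = client.write_api(
        write_options=WriteOptions(batch_size=50_000, flush_interval=10_000))
    write_api.write(bucket="my-bucket", record=lines)  # list of line-protocol strings
    write_api.close()
    client.close()

if __name__ == "__main__":
    # pre-split the work; in a real import each partition would be much larger
    partitions = [
        ["mem,host=host1 used_percent=23.4", "mem,host=host1 available_percent=15.9"],
        ["mem,host=host2 used_percent=11.2", "mem,host=host2 available_percent=42.1"],
    ]
    with Pool(processes=2) as pool:
        pool.map(write_partition, partitions)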
Would it be possible, next to data_frame_tag_columns=tag_columns, to also have a 'tags=' argument? This way a tag can be added that doesn't appear in the DataFrame. For example, I have a DF with timestamp, open, high, low, close (etc.) data, and I would like to be able to add tags such as ticker, exchange etc. which don't appear in the DF.

@cjelsa Yes, we could. Could you please create a new issue for that? Thanks for using our client.
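Until such an argument exists, one workaround is to broadcast the constant tags onto the DataFrame as columns before writing. A sketch, with made-up tag values and assuming write_api is a configured write API:

import pandas as pd

df = pd.DataFrame(data={"open": [300.1], "close": [301.5]},
                  index=pd.to_datetime(["2020-03-02"], utc=True))

# add constant tag values to every row, then declare them as tag columns
df["ticker"] = "AAPL"
df["exchange"] = "NASDAQ"

write_api.write(bucket="my-bucket", record=df,
                data_frame_measurement_name="quotes",
                data_frame_tag_columns=["ticker", "exchange"])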
@bednar, following your reply to @bburden, I'm also trying to write a large amount of data directly from a DataFrame. The multiprocessing script you supplied (import_data_set_multiprocessing.py) processes each line of a CSV file and converts it to LineProtocol. As I understand it, two amendments are needed here:

Is there any corresponding end-to-end documentation to support DataFrame ingestion in a multiprocessing environment?
You could use DataFrame.itertuples:

import rx
from rx import operators as ops
import pandas as pd

df = pd.read_csv("vix-daily.csv")

batches = rx \
    .from_iterable(df.itertuples(index=False)) \
    .pipe(ops.buffer_with_count(500))

batches.subscribe(on_next=lambda batch: print(f"my batch: {batch}"),
                  on_error=lambda ex: print(f'Unexpected error: {ex}'),
                  on_completed=lambda: print('Import finished!'))

DataFrame.itertuples returns tuples; your implementation just has to concatenate these tuples into LineProtocol.
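A sketch of that concatenation step - the measurement, column names, and the tiny frame are made up for illustration:

import pandas as pd

df = pd.DataFrame({"time": pd.to_datetime(["2020-03-01", "2020-03-02"], utc=True),
                   "water_level": [1.0, 2.0]})

def to_line_protocol(row):
    # row is a namedtuple from itertuples(index=False);
    # Timestamp.value is epoch nanoseconds, which line protocol expects
    return f"h2o_feet water_level={row.water_level} {row.time.value}"

lines = [to_line_protocol(row) for row in df.itertuples(index=False)]
# e.g. ['h2o_feet water_level=1.0 1583020800000000000', ...]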
No, but your code will be pretty much the same as import_data_set_multiprocessing.py, except:

From my POV, the best approach will be:
Dear Great Maintainer,

I would like to request the transfer of a feature which is currently in InfluxDB 1.7 to 2.0. There are 2 proposals:

1. In version 1.7, the DataFrameClient function allows inserting a DataFrame into InfluxDB.
2. @Anaisdg created the function below: https://github.com/Anaisdg/Influx_Pandas

import pandas as pd

Thank you in advance for your help; it will save your customers a lot of time.

Sutyke