
Feature request "DataFrameClient" transfer from version 1.7 to 2 - Pandas DataFrame not possible to ingest to InfluxDB 2 #79

Closed
Sutyke opened this issue Apr 2, 2020 · 14 comments · Fixed by #88


Sutyke commented Apr 2, 2020

Dear Great Maintainer,

I would like to request transferring a feature that is currently in InfluxDB 1.7 to 2.0.

There are 2 proposals:

  1. Transfer DataFrameClient from 1.7 to 2

In version 1.7, the DataFrameClient allows inserting a DataFrame into InfluxDB.

import pandas as pd
from influxdb import DataFrameClient

dbConnDF = DataFrameClient('localhost', 8086, 'd', 'password', 'securities')
# sort by the new 'date' index (sort_values() without a 'by' argument would fail here)
df = pd.read_parquet('/home/d/fi/01_Data/01_raw_data/qd.parquet').set_index('date').sort_index()
%time dbConnDF.write_points(df, 'securities', tag_columns=['symbol'], protocol="json", batch_size=10000)
%time d = dbConnDF.query("select * from securities")
d['securities']

  2. Create a helper function to convert a DataFrame into line protocol readable by InfluxDB 2

@Anaisdg created the function below:

https://github.com/Anaisdg/Influx_Pandas

import time

def lp(df, measurement, tag_key, field, value, datetime):
    # build one line-protocol string per DataFrame row:
    # <measurement>,<tag_key>=<tag value> <field name>=<field value> <ns timestamp>
    lines = [str(df[measurement][d]) + ","
             + str(tag_key) + "=" + str(df[tag_key][d])
             + " " + str(df[field][d]) + "=" + str(df[value][d])
             + " " + str(int(time.mktime(df[datetime][d].timetuple()))) + "000000000"
             for d in range(len(df))]
    return lines
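For illustration, a minimal sketch of calling this helper on a tiny DataFrame (all column names here are hypothetical):

import pandas as pd
from datetime import datetime

# hypothetical input: one column per line-protocol component
df = pd.DataFrame({'m': ['securities'],
                   'symbol': ['AAPL'],
                   'f': ['close'],
                   'v': [123.45],
                   'date': [datetime(2020, 4, 2)]})

# using the lp() helper defined above
lp(df, 'm', 'symbol', 'f', 'v', 'date')
# -> ['securities,symbol=AAPL close=123.45 <ns timestamp>']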

Thank you in advance for your help; it will save a lot of time for your customers.

Sutyke

@bednar bednar added the enhancement New feature or request label Apr 2, 2020
@bednar bednar added this to the 1.7.0 milestone Apr 2, 2020

bednar commented Apr 2, 2020

@Sutyke thanks for the feedback. I hope we will be able to implement it pretty soon.

@lifeisawavesorideit

This would be extremely helpful!


bednar commented May 2, 2020

@lifeisawavesorideit you can follow the progress at #88


bednar commented May 4, 2020

Hi @lifeisawavesorideit,

We just merged support for ingesting a Pandas DataFrame into the master branch.

You could use it with something like:

from datetime import timedelta

import pandas as pd

now = pd.Timestamp.now('UTC')
data_frame = pd.DataFrame(data=[["coyote_creek", 1.0], ["coyote_creek", 2.0]],
                          index=[now, now + timedelta(hours=1)],
                          columns=["location", "water_level"])

# the DatetimeIndex supplies the per-row timestamps
write_client.write(bucket.name, record=data_frame, data_frame_measurement_name='h2o_feet',
                   data_frame_tag_columns=['location'])

If you would like to test it, install the client via:

pip install git+https://github.com/influxdata/influxdb-client-python.git@master

Feedback is also welcome.
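For completeness, a hedged sketch of reading the data back into a DataFrame with the 2.x query API (the client variable, bucket name, and Flux query here are placeholders, not part of the merged change):

query_api = client.query_api()

# returns the result of the Flux query as a Pandas DataFrame
df = query_api.query_data_frame(
    'from(bucket: "my-bucket") |> range(start: -1h) '
    '|> filter(fn: (r) => r._measurement == "h2o_feet")')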

Regards


Anaisdg commented May 6, 2020

Hello @bednar thank you! Does this write method support batching as usual? Thank you.


bednar commented May 7, 2020

Hi @Anaisdg, yeah, it supports batching.


benb92 commented May 7, 2020

Hello,

Thanks for this function, it seems to work. This is the code I ran below to do the import. Do you need to specify the batch size anywhere? How would I amend the code below to do that? Or do you simply set data_frame_object to be a big DataFrame of 10^6 rows etc. and it will auto-batch it?

Re. multiprocessing - I saw that InfluxDB 2.0 would support it. This is fantastic - how do I do this with Python, or would this function automatically support that?

from influxdb_client import InfluxDBClient

bucket = "bucket_name"
org = "org_name"
token = "tokenID...."
client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)
write_api = client.write_api()

write_api.write(bucket=bucket, record=data_frame_object, data_frame_measurement_name='measurement_name',
                data_frame_tag_columns=tag_columns)


bednar commented May 7, 2020

@Anaisdg, @bburden to clarify how the batching works:

the client reads the whole DataFrame and writes the data in batches into InfluxDB. If we have a very big DataFrame, performance will be limited by memory size...

If we want to store a potentially unbounded DataFrame, we could improve our implementation with streaming tweaks.

What is the expected size (rows, columns) of the DataFrame?
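For illustration, a minimal sketch of setting the batch size explicitly when creating the write API (reusing the placeholder names from the snippet above; WriteOptions defaults to batching writes):

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import WriteOptions

client = InfluxDBClient(url="http://localhost:9999", token=token, org=org)
# batch_size controls how many rows are sent per request to InfluxDB
write_api = client.write_api(write_options=WriteOptions(batch_size=10_000))

write_api.write(bucket=bucket, record=data_frame_object,
                data_frame_measurement_name='measurement_name',
                data_frame_tag_columns=tag_columns)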


benb92 commented May 7, 2020

In this example I am doing a small historical import: 1.7 million rows, 8 columns, nothing major. Broadly, I have various flows where milliseconds matter. For (1) historical imports it is not so important, but it is still good to establish best practice. Is that best practice below? Generally the file sizes will be bigger for historical imports. Can I utilise multiprocessing somehow in the code below?

So just to double check - I would amend the above code with something like this:

write_api.write(bucket=bucket, record=data_frame_object, data_frame_measurement_name='measurement_name',
                data_frame_tag_columns=tag_columns,
                protocol="line", batch_size=10000)

Live, I will be importing minutely data, aspiring to 25 ms from grabbing the data from websockets to importing it into the InfluxDB database. Any tips on how to crank maximum performance out of InfluxDB for (1) big historical imports and (2) smaller live imports are appreciated!


bednar commented May 7, 2020

(1)
The client is able to run in a multiprocessing environment. The best approach for importing a large amount of data is to create a singleton instance of the write_api:

from influxdb_client.client.write_api import WriteOptions, WriteType

self.write_api = self.client.write_api(
    write_options=WriteOptions(write_type=WriteType.batching, batch_size=50_000, flush_interval=10_000))

and push data from separate processes:

self.write_api.write(bucket="my-bucket", record=next_record)

The most critical part is parsing the data into line protocol. Our implementation of writing a DataFrame is generic... so the best performance is achieved by directly creating line protocol from your DataFrame rows.

But everything depends on your data, and there is not one general approach.
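For illustration, a minimal sketch of building line protocol directly from DataFrame rows, assuming the frame has a DatetimeIndex and hypothetical 'symbol' and 'close' columns:

# row.Index is the pd.Timestamp index value; .value is nanoseconds since epoch
lines = [f"securities,symbol={row.symbol} close={row.close} {row.Index.value}"
         for row in df.itertuples()]

write_api.write(bucket="my-bucket", record=lines)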

(2)
It depends, but something like the synchronous approach will perform well:

from influxdb_client.client.write_api import SYNCHRONOUS

write_api = self.client.write_api(write_options=SYNCHRONOUS)
records = "mem,host=host1 used_percent=23.43234543\n" \
          "mem,host=host1 available_percent=15.856523"
write_api.write(bucket, org, records)

See also:
https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set_multiprocessing.py
https://github.com/influxdata/influxdb-client-python/blob/master/examples/import_data_set.py


cjelsa commented May 13, 2020

Would it be possible, next to data_frame_tag_columns=tag_columns, to also have a 'tags=' argument? This way a tag could be added that doesn't appear in the DF.

For example, I have a DF with timestamp, open, high, low, close (etc.) data. I would like to be able to add tags such as ticker, exchange, etc., which don't appear in the DF.
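Until such an argument exists, a hedged sketch of a workaround: add constant-value columns to the DataFrame and list them as tag columns (the tag names and values here are hypothetical):

# inject constant tags as columns so they can be referenced as tag columns
df['ticker'] = 'AAPL'
df['exchange'] = 'NASDAQ'

write_api.write(bucket, record=df, data_frame_measurement_name='ohlc',
                data_frame_tag_columns=['ticker', 'exchange'])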


bednar commented May 13, 2020

@cjelsa Yes, we could.

Could you please create a new issue for that?

Thanks for using our client.


galgal770 commented Mar 9, 2021

@bednar, following your reply to @bburden, I'm also trying to write a large amount of data directly from a DataFrame. The multiprocessing script you supplied (import_data_set_multiprocessing.py) processes each line of a CSV file and converts it to line protocol. As I understand it, two amendments are needed here:

"and push data from separate processes"

  1. How do I adjust the current process to read rows of a DataFrame (and not lines from a CSV file)? Where's the relevant endpoint? Any example?

"The most critical part is parsing the data into line protocol. Our implementation of writing a DataFrame is generic... so the best performance is achieved by directly creating line protocol from your DataFrame rows."

  2. Any example? How do I do that? Where is the relevant endpoint to do the conversion?

Is there any corresponding end-to-end documentation to support DataFrame ingestion in a multiprocessing environment?


bednar commented Mar 10, 2021

@galgal770

  1. How do I adjust the current process to read rows of a DataFrame (and not lines from a CSV file)? Where's the relevant endpoint? Any example?

You could use DataFrame.itertuples:

import rx
from rx import operators as ops
import pandas as pd

df = pd.read_csv("vix-daily.csv")

batches = rx \
    .from_iterable(df.itertuples(index=False)) \
    .pipe(ops.buffer_with_count(500))

batches.subscribe(on_next=lambda batch: print(f"my batch: {batch}"),
                  on_error=lambda ex: print(f'Unexpected error: {ex}'),
                  on_completed=lambda: print('Import finished!'))

  2. Any example? How do I do that? Where is the relevant endpoint to do the conversion?

DataFrame.itertuples returns namedtuples. Your implementation just concatenates these tuples into line protocol.
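For illustration, a minimal sketch of that concatenation step combined with the batching above (the 'Date' and 'Close' column names are hypothetical, write_api is assumed to be configured as earlier in the thread, and rx, ops, pd, and df come from the snippet above):

# hypothetical: map each namedtuple row to a line-protocol string
def to_line_protocol(row):
    timestamp = int(pd.Timestamp(row.Date).value)  # nanoseconds since epoch
    return f"vix close={row.Close} {timestamp}"

batches = rx \
    .from_iterable(df.itertuples(index=False)) \
    .pipe(ops.map(to_line_protocol),
          ops.buffer_with_count(500))

batches.subscribe(on_next=lambda batch: write_api.write("my-bucket", record=batch),
                  on_error=lambda ex: print(f'Unexpected error: {ex}'),
                  on_completed=lambda: print('Import finished!'))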

"Is there any corresponding end-to-end documentation to support DataFrame ingestion in a multiprocessing environment?"

No, but your code will be pretty much the same as import_data_set_multiprocessing.py, except for the two amendments above.

From my POV the best approach will be: read the DataFrame rows with itertuples, convert each row to line protocol, and push the batches from separate processes.
