diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 604df8656..07a3be73b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -62,7 +62,7 @@ jobs: pip install -U pip wheel setuptools - name: Install npm dependencies - run: sudo npm install -g install jshint stylelint + run: sudo npm install -g jshint stylelint - name: Start InfluxDB container run: docker-compose up -d influxdb diff --git a/Dockerfile b/Dockerfile index 436180c3e..12aafb97d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,13 +1,16 @@ FROM python:3.9.19-slim-bullseye +# Install system dependencies RUN apt update && \ apt install --yes zlib1g-dev libjpeg-dev gdal-bin libproj-dev \ libgeos-dev libspatialite-dev libsqlite3-mod-spatialite \ sqlite3 libsqlite3-dev openssl libssl-dev fping && \ rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* +# Upgrade pip and install Python dependencies RUN pip install -U pip setuptools wheel +# Copy and install project dependencies COPY requirements-test.txt requirements.txt /opt/openwisp/ RUN pip install -r /opt/openwisp/requirements.txt && \ pip install -r /opt/openwisp/requirements-test.txt && \ @@ -17,6 +20,7 @@ ADD . /opt/openwisp RUN pip install -U /opt/openwisp && \ rm -rf /var/lib/apt/lists/* /root/.cache/pip/* /tmp/* WORKDIR /opt/openwisp/tests/ +# Set environment variables ENV NAME=openwisp-monitoring \ PYTHONBUFFERED=1 \ INFLUXDB_HOST=influxdb \ diff --git a/README.rst b/README.rst index 223c4b980..9e332053c 100644 --- a/README.rst +++ b/README.rst @@ -326,20 +326,23 @@ Follow the setup instructions of `openwisp-controller # Make sure you change them in production # You can select one of the backends located in openwisp_monitoring.db.backends - TIMESERIES_DATABASE = { + INFLUXDB_1x_DATABASE = { 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', 'USER': 'openwisp', 'PASSWORD': 'openwisp', 'NAME': 'openwisp2', - 'HOST': 'localhost', + 'HOST': 'influxdb', 'PORT': '8086', - 'OPTIONS': { - # Specify additional options to be used while initializing - # database connection. - # Note: These options may differ based on the backend used. - 'udp_writes': True, - 'udp_port': 8089, - } + 'OPTIONS': {'udp_writes': False, 'udp_port': 8089}, + } + + INFLUXDB_2x_DATABASE = { + 'BACKEND': 'openwisp_monitoring.db.backends.influxdb2', + 'TOKEN': 'my-super-secret-auth-token', + 'ORG': 'openwisp', + 'BUCKET': 'openwisp2', + 'HOST': 'influxdb2', + 'PORT': '9999', } ``urls.py``: @@ -1413,56 +1416,109 @@ Settings | **default**: | see below | +--------------+-----------+ +Timeseries Database Configuration +--------------------------------- + +The ``TIMESERIES_DATABASE`` setting allows configuring the timeseries +database backend used by OpenWISP Monitoring. The configuration supports +both InfluxDB 1.x and 2.x versions. + +Configuration for InfluxDB 1.x +------------------------------ + .. 
code-block:: python - TIMESERIES_DATABASE = { + INFLUXDB_1x_DATABASE = { 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', 'USER': 'openwisp', 'PASSWORD': 'openwisp', 'NAME': 'openwisp2', - 'HOST': 'localhost', + 'HOST': 'influxdb', 'PORT': '8086', - 'OPTIONS': { - 'udp_writes': False, - 'udp_port': 8089, - } + 'OPTIONS': {'udp_writes': False, 'udp_port': 8089}, } -The following table describes all keys available in ``TIMESERIES_DATABASE`` -setting: - -+---------------+--------------------------------------------------------------------------------------+ -| **Key** | ``Description`` | -+---------------+--------------------------------------------------------------------------------------+ -| ``BACKEND`` | The timeseries database backend to use. You can select one of the backends | -| | located in ``openwisp_monitoring.db.backends`` | -+---------------+--------------------------------------------------------------------------------------+ -| ``USER`` | User for logging into the timeseries database | -+---------------+--------------------------------------------------------------------------------------+ -| ``PASSWORD`` | Password of the timeseries database user | -+---------------+--------------------------------------------------------------------------------------+ -| ``NAME`` | Name of the timeseries database | -+---------------+--------------------------------------------------------------------------------------+ -| ``HOST`` | IP address/hostname of machine where the timeseries database is running | -+---------------+--------------------------------------------------------------------------------------+ -| ``PORT`` | Port for connecting to the timeseries database | -+---------------+--------------------------------------------------------------------------------------+ -| ``OPTIONS`` | These settings depends on the timeseries backend: | -| | | -| | +-----------------+----------------------------------------------------------------+ | -| | | ``udp_writes`` | Whether to use UDP for writing data to the timeseries database | | -| | +-----------------+----------------------------------------------------------------+ | -| | | ``udp_port`` | Timeseries database port for writing data using UDP | | -| | +-----------------+----------------------------------------------------------------+ | -+---------------+--------------------------------------------------------------------------------------+ - -**Note:** UDP packets can have a maximum size of 64KB. When using UDP for writing timeseries -data, if the size of the data exceeds 64KB, TCP mode will be used instead. - -**Note:** If you want to use the ``openwisp_monitoring.db.backends.influxdb`` backend -with UDP writes enabled, then you need to enable two different ports for UDP -(each for different retention policy) in your InfluxDB configuration. The UDP configuration -section of your InfluxDB should look similar to the following: +Configuration for InfluxDB 2.x +------------------------------ + +.. code-block:: python + + INFLUXDB_2x_DATABASE = { + 'BACKEND': 'openwisp_monitoring.db.backends.influxdb2', + 'TOKEN': 'my-super-secret-auth-token', + 'ORG': 'openwisp', + 'BUCKET': 'openwisp2', + 'HOST': 'influxdb2', + 'PORT': '9999', + } + +Dynamic Configuration Based on Environment +------------------------------------------ + +You can dynamically switch between InfluxDB 1.x and 2.x configurations +using environment variables: + +.. 
code-block:: python + + import os + + if os.environ.get('USE_INFLUXDB2', 'False') == 'True': + TIMESERIES_DATABASE = INFLUXDB_2x_DATABASE + else: + TIMESERIES_DATABASE = INFLUXDB_1x_DATABASE + + if TESTING: + if os.environ.get('TIMESERIES_UDP', False): + TIMESERIES_DATABASE['OPTIONS'] = {'udp_writes': True, 'udp_port': 8091} + +Explanation of Settings +----------------------- + ++---------------+---------------------------------------------------------------+ +| **Key** | **Description** | ++-------------------------------------------------------------------------------+ +| ``BACKEND`` | The timeseries database backend to use. You can select one | +| | of the backends located in ``openwisp_monitoring.db.backends``| ++---------------+---------------------------------------------------------------+ +| ``USER`` | User for logging into the timeseries database (only for | +| | InfluxDB 1.x) | ++---------------+---------------------------------------------------------------+ +| ``PASSWORD`` | Password of the timeseries database user (only for InfluxDB | +| | 1.x) | ++---------------+---------------------------------------------------------------+ +| ``NAME`` | Name of the timeseries database (only for InfluxDB 1.x) | ++---------------+---------------------------------------------------------------+ +| ``TOKEN`` | Authentication token for InfluxDB 2.x | ++---------------+---------------------------------------------------------------+ +| ``ORG`` | Organization name for InfluxDB 2.x | ++---------------+---------------------------------------------------------------+ +| ``BUCKET`` | Bucket name for InfluxDB 2.x | ++---------------+---------------------------------------------------------------+ +| ``HOST`` | IP address/hostname of machine where the timeseries | +| | database is running | ++---------------+---------------------------------------------------------------+ +| ``PORT`` | Port for connecting to the timeseries database | ++---------------+---------------------------------------------------------------+ +| ``OPTIONS`` | Additional options for the timeseries backend | +| | | +| | +-----------------+-----------------------------------------+ | +| | | ``udp_writes`` | Whether to use UDP for writing data | | +| | | | to the timeseries database | | +| | +-----------------+-----------------------------------------+ | +| | | ``udp_port`` | Timeseries database port for writing | | +| | | | data using UDP | | +| | +-----------------+-----------------------------------------+ | ++---------------+---------------------------------------------------------------+ + +UDP Configuration for InfluxDB 1.x +---------------------------------- + +If you want to use the ``openwisp_monitoring.db.backends.influxdb`` backend +with UDP writes enabled, you need to enable two different ports for UDP +(each for a different retention policy) in your InfluxDB configuration. + +Here is an example of the UDP configuration section in your InfluxDB +configuration file: .. code-block:: text @@ -1479,6 +1535,13 @@ section of your InfluxDB should look similar to the following: database = "openwisp2" retention-policy = 'short' +**Note:** UDP packets can have a maximum size of 64KB. When using UDP for +writing timeseries data, if the size of the data exceeds 64KB, TCP mode +will be used instead. 
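
To complement the configuration examples above, here is a minimal sketch (not part of this patch) of how the InfluxDB 2.x settings can be sanity-checked with the ``influxdb_client`` library that the new backend uses; the token, organization, bucket, host and port values simply mirror the placeholder ``INFLUXDB_2x_DATABASE`` example and should be replaced with real credentials.

.. code-block:: python

    # Hypothetical connectivity check for the InfluxDB 2.x settings shown
    # above; the values mirror the placeholder INFLUXDB_2x_DATABASE example.
    from influxdb_client import InfluxDBClient

    config = {
        'TOKEN': 'my-super-secret-auth-token',
        'ORG': 'openwisp',
        'BUCKET': 'openwisp2',
        'HOST': 'influxdb2',
        'PORT': '9999',
    }

    client = InfluxDBClient(
        url=f"http://{config['HOST']}:{config['PORT']}",
        token=config['TOKEN'],
        org=config['ORG'],
    )
    # ping() returns True when the server is reachable;
    # find_bucket_by_name() returns None if the bucket does not exist yet.
    print(client.ping())
    print(client.buckets_api().find_bucket_by_name(config['BUCKET']))
    client.close()

If the bucket lookup returns ``None``, the bucket can be created manually or via the ``create_bucket()`` helper added in ``openwisp_monitoring/db/backends/influxdb2/client.py`` by this patch.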
+ +Deploying with Ansible +---------------------- + If you are using `ansible-openwisp2 `_ for deploying OpenWISP, you can set the ``influxdb_udp_mode`` ansible variable to ``true`` in your playbook, this will make the ansible role automatically configure the InfluxDB UDP listeners. diff --git a/docker-compose.yml b/docker-compose.yml index a84213ddd..3a8a3f39f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -28,6 +28,22 @@ services: INFLUXDB_USER: openwisp INFLUXDB_USER_PASSWORD: openwisp + influxdb2: + image: influxdb:2.0 + container_name: influxdb2 + ports: + # Map the 9086 port on host machine to 8086 in container + - "9086:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME: myuser + DOCKER_INFLUXDB_INIT_PASSWORD: mypassword + DOCKER_INFLUXDB_INIT_ORG: myorg + DOCKER_INFLUXDB_INIT_BUCKET: mybucket + DOCKER_INFLUXDB_INIT_RETENTION: 1w + volumes: + - influxdb-storage:/var/lib/influxdb2 + redis: image: redis:5.0-alpine ports: @@ -36,3 +52,4 @@ services: volumes: influxdb-data: {} + influxdb-storage: diff --git a/openwisp_monitoring/db/__init__.py b/openwisp_monitoring/db/__init__.py index 063d2d8f7..64510ebc5 100644 --- a/openwisp_monitoring/db/__init__.py +++ b/openwisp_monitoring/db/__init__.py @@ -1,7 +1,5 @@ from .backends import timeseries_db chart_query = timeseries_db.queries.chart_query -default_chart_query = timeseries_db.queries.default_chart_query -device_data_query = timeseries_db.queries.device_data_query -__all__ = ['timeseries_db', 'chart_query', 'default_chart_query', 'device_data_query'] +__all__ = ['timeseries_db', 'chart_query'] diff --git a/openwisp_monitoring/db/backends/__init__.py b/openwisp_monitoring/db/backends/__init__.py index 715b1113c..518c469e1 100644 --- a/openwisp_monitoring/db/backends/__init__.py +++ b/openwisp_monitoring/db/backends/__init__.py @@ -30,11 +30,16 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): """ try: assert 'BACKEND' in TIMESERIES_DB, 'BACKEND' - assert 'USER' in TIMESERIES_DB, 'USER' - assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD' - assert 'NAME' in TIMESERIES_DB, 'NAME' - assert 'HOST' in TIMESERIES_DB, 'HOST' - assert 'PORT' in TIMESERIES_DB, 'PORT' + if 'BACKEND' in TIMESERIES_DB and '2' in TIMESERIES_DB['BACKEND']: + # InfluxDB 2.x specific checks + assert 'TOKEN' in TIMESERIES_DB, 'TOKEN' + assert 'ORG' in TIMESERIES_DB, 'ORG' + assert 'BUCKET' in TIMESERIES_DB, 'BUCKET' + else: + # InfluxDB 1.x specific checks + assert 'USER' in TIMESERIES_DB, 'USER' + assert 'PASSWORD' in TIMESERIES_DB, 'PASSWORD' + assert 'NAME' in TIMESERIES_DB, 'NAME' if module: return import_module(f'{backend_name}.{module}') else: @@ -48,7 +53,7 @@ def load_backend_module(backend_name=TIMESERIES_DB['BACKEND'], module=None): except ImportError as e: # The database backend wasn't found. Display a helpful error message # listing all built-in database backends. 
- builtin_backends = ['influxdb'] + builtin_backends = ['influxdb', 'influxdb2'] if backend_name not in [ f'openwisp_monitoring.db.backends.{b}' for b in builtin_backends ]: diff --git a/openwisp_monitoring/db/backends/influxdb/client.py b/openwisp_monitoring/db/backends/influxdb/client.py index 906769a00..583ce1fac 100644 --- a/openwisp_monitoring/db/backends/influxdb/client.py +++ b/openwisp_monitoring/db/backends/influxdb/client.py @@ -56,7 +56,6 @@ class DatabaseClient(object): backend_name = 'influxdb' def __init__(self, db_name=None): - self._db = None self.db_name = db_name or TIMESERIES_DB['NAME'] self.client_error = InfluxDBClientError @@ -255,7 +254,7 @@ def read(self, key, fields, tags, **kwargs): q = f'{q} LIMIT {limit}' return list(self.query(q, precision='s').get_points()) - def get_list_query(self, query, precision='s'): + def get_list_query(self, query, precision='s', **kwargs): result = self.query(query, precision=precision) if not len(result.keys()) or result.keys()[0][1] is None: return list(result.get_points()) @@ -426,6 +425,7 @@ def __transform_field(self, field, function, operation=None): def _get_top_fields( self, + default_query, query, params, chart_type, @@ -433,9 +433,15 @@ def _get_top_fields( number, time, timezone=settings.TIME_ZONE, + get_fields=True, ): + """ + Returns top fields if ``get_fields`` set to ``True`` (default) + else it returns points containing the top fields. + """ + q = default_query.replace('{field_name}', '{fields}') q = self.get_query( - query=query, + query=q, params=params, chart_type=chart_type, group_map=group_map, @@ -444,7 +450,7 @@ def _get_top_fields( time=time, timezone=timezone, ) - res = list(self.query(q, precision='s').get_points()) + res = self.get_list_query(q) if not res: return [] res = res[0] @@ -454,4 +460,31 @@ def _get_top_fields( keys = list(sorted_dict.keys()) keys.reverse() top = keys[0:number] - return [item.replace('sum_', '') for item in top] + top_fields = [item.replace('sum_', '') for item in top] + if get_fields: + return top_fields + query = self.get_query( + query=query, + params=params, + chart_type=chart_type, + group_map=group_map, + summary=True, + fields=top_fields, + time=time, + timezone=timezone, + ) + return self.get_list_query(query) + + def default_chart_query(self, tags): + q = "SELECT {field_name} FROM {key} WHERE time >= '{time}'" + if tags: + q += " AND content_type = '{content_type}' AND object_id = '{object_id}'" + return q + + def _device_data(self, key, tags, rp, **kwargs): + """ returns last snapshot of ``device_data`` """ + query = ( + f"SELECT data FROM {rp}.{key} WHERE pk = '{tags['pk']}' " + "ORDER BY time DESC LIMIT 1" + ) + return self.get_list_query(query, precision=None) diff --git a/openwisp_monitoring/db/backends/influxdb/queries.py b/openwisp_monitoring/db/backends/influxdb/queries.py index f3f64aa2e..185677a9c 100644 --- a/openwisp_monitoring/db/backends/influxdb/queries.py +++ b/openwisp_monitoring/db/backends/influxdb/queries.py @@ -144,12 +144,3 @@ ) }, } - -default_chart_query = [ - "SELECT {field_name} FROM {key} WHERE time >= '{time}' {end_date}", - " AND content_type = '{content_type}' AND object_id = '{object_id}'", -] - -device_data_query = ( - "SELECT data FROM {0}.{1} WHERE pk = '{2}' " "ORDER BY time DESC LIMIT 1" -) diff --git a/openwisp_monitoring/monitoring/migrations/influxdb/__ini__.py b/openwisp_monitoring/db/backends/influxdb2/__init__.py similarity index 100% rename from openwisp_monitoring/monitoring/migrations/influxdb/__ini__.py rename to 
openwisp_monitoring/db/backends/influxdb2/__init__.py diff --git a/openwisp_monitoring/db/backends/influxdb2/client.py b/openwisp_monitoring/db/backends/influxdb2/client.py new file mode 100644 index 000000000..b2401548f --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/client.py @@ -0,0 +1,448 @@ +from datetime import datetime, time, timezone +from django.conf import settings +from influxdb_client import InfluxDBClient, Point, WritePrecision +from influxdb_client.client.write_api import SYNCHRONOUS +import re +import pytz +from django.utils.timezone import now +import logging +from .. import TIMESERIES_DB +from django.core.exceptions import ValidationError +from influxdb_client.rest import ApiException as InfluxDBClientError +from django.utils.translation import gettext_lazy as _ +from django.utils.dateparse import parse_datetime + + +logger = logging.getLogger(__name__) + +class DatabaseClient: + _AGGREGATE = [ + 'COUNT', 'DISTINCT', 'INTEGRAL', 'MEAN', 'MEDIAN', 'MODE', + 'SPREAD', 'STDDEV', 'SUM', 'BOTTOM', 'FIRST', 'LAST', + 'MAX', 'MIN', 'PERCENTILE', 'SAMPLE', 'TOP', 'CEILING', + 'CUMULATIVE_SUM', 'DERIVATIVE', 'DIFFERENCE', 'ELAPSED', + 'FLOOR', 'HISTOGRAM', 'MOVING_AVERAGE', 'NON_NEGATIVE_DERIVATIVE', + 'HOLT_WINTERS' + ] + _FORBIDDEN = ['drop', 'delete', 'alter', 'into'] + backend_name = 'influxdb2' + + def __init__(self, bucket=None, org=None, token=None, url=None): + self.bucket = bucket or TIMESERIES_DB['BUCKET'] + self.org = org or TIMESERIES_DB['ORG'] + self.token = token or TIMESERIES_DB['TOKEN'] + self.url = url or f'http://{TIMESERIES_DB["HOST"]}:{TIMESERIES_DB["PORT"]}' + self.client = InfluxDBClient(url=self.url, token=self.token, org=self.org) + self.write_api = self.client.write_api(write_options=SYNCHRONOUS) + self.query_api = self.client.query_api() + self.forbidden_pattern = re.compile( + r'\b(' + '|'.join(self._FORBIDDEN) + r')\b', re.IGNORECASE + ) + self.client_error = InfluxDBClientError + + def create_database(self): + logger.debug('InfluxDB 2.0 does not require explicit database creation.') + # self.create_bucket(self.bucket) + + def drop_database(self): + logger.debug('InfluxDB 2.0 does not support dropping databases via the client.') + + def create_or_alter_retention_policy(self, name, duration): + logger.debug('InfluxDB 2.0 handles retention policies via bucket settings.') + + def create_bucket(self, bucket, retention_rules=None): + bucket_api = self.client.buckets_api() + try: + existing_bucket = bucket_api.find_bucket_by_name(bucket) + if existing_bucket: + logger.info(f'Bucket "{bucket}" already exists.') + return + except Exception as e: + logger.error(f"Error checking for existing bucket: {e}") + + try: + bucket_api.create_bucket(bucket_name=bucket, retention_rules=retention_rules, org=self.org) + logger.info(f'Created bucket "{bucket}"') + except self.client_error as e: + if "already exists" in str(e): + logger.info(f'Bucket "{bucket}" already exists.') + else: + logger.error(f"Error creating bucket: {e}") + raise + + def drop_bucket(self): + bucket_api = self.client.buckets_api() + bucket = bucket_api.find_bucket_by_name(self.bucket) + if bucket: + bucket_api.delete_bucket(bucket.id) + logger.debug(f'Dropped InfluxDB bucket "{self.bucket}"') + + def _get_timestamp(self, timestamp=None): + timestamp = timestamp or now() + if isinstance(timestamp, datetime): + return timestamp.strftime('%Y-%m-%dT%H:%M:%S.%fZ') + return timestamp + + def write(self, name, values, **kwargs): + timestamp = 
self._get_timestamp(timestamp=kwargs.get('timestamp')) + try: + point = { + 'measurement': name, + 'tags': kwargs.get('tags'), + 'fields': values, + 'time': timestamp, + } + print(f"Writing point to InfluxDB: {point}") + self.write_api.write(bucket=self.bucket, org=self.org, record=point) + print("Successfully wrote point to InfluxDB") + except Exception as e: + print(f"Error writing to InfluxDB: {e}") + + def batch_write(self, metric_data): + points = [] + for data in metric_data: + timestamp = self._get_timestamp(timestamp=data.get('timestamp')) + point = Point(data.get('name')).tag(**data.get('tags', {})).field(**data.get('values')).time(timestamp, WritePrecision.NS) + points.append(point) + try: + self.write_api.write(bucket=self.bucket, org=self.org, record=points) + except Exception as e: + logger.error(f"Error writing batch to InfluxDB: {e}") + + def _format_date(self, date_str): + if date_str is None or date_str == 'now()': + return date_str + try: + date = datetime.strptime(date_str, '%Y-%m-%d %H:%M:%S') + return date.strftime('%Y-%m-%dT%H:%M:%SZ') + except ValueError: + # If the date_str is not in the expected format, return it as is + return date_str + + def query(self, query): + print(f"Executing query: {query}") + try: + result = self.query_api.query(query) + print(f"Query result: {result}") + return result + except Exception as e: + print(f"Error executing query: {e}") + logger.error(f"Error executing query: {e}") + return [] + + def _parse_query_result(self, result): + print("Parsing query result") + parsed_result = [] + for table in result: + for record in table.records: + parsed_record = { + 'time': record.get_time().isoformat(), + } + for key, value in record.values.items(): + if key not in ['_time', '_start', '_stop', '_measurement']: + parsed_record[key] = value + parsed_result.append(parsed_record) + print(f"Parsed result: {parsed_result}") + return parsed_result + + def read(self, key, fields, tags, **kwargs): + extra_fields = kwargs.get('extra_fields') + since = kwargs.get('since', '-30d') # Default to last 30 days if not specified + order = kwargs.get('order') + limit = kwargs.get('limit') + bucket = self.bucket + + # Start building the Flux query + flux_query = f'from(bucket:"{bucket}")' + + # Add time range + flux_query += f'\n |> range(start: {since})' + + # Filter by measurement (key) + flux_query += f'\n |> filter(fn: (r) => r["_measurement"] == "{key}")' + + # Filter by fields + if fields != '*': + if extra_fields and extra_fields != '*': + all_fields = [fields] + extra_fields if isinstance(extra_fields, list) else [fields, extra_fields] + field_filter = ' or '.join([f'r["_field"] == "{field}"' for field in all_fields]) + else: + field_filter = f'r["_field"] == "{fields}"' + flux_query += f'\n |> filter(fn: (r) => {field_filter})' + + # Filter by tags + if tags: + tag_filters = ' and '.join([f'r["{tag}"] == "{value}"' for tag, value in tags.items()]) + flux_query += f'\n |> filter(fn: (r) => {tag_filters})' + + # Add ordering + if order: + if order in ['time', '-time']: + desc = 'true' if order == '-time' else 'false' + flux_query += f'\n |> sort(columns: ["_time"], desc: {desc})' + else: + raise self.client_error( + f'Invalid order "{order}" passed.\nYou may pass "time" / "-time" to get ' + 'result sorted in ascending /descending order respectively.' 
+ ) + + # Add limit + if limit: + flux_query += f'\n |> limit(n: {limit})' + + # Pivot the result to make it similar to InfluxDB 1.x output + flux_query += '\n |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")' + + # Execute the query + try: + result = self.query_api.query(flux_query) + return self._parse_read_result(result) + except Exception as e: + logger.error(f"Error executing read query: {e}") + return [] + + def _parse_read_result(self, result): + parsed_result = [] + for table in result: + for record in table.records: + parsed_record = { + 'time': record.get_time().isoformat(), + } + for key, value in record.values.items(): + if key not in ['_time', '_start', '_stop', '_measurement']: + parsed_record[key] = value + parsed_result.append(parsed_record) + return parsed_result + + def execute_query(self, query): + try: + result = self.query_api.query(query) + return self._parse_result(result) + except Exception as e: + logger.error(f"Error executing query: {e}") + return [] + + def _parse_result(self, result): + parsed_result = [] + for table in result: + for record in table.records: + parsed_record = { + 'time': record.get_time().isoformat(), + 'device_id': record.values.get('object_id'), + 'field': record.values.get('_field'), + 'value': record.values.get('_value') + } + parsed_result.append(parsed_record) + return parsed_result + + def delete_metric_data(self, key=None, tags=None): + start = "1970-01-01T00:00:00Z" + stop = "2100-01-01T00:00:00Z" + predicate = "" + if key: + predicate += f'r._measurement == "{key}"' + if tags: + tag_filters = ' and '.join([f'r["{tag}"] == "{value}"' for tag, value in tags.items()]) + if predicate: + predicate += f' and {tag_filters}' + else: + predicate = tag_filters + self.client.delete_api().delete(start, stop, predicate, bucket=self.bucket, org=self.org) + + def validate_query(self, query): + for word in self._FORBIDDEN: + if word in query.lower(): + msg = _(f'the word "{word.upper()}" is not allowed') + raise ValidationError({'configuration': msg}) + return self._is_aggregate(query) + + def _is_aggregate(self, q): + q = q.upper() + for word in self._AGGREGATE: + if any(['%s(' % word in q, '|%s}' % word in q, '|%s|' % word in q]): + return True + return False + + def _clean_params(self, params): + if params.get('end_date'): + params['end_date'] = f"stop: {params['end_date']}" + else: + params['end_date'] = '' + + for key, value in params.items(): + if isinstance(value, (list, tuple)): + params[key] = self._get_filter_query(key, value) + + return params + + def _get_filter_query(self, field, items): + if not items: + return '' + filters = [] + for item in items: + filters.append(f'r["{field}"] == "{item}"') + return f'|> filter(fn: (r) => {" or ".join(filters)})' + + def get_query( + self, + chart_type, + params, + time, + group_map, + summary=False, + fields=None, + query=None, + timezone=settings.TIME_ZONE + ): + bucket = self.bucket + measurement = params.get('key') + if not measurement or measurement == 'None': + logger.error("Invalid or missing measurement in params") + return None + + start_date = params.get('start_date') + end_date = params.get('end_date') + + # Set default values for start and end dates if they're None + if start_date is None: + start_date = f'-{time}' + if end_date is None: + end_date = 'now()' + + content_type = params.get('content_type') + object_id = params.get('object_id') + field_name = params.get('field_name') or fields + + object_id_filter = f' and r.object_id == "{object_id}"' if object_id 
else "" + + flux_query = f''' + from(bucket: "{bucket}") + |> range(start: {start_date}, stop: {end_date}) + |> filter(fn: (r) => r._measurement == "{measurement}") + |> filter(fn: (r) => r.content_type == "{content_type}"{object_id_filter}) + ''' + + if field_name: + if isinstance(field_name, (list, tuple)): + field_filter = ' or '.join([f'r._field == "{field}"' for field in field_name]) + else: + field_filter = f'r._field == "{field_name}"' + flux_query += f' |> filter(fn: (r) => {field_filter})\n' + + logger.debug(f"Time: {time}") + logger.debug(f"Group map: {group_map}") + window = group_map.get(time, '1h') + logger.debug(f"Window: {window}") + + if not summary: + flux_query += f' |> aggregateWindow(every: {window}, fn: mean, createEmpty: false)\n' + else: + flux_query += ' |> last()\n' + + flux_query += ' |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")\n' + flux_query += ' |> yield(name: "result")' + + logger.debug(f"Generated Flux query: {flux_query}") + return flux_query + + def _fields(self, fields, query, field_name): + matches = re.search(self._fields_regex, query) + if not matches and not fields: + return query + elif matches and not fields: + groups = matches.groupdict() + fields_key = groups.get('group') + fields = [field_name] + if fields and matches: + groups = matches.groupdict() + function = groups['func'] + operation = groups.get('op') + fields = [self.__transform_field(f, function, operation) for f in fields] + fields_key = groups.get('group') + else: + fields_key = '{fields}' + if fields: + selected_fields = ', '.join(fields) + return query.replace(fields_key, selected_fields) + + def __transform_field(self, field, function, operation=None): + if operation: + operation = f' |> {operation}' + else: + operation = '' + return f'{function}(r.{field}){operation} |> rename(columns: {{_{field}: "{field}"}})' + + def _get_top_fields(self, query, params, chart_type, group_map, number, time, timezone=settings.TIME_ZONE): + q = self.get_query(query=query, params=params, chart_type=chart_type, group_map=group_map, summary=True, fields=['SUM(*)'], time=time, timezone=timezone) + flux_query = f''' + {q} + |> aggregateWindow(every: {time}, fn: sum, createEmpty: false) + |> group(columns: ["_field"]) + |> sum() + |> sort(columns: ["_value"], desc: true) + |> limit(n: {number}) + |> map(fn: (r) => ({{ r with _field: r._field }})) + ''' + result = list(self.query_api.query(flux_query)) + top_fields = [record["_field"] for table in result for record in table.records] + return top_fields + + def default_chart_query(self, tags): + q = f''' + from(bucket: "{self.bucket}") + |> range(start: {{time}}) + |> filter(fn: (r) => r._measurement == "{{key}}") + |> filter(fn: (r) => r._field == "{{field_name}}") + ''' + if tags: + q += ''' + |> filter(fn: (r) => r.content_type == "{{content_type}}") + |> filter(fn: (r) => r.object_id == "{{object_id}}") + ''' + if '{{end_date}}' in tags: + q += ' |> range(stop: {{end_date}})' + return q + + def _device_data(self, key, tags, rp, **kwargs): + """ returns last snapshot of ``device_data`` """ + query = f''' + from(bucket: "{self.bucket}") + |> range(start: -30d) + |> filter(fn: (r) => r._measurement == "ping") + |> filter(fn: (r) => r.pk == "{tags['pk']}") + |> last() + |> yield(name: "last") + ''' + print(f"Modified _device_data query: {query}") + return self.get_list_query(query, precision=None) + + def get_list_query(self, query, precision='s', **kwargs): + print(f"get_list_query called with query: {query}") + result = 
self.query(query) + parsed_result = self._parse_query_result(result) if result else [] + print(f"get_list_query result: {parsed_result}") + return parsed_result + + def get_device_data_structure(self, device_pk): + query = f''' + from(bucket: "{self.bucket}") + |> range(start: -30d) + |> filter(fn: (r) => r._measurement == "ping") + |> filter(fn: (r) => r.pk == "{device_pk}") + |> limit(n: 1) + ''' + print(f"Checking device data structure: {query}") + result = self.query(query) + if result: + for table in result: + for record in table.records: + print(f"Sample record: {record}") + print(f"Available fields: {record.values.keys()}") + else: + print("No data found for this device") + + def close(self): + self.client.close() + +#todo +# bucket_api.find_bucket_by_name("openwisp") diff --git a/openwisp_monitoring/db/backends/influxdb2/queries.py b/openwisp_monitoring/db/backends/influxdb2/queries.py new file mode 100644 index 000000000..0af4588da --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/queries.py @@ -0,0 +1,272 @@ +chart_query = { + 'uptime': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> mean()' + ' |> map(fn: (r) => ({ r with _value: r._value * 100.0 }))' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "uptime")' + ) + }, + 'packet_loss': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "loss")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "packet_loss")' + ) + }, + 'rtt': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "rtt_avg" or r._field == "rtt_max" or r._field == "rtt_min")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> yield(name: "rtt")' + ) + }, + 'wifi_clients': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> filter(fn: (r) => r.ifname == "{ifname}")' + ' |> group()' + ' |> distinct()' + ' |> count()' + ' |> set(key: "_field", value: "wifi_clients")' + ' |> aggregateWindow(every: 1d, fn: max)' + ) + }, + 'general_wifi_clients': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.organization_id == "{organization_id}")' + ' |> filter(fn: (r) => r.location_id == "{location_id}")' + ' |> 
filter(fn: (r) => r.floorplan_id == "{floorplan_id}")' + ' |> group()' + ' |> distinct()' + ' |> count()' + ' |> set(key: "_field", value: "wifi_clients")' + ' |> aggregateWindow(every: 1d, fn: max)' + ) + }, + 'traffic': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "tx_bytes" or r._field == "rx_bytes")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> filter(fn: (r) => r.ifname == "{ifname}")' + ' |> sum()' + ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' + ' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})' + ' |> yield(name: "traffic")' + ) + }, + 'general_traffic': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "tx_bytes" or r._field == "rx_bytes")' + ' |> filter(fn: (r) => r.organization_id == "{organization_id}")' + ' |> filter(fn: (r) => r.location_id == "{location_id}")' + ' |> filter(fn: (r) => r.floorplan_id == "{floorplan_id}")' + ' |> filter(fn: (r) => r.ifname == "{ifname}")' + ' |> sum()' + ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' + ' |> aggregateWindow(every: 1d, fn: sum, createEmpty: false)' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> rename(columns: {tx_bytes: "upload", rx_bytes: "download"})' + ' |> yield(name: "general_traffic")' + ) + }, + 'memory': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "percent_used")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "memory_usage")' + ) + }, + 'cpu': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "cpu_usage")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "CPU_load")' + ) + }, + 'disk': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "used_disk")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "disk_usage")' + ) + }, + 'signal_strength': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "signal_strength" or r._field == "signal_power")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => 
r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> yield(name: "signal_strength")' + ) + }, + 'signal_quality': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "signal_quality" or r._field == "snr")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> map(fn: (r) => ({ r with _value: float(v: int(v: r._value)) }))' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> yield(name: "signal_quality")' + ) + }, + 'access_tech': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "access_tech")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: (column) => mode(column: "_value"), createEmpty: false)' + ' |> yield(name: "access_tech")' + ) + }, + 'bandwidth': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "sent_bps_tcp" or r._field == "sent_bps_udp")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> rename(columns: {sent_bps_tcp: "TCP", sent_bps_udp: "UDP"})' + ' |> yield(name: "bandwidth")' + ) + }, + 'transfer': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "sent_bytes_tcp" or r._field == "sent_bytes_udp")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> sum()' + ' |> map(fn: (r) => ({ r with _value: r._value / 1000000000.0 }))' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> rename(columns: {sent_bytes_tcp: "TCP", sent_bytes_udp: "UDP"})' + ' |> yield(name: "transfer")' + ) + }, + 'retransmits': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "retransmits")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "retransmits")' + ) + }, + 'jitter': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: 
(r) => r._field == "jitter")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "jitter")' + ) + }, + 'datagram': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "lost_packets" or r._field == "total_packets")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> rename(columns: {_time: "time"})' + ' |> pivot(rowKey:["time"], columnKey: ["_field"], valueColumn: "_value")' + ' |> rename(columns: {lost_packets: "lost_datagram", total_packets: "total_datagram"})' + ' |> yield(name: "datagram")' + ) + }, + 'datagram_loss': { + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "lost_percent")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> aggregateWindow(every: 1d, fn: mean, createEmpty: false)' + ' |> yield(name: "datagram_loss")' + ) + } +} diff --git a/openwisp_monitoring/db/backends/influxdb2/tests.py b/openwisp_monitoring/db/backends/influxdb2/tests.py new file mode 100644 index 000000000..77cd30010 --- /dev/null +++ b/openwisp_monitoring/db/backends/influxdb2/tests.py @@ -0,0 +1,261 @@ +import unittest +from unittest.mock import patch, MagicMock +from datetime import datetime, timedelta +from django.utils.timezone import now +from django.core.exceptions import ValidationError +from freezegun import freeze_time +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client.rest import ApiException +from openwisp_monitoring.db.backends.influxdb2.client import DatabaseClient +from openwisp_monitoring.monitoring.tests import TestMonitoringMixin +from openwisp_monitoring.device.settings import DEFAULT_RETENTION_POLICY, SHORT_RETENTION_POLICY +from openwisp_monitoring.device.utils import DEFAULT_RP, SHORT_RP +from openwisp_monitoring.views import Chart + +from ...exceptions import TimeseriesWriteException +from django.conf import settings + +class TestDatabaseClient(TestMonitoringMixin, unittest.TestCase): + def setUp(self): + self.client = DatabaseClient(bucket="mybucket", org="myorg", token="dltiEmsmMKU__9SoBE0ingFdMTS3UksrESwIQDNtW_3WOgn8bQGdyYzPcx_aDtvZkqvR8RbMkwVVlzUJxpm62w==", url="http://localhost:8086") + + def test_forbidden_queries(self): + queries = [ + 'DROP DATABASE openwisp2', + 'DROP MEASUREMENT test_metric', + 'CREATE DATABASE test', + 'DELETE MEASUREMENT test_metric', + 'ALTER RETENTION POLICY policy', + 'SELECT * INTO metric2 FROM test_metric', + ] + for q in queries: + with self.assertRaises(ValidationError): + self.client.validate_query(q) + + @patch('influxdb_client.InfluxDBClient') + def test_write(self, mock_influxdb_client): + mock_write_api = MagicMock() + mock_influxdb_client.return_value.write_api.return_value = mock_write_api + + self.client.write('test_write', {'value': 2}) + + mock_write_api.write.assert_called_once() + call_args = mock_write_api.write.call_args[1] + self.assertEqual(call_args['bucket'], 'mybucket') + self.assertEqual(call_args['org'], 'myorg') + self.assertIn('record', call_args) + 
self.assertEqual(call_args['record']['measurement'], 'ping') + self.assertEqual(call_args['record']['fields'], {'value': 2}) + + @patch('influxdb_client.InfluxDBClient') + def test_read(self, mock_influxdb_client): + mock_query_api = MagicMock() + mock_influxdb_client.return_value.query_api.return_value = mock_query_api + + self.client.read('ping', 'field1, field2', {'tag1': 'value1'}) + + mock_query_api.query.assert_called_once() + query = mock_query_api.query.call_args[0][0] + self.assertIn('from(bucket: "mybucket")', query) + self.assertIn('|> filter(fn: (r) => r._measurement == "ping")', query) + self.assertIn('|> filter(fn: (r) => r._field == "field1" or r._field == "field2")', query) + self.assertIn('|> filter(fn: (r) => r["tag1"] == "value1")', query) + + def test_validate_query(self): + valid_query = 'from(bucket:"mybucket") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "cpu")' + self.assertTrue(self.client.validate_query(valid_query)) + + invalid_query = 'DROP DATABASE test' + with self.assertRaises(ValidationError): + self.client.validate_query(invalid_query) + + def test_get_query_with_pdb(self): + # Create a metric + metric = self._create_object_metric( + name='Ping', + key='ping', + field_name='rtt_avg', + content_type='config.device', + ) + chart = self._create_chart( + metric=metric, + configuration='line', + test_data=False + ) + + time = '30d' + group_map = Chart._get_group_map(time) + query = chart.get_query( + time=time, + summary=False, + fields=['loss', 'reachable', 'rtt_avg'], + timezone='UTC' + ) + self.assertIsNotNone(query) + self.assertIn('from(bucket: "mybucket")', query) + self.assertIn('range(start: -30d', query) + self.assertIn('filter(fn: (r) => r._measurement == "ping")', query) + + @patch('influxdb_client.InfluxDBClient') + def test_create_database(self, mock_influxdb_client): + mock_bucket_api = MagicMock() + mock_influxdb_client.return_value.buckets_api.return_value = mock_bucket_api + + self.client.create_database() + mock_bucket_api.find_bucket_by_name.assert_called_once_with('mybucket') + mock_bucket_api.create_bucket.assert_called_once() + + @patch('influxdb_client.InfluxDBClient') + def test_drop_database(self, mock_influxdb_client): + mock_bucket_api = MagicMock() + mock_influxdb_client.return_value.buckets_api.return_value = mock_bucket_api + + self.client.drop_database() + + mock_bucket_api.find_bucket_by_name.assert_called_once_with('mybucket') + mock_bucket_api.delete_bucket.assert_called_once() + + @patch('influxdb_client.InfluxDBClient') + def test_query(self, mock_influxdb_client): + mock_query_api = MagicMock() + mock_influxdb_client.return_value.query_api.return_value = mock_query_api + + test_query = 'from(bucket:"mybucket") |> range(start: -1h) |> filter(fn: (r) => r._measurement == "cpu")' + self.client.query(test_query) + + mock_query_api.query.assert_called_once_with(test_query) + + def test_get_timestamp(self): + timestamp = datetime(2023, 1, 1, 12, 0, 0) + result = self.client._get_timestamp(timestamp) + self.assertEqual(result, '2023-01-01T12:00:00.000000') + + @patch('influxdb_client.InfluxDBClient') + def test_write_exception(self, mock_influxdb_client): + mock_write_api = MagicMock() + mock_write_api.write.side_effect = ApiException(status=500, reason="Server Error") + mock_influxdb_client.return_value.write_api.return_value = mock_write_api + + with self.assertRaises(Exception): + self.client.write('ping', {'value': 2}) + + def test_get_custom_query(self): + c = self._create_chart(test_data=None) + custom_q = 
c._default_query.replace('{field_name}', '{fields}') + q = c.get_query(query=custom_q, fields=['SUM(*)']) + self.assertIn('SELECT SUM(*) FROM', q) + + def test_is_aggregate_bug(self): + m = self._create_object_metric(name='summary_avg') + c = self._create_chart(metric=m, configuration='dummy') + self.assertFalse(self.client._is_aggregate(c.query)) + + def test_is_aggregate_fields_function(self): + m = self._create_object_metric(name='is_aggregate_func') + c = self._create_chart(metric=m, configuration='uptime') + self.assertTrue(self.client._is_aggregate(c.query)) + + def test_get_query_fields_function(self): + c = self._create_chart(test_data=None, configuration='histogram') + q = c.get_query(fields=['ssh', 'http2', 'apple-music']) + expected = ( + 'SELECT SUM("ssh") / 1 AS ssh, ' + 'SUM("http2") / 1 AS http2, ' + 'SUM("apple-music") / 1 AS apple_music FROM' + ) + self.assertIn(expected, q) + + @patch('influxdb_client.InfluxDBClient') + def test_general_write(self, mock_influxdb_client): + mock_write_api = MagicMock() + mock_influxdb_client.return_value.write_api.return_value = mock_write_api + + m = self._create_general_metric(name='Sync test') + m.write(1) + + mock_write_api.write.assert_called_once() + call_args = mock_write_api.write.call_args[1] + self.assertEqual(call_args['record']['measurement'], 'sync_test') + self.assertEqual(call_args['record']['fields']['value'], 1) + + @patch('influxdb_client.InfluxDBClient') + def test_object_write(self, mock_influxdb_client): + mock_write_api = MagicMock() + mock_influxdb_client.return_value.write_api.return_value = mock_write_api + + om = self._create_object_metric() + om.write(3) + + mock_write_api.write.assert_called_once() + call_args = mock_write_api.write.call_args[1] + self.assertEqual(call_args['record']['measurement'], 'ping') + self.assertEqual(call_args['record']['fields']['value'], 3) + self.assertEqual(call_args['record']['tags']['object_id'], str(om.object_id)) + self.assertEqual(call_args['record']['tags']['content_type'], '.'.join(om.content_type.natural_key())) + + @patch('influxdb_client.InfluxDBClient') + def test_delete_metric_data(self, mock_influxdb_client): + mock_delete_api = MagicMock() + mock_influxdb_client.return_value.delete_api.return_value = mock_delete_api + + self.client.delete_metric_data(key='ping') + + mock_delete_api.delete.assert_called_once() + call_args = mock_delete_api.delete.call_args[1] + self.assertIn('_measurement="ping"', call_args['predicate']) + + def test_get_query_1d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='1d') + last24 = now() - timedelta(days=1) + self.assertIn(str(last24)[0:14], q) + self.assertIn('aggregateWindow(every: 10m', q) + + def test_get_query_30d(self): + c = self._create_chart(test_data=None, configuration='uptime') + q = c.get_query(time='30d') + last30d = now() - timedelta(days=30) + self.assertIn(str(last30d)[0:10], q) + self.assertIn('aggregateWindow(every: 24h', q) + + @patch('influxdb_client.InfluxDBClient') + @freeze_time("2023-01-01") + def test_read_order(self, mock_influxdb_client): + mock_query_api = MagicMock() + mock_influxdb_client.return_value.query_api.return_value = mock_query_api + + m = self._create_general_metric(name='dummy') + m.write(30) + m.write(40, time=now() - timedelta(days=2)) + + # Test ascending read order + m.read(limit=2, order='time') + query = mock_query_api.query.call_args[0][0] + self.assertIn('|> sort(columns: ["time"], desc: false)', query) + + # Test descending read order + 
m.read(limit=2, order='-time') + query = mock_query_api.query.call_args[0][0] + self.assertIn('|> sort(columns: ["time"], desc: true)', query) + + # Test invalid read order + with self.assertRaises(ValueError): + m.read(limit=2, order='invalid') + + @patch('influxdb_client.InfluxDBClient') + def ping_write_microseconds_precision(self, mock_influxdb_client): + mock_write_api = MagicMock() + mock_influxdb_client.return_value.write_api.return_value = mock_write_api + + m = self._create_object_metric(name='wlan0', key='wlan0', configuration='clients') + m.write('00:14:5c:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235142)) + m.write('00:23:4a:00:00:00', time=datetime(2020, 7, 31, 22, 5, 47, 235152)) + + self.assertEqual(mock_write_api.write.call_count, 2) + call_args_1 = mock_write_api.write.call_args_list[0][1] + call_args_2 = mock_write_api.write.call_args_list[1][1] + self.assertEqual(call_args_1['record']['time'], '2020-07-31T22:05:47.235142') + self.assertEqual(call_args_2['record']['time'], '2020-07-31T22:05:47.235152') + +if __name__ == '__main__': + unittest.main() diff --git a/openwisp_monitoring/device/base/models.py b/openwisp_monitoring/device/base/models.py index 4c7803175..cdb9ba8e8 100644 --- a/openwisp_monitoring/device/base/models.py +++ b/openwisp_monitoring/device/base/models.py @@ -3,6 +3,7 @@ from collections import OrderedDict from datetime import datetime +from django.conf import settings import swapper from cache_memoize import cache_memoize from dateutil.relativedelta import relativedelta @@ -25,7 +26,7 @@ from openwisp_monitoring.device.settings import get_critical_device_metrics from openwisp_utils.base import TimeStampedEditableModel -from ...db import device_data_query, timeseries_db +from ...db import timeseries_db from ...monitoring.signals import threshold_crossed from ...monitoring.tasks import _timeseries_write from ...settings import CACHE_TIMEOUT @@ -155,11 +156,12 @@ def data(self): """ if self.__data: return self.__data - q = device_data_query.format(SHORT_RP, self.__key, self.pk) cache_key = get_device_cache_key(device=self, context='current-data') points = cache.get(cache_key) if not points: - points = timeseries_db.get_list_query(q, precision=None) + points = timeseries_db._device_data( + rp=SHORT_RP, tags={'pk': self.pk}, key=self.__key, fields='data' + ) if not points: return None self.data_timestamp = points[0]['time'] @@ -379,7 +381,7 @@ def update_status(self, value): self.full_clean() self.save() # clear device management_ip when device is offline - if self.status == 'critical' and app_settings.AUTO_CLEAR_MANAGEMENT_IP: + if self.status == '' and app_settings.AUTO_CLEAR_MANAGEMENT_IP: self.device.management_ip = None self.device.save(update_fields=['management_ip']) diff --git a/openwisp_monitoring/device/settings.py b/openwisp_monitoring/device/settings.py index d239e3eac..e4f54da6e 100644 --- a/openwisp_monitoring/device/settings.py +++ b/openwisp_monitoring/device/settings.py @@ -46,7 +46,7 @@ def get_health_status_labels(): DEFAULT_RETENTION_POLICY = get_settings_value('DEFAULT_RETENTION_POLICY', '26280h0m0s') CRITICAL_DEVICE_METRICS = get_critical_device_metrics() HEALTH_STATUS_LABELS = get_health_status_labels() -AUTO_CLEAR_MANAGEMENT_IP = get_settings_value('AUTO_CLEAR_MANAGEMENT_IP', True) +AUTO_CLEAR_MANAGEMENT_IP = get_settings_value('AUTO_CLEAR_MANAGEMENT_IP', False) # Triggers spontaneous recovery of device based on corresponding signals DEVICE_RECOVERY_DETECTION = get_settings_value('DEVICE_RECOVERY_DETECTION', True) 
MAC_VENDOR_DETECTION = get_settings_value('MAC_VENDOR_DETECTION', True) diff --git a/openwisp_monitoring/device/utils.py b/openwisp_monitoring/device/utils.py index 151b62609..ae3c6bb0e 100644 --- a/openwisp_monitoring/device/utils.py +++ b/openwisp_monitoring/device/utils.py @@ -14,7 +14,7 @@ def manage_short_retention_policy(): creates or updates the "short" retention policy """ duration = app_settings.SHORT_RETENTION_POLICY - timeseries_db.create_or_alter_retention_policy(SHORT_RP, duration) + _manage_retention_policy(SHORT_RP, duration) def manage_default_retention_policy(): @@ -22,4 +22,9 @@ def manage_default_retention_policy(): creates or updates the "default" retention policy """ duration = app_settings.DEFAULT_RETENTION_POLICY - timeseries_db.create_or_alter_retention_policy(DEFAULT_RP, duration) + _manage_retention_policy(DEFAULT_RP, duration) + +def _manage_retention_policy(name, duration): + # For InfluxDB 2.x, we're not managing retention policies directly + # Instead, we ensure the bucket exists + timeseries_db.create_bucket(timeseries_db.bucket) diff --git a/openwisp_monitoring/monitoring/base/models.py b/openwisp_monitoring/monitoring/base/models.py index 5d7bf0ebe..1cd15a71f 100644 --- a/openwisp_monitoring/monitoring/base/models.py +++ b/openwisp_monitoring/monitoring/base/models.py @@ -4,6 +4,9 @@ from copy import deepcopy from datetime import date, datetime, timedelta +from dateutil.parser import parse +from django.utils.timezone import make_aware, is_aware + from cache_memoize import cache_memoize from dateutil.parser import parse as parse_date from django.conf import settings @@ -24,7 +27,7 @@ from openwisp_monitoring.monitoring.utils import clean_timeseries_data_key from openwisp_utils.base import TimeStampedEditableModel -from ...db import default_chart_query, timeseries_db +from ...db import timeseries_db from ...settings import CACHE_TIMEOUT, DEFAULT_CHART_TIME from ..configuration import ( CHART_CONFIGURATION_CHOICES, @@ -617,10 +620,8 @@ def top_fields(self): @property def _default_query(self): - q = default_chart_query[0] - if self.metric.object_id: - q += default_chart_query[1] - return q + tags = True if self.metric.object_id else False + return timeseries_db.default_chart_query(tags) @classmethod def _get_group_map(cls, time=None): @@ -674,29 +675,34 @@ def get_query( additional_params = additional_params or {} params = self._get_query_params(time, start_date, end_date) params.update(additional_params) - params.update({'start_date': start_date, 'end_date': end_date}) + params.update({ + 'start_date': start_date, + 'end_date': end_date, + # 'measurement': self.config_dict.get('measurement', self.metric.key), + # 'field_name': fields or self.config_dict.get('field_name'), + }) if not params.get('organization_id') and self.config_dict.get('__all__', False): params['organization_id'] = ['__all__'] return timeseries_db.get_query( - self.type, - params, - time, - self._get_group_map(time), - summary, - fields, - query, - timezone, + chart_type=self.type, + params=params, + time=time, + group_map=self._get_group_map(time), + summary=summary, + fields=fields, + query=query, + timezone=timezone, ) - + def get_top_fields(self, number): """ Returns list of top ``number`` of fields (highest sum) of a measurement in the specified time range (descending order). 
""" - q = self._default_query.replace('{field_name}', '{fields}') params = self._get_query_params(self.DEFAULT_TIME) return timeseries_db._get_top_fields( - query=q, + default_query=self._default_query, + query=self.get_query(), chart_type=self.type, group_map=self._get_group_map(params['days']), number=number, @@ -754,8 +760,9 @@ def read( ): additional_query_kwargs = additional_query_kwargs or {} traces = {} - if x_axys: - x = [] + x = [] + result = {'traces': [], 'summary': {}} # Initialize result dictionary + try: query_kwargs = dict( time=time, timezone=timezone, start_date=start_date, end_date=end_date @@ -773,38 +780,77 @@ def read( points = timeseries_db.get_list_query(data_query) summary = timeseries_db.get_list_query(summary_query) except timeseries_db.client_error as e: - logging.error(e, exc_info=True) + logger.error(f"Error fetching data: {e}", exc_info=True) raise e + for point in points: + time_value = point.get('time') + try: + formatted_time = self._parse_and_format_time(time_value, timezone) + except ValueError as e: + logger.warning(f"Error parsing time value: {time_value}. Error: {e}") + continue + for key, value in point.items(): - if key == 'time': + if key in ('time', 'result', 'table', 'content_type', 'object_id'): continue traces.setdefault(key, []) if decimal_places and isinstance(value, (int, float)): value = self._round(value, decimal_places) traces[key].append(value) - time = datetime.fromtimestamp(point['time'], tz=tz(timezone)).strftime( - '%Y-%m-%d %H:%M' - ) + if x_axys: - x.append(time) - # prepare result to be returned - # (transform chart data so its order is not random) - result = {'traces': sorted(traces.items())} + x.append(formatted_time) + + # Prepare result + result['traces'] = sorted(traces.items()) if x_axys: result['x'] = x - # add summary - if len(summary) > 0: - result['summary'] = {} + + # Handle summary calculation + if summary: for key, value in summary[0].items(): - if key == 'time': + if key in ('time', 'result', 'table', 'content_type', 'object_id'): continue if not timeseries_db.validate_query(self.query): value = None - elif value: + elif value is not None: value = self._round(value, decimal_places) - result['summary'][key] = value + result['summary'][key] = 'N/A' if value is None else value + return result + + def _round(self, value, decimal_places): + logger.debug(f"Rounding value: {value}, type: {type(value)}") + if value is None: + logger.debug("Value is None, returning None") + return None + try: + float_value = float(value) + rounded = round(float_value, decimal_places) + logger.debug(f"Rounded value: {rounded}") + return rounded + except (ValueError, TypeError) as e: + logger.warning(f"Could not round value: {value}. 
Error: {e}") + return value + + def _parse_and_format_time(self, time_str, timezone): + time_obj = parse(time_str) + if not is_aware(time_obj): + time_obj = make_aware(time_obj, timezone=tz(timezone)) + return time_obj.strftime('%Y-%m-%d %H:%M') + + def _safe_round(self, value, decimal_places): + if isinstance(value, (int, float)): + return self._round(value, decimal_places) + return value + + def _round(self, value, decimal_places): + try: + control = 10 ** decimal_places + return round(float(value) * control) / control + except (ValueError, TypeError): + return value def json(self, time=DEFAULT_TIME, **kwargs): try: diff --git a/openwisp_monitoring/monitoring/migrations/__init__.py b/openwisp_monitoring/monitoring/migrations/__init__.py index 58c517a90..d7d3f6fc1 100644 --- a/openwisp_monitoring/monitoring/migrations/__init__.py +++ b/openwisp_monitoring/monitoring/migrations/__init__.py @@ -1,3 +1,4 @@ +from asyncio.log import logger import swapper from django.contrib.auth.models import Permission @@ -72,6 +73,7 @@ def create_general_metrics(apps, schema_editor): Chart = swapper.load_model('monitoring', 'Chart') Metric = swapper.load_model('monitoring', 'Metric') + metric, created = Metric._get_or_create( configuration='general_clients', name='General Clients', @@ -81,7 +83,7 @@ def create_general_metrics(apps, schema_editor): ) if created: chart = Chart(metric=metric, configuration='gen_wifi_clients') - chart.full_clean() + logger.debug(f'Creating chart with configuration: {chart.configuration}') chart.save() metric, created = Metric._get_or_create( @@ -93,10 +95,9 @@ def create_general_metrics(apps, schema_editor): ) if created: chart = Chart(metric=metric, configuration='general_traffic') - chart.full_clean() + logger.debug(f'Creating chart with configuration: {chart.configuration}') chart.save() - def delete_general_metrics(apps, schema_editor): Metric = apps.get_model('monitoring', 'Metric') Metric.objects.filter(content_type__isnull=True, object_id__isnull=True).delete() diff --git a/openwisp_monitoring/monitoring/migrations/influxdb/__init__.py b/openwisp_monitoring/monitoring/migrations/influxdb/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openwisp_monitoring/monitoring/migrations/influxdb2/__init__.py b/openwisp_monitoring/monitoring/migrations/influxdb2/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/openwisp_monitoring/monitoring/migrations/influxdb2/influxdb2_alter_structure_0006.py b/openwisp_monitoring/monitoring/migrations/influxdb2/influxdb2_alter_structure_0006.py new file mode 100644 index 000000000..10338cf97 --- /dev/null +++ b/openwisp_monitoring/monitoring/migrations/influxdb2/influxdb2_alter_structure_0006.py @@ -0,0 +1,112 @@ +# openwisp_monitoring/monitoring/migrations/influxdb2/influxdb2_alter_structure_0006.py +import logging +from datetime import datetime, timedelta + +from influxdb_client import InfluxDBClient +from influxdb_client.client.write_api import SYNCHRONOUS +from swapper import load_model + +from openwisp_monitoring.db.backends.influxdb2.client import DatabaseClient +from openwisp_monitoring.db.exceptions import TimeseriesWriteException + +SELECT_QUERY_LIMIT = 1000 +WRITE_BATCH_SIZE = 1000 +CHUNK_SIZE = 1000 +EXCLUDED_MEASUREMENTS = [ + 'ping', + 'config_applied', + 'clients', + 'disk', + 'memory', + 'cpu', + 'signal_strength', + 'signal_quality', + 'access_tech', + 'device_data', + 'traffic', + 'wifi_clients', +] + + +logger = logging.getLogger(__name__) + + +def get_influxdb_client(): + 
db_config = { + 'bucket': 'mybucket', + 'org': 'myorg', + 'token': 'dltiEmsmMKU__9SoBE0ingFdMTS3UksrESwIQDNtW_3WOgn8bQGdyYzPcx_aDtvZkqvR8RbMkwVVlzUJxpm62w==', + 'url': 'http://localhost:8086', + } + return DatabaseClient(**db_config) + + +def requires_migration(): + client = get_influxdb_client() + query_api = client.client.query_api() + query = f'from(bucket: "{client.bucket}") |> range(start: -1h)' + tsdb_measurements = query_api.query(org=client.org, query=query) + for table in tsdb_measurements: + for record in table.records: + if record.get_measurement() not in EXCLUDED_MEASUREMENTS: + return True + return False + + +def migrate_influxdb_structure(): + if not requires_migration(): + logger.info( + 'Timeseries data migration is already migrated. Skipping migration!' + ) + return + + # Implement your data migration logic here + logger.info('Starting migration for InfluxDB 2.0...') + migrate_wifi_clients() + migrate_traffic_data() + logger.info('Timeseries data migration completed.') + + +def migrate_influxdb_data(query_api, write_api, read_query, measurement, tags): + logger.debug(f'Executing query: {read_query}') + result = query_api.query(org='myorg', query=read_query) + points = [] + + for table in result: + for record in table.records: + point = { + 'measurement': measurement, + 'tags': tags, + 'fields': record.values, + 'time': record.get_time(), + } + points.append(point) + + write_api.write( + bucket='mybucket', org='myorg', record=points, write_options=SYNCHRONOUS + ) + logger.info(f'Migrated data for measurement: {measurement}') + + +def migrate_wifi_clients(): + client = get_influxdb_client() + query_api = client.client.query_api() + write_api = client.client.write_api(write_options=SYNCHRONOUS) + + read_query = 'from(bucket: "mybucket") |> range(start: -30d) |> filter(fn: (r) => r._measurement == "wifi_clients")' + tags = {'source': 'migration'} + + migrate_influxdb_data(query_api, write_api, read_query, 'wifi_clients', tags) + logger.info('"wifi_clients" measurements successfully migrated.') + + +def migrate_traffic_data(): + client = get_influxdb_client() + query_api = client.client.query_api() + write_api = client.client.write_api(write_options=SYNCHRONOUS) + + read_query = 'from(bucket: "mybucket") |> range(start: -30d) |> filter(fn: (r) => r._measurement == "traffic")' + tags = {'source': 'migration'} + + migrate_influxdb_data(query_api, write_api, read_query, 'traffic', tags) + logger.info('"traffic" measurements successfully migrated.') diff --git a/openwisp_monitoring/monitoring/tasks.py b/openwisp_monitoring/monitoring/tasks.py index 392cb6748..d12fac155 100644 --- a/openwisp_monitoring/monitoring/tasks.py +++ b/openwisp_monitoring/monitoring/tasks.py @@ -1,13 +1,26 @@ +from datetime import timezone +import os + from celery import shared_task +from django.conf import settings from django.core.exceptions import ObjectDoesNotExist from swapper import load_model from openwisp_utils.tasks import OpenwispCeleryTask +from openwisp_monitoring.db.backends.influxdb.client import DatabaseClient as InfluxDB1Client +from openwisp_monitoring.db.backends.influxdb2.client import DatabaseClient as InfluxDB2Client + from ..db import timeseries_db from ..db.exceptions import TimeseriesWriteException +from .migrations.influxdb import influxdb_alter_structure_0006 as influxdb_migration +from .migrations.influxdb2 import influxdb2_alter_structure_0006 as influxdb2_migration from .settings import RETRY_OPTIONS from .signals import post_metric_write +from 
openwisp_monitoring.db.backends.influxdb.client import DatabaseClient as InfluxDB1Client
+from openwisp_monitoring.db.backends.influxdb2.client import DatabaseClient as InfluxDB2Client
+from django.utils.dateparse import parse_date
+
 
 def _metric_post_write(name, values, metric, check_threshold_kwargs, **kwargs):
@@ -54,18 +67,19 @@ def _timeseries_write(name, values, metric=None, check_threshold_kwargs=None, **
     If the timeseries database is using UDP to write data,
     then write data synchronously.
     """
-    if timeseries_db.use_udp:
+    if hasattr(timeseries_db, 'use_udp') and timeseries_db.use_udp:
+        # InfluxDB 1.x with UDP support: write synchronously
         func = timeseries_write
+    elif hasattr(timeseries_db, 'write'):
+        # InfluxDB 2.x, or InfluxDB 1.x without UDP support:
+        # write directly and trigger the post-write logic
+        timeseries_db.write(name, values, **kwargs)
+        _metric_post_write(name, values, metric, check_threshold_kwargs, **kwargs)
+        return
     else:
+        # fallback to delayed (asynchronous) write
         func = timeseries_write.delay
         metric = metric.pk if metric else None
     func(
         name=name,
         values=values,
         metric=metric,
         check_threshold_kwargs=check_threshold_kwargs,
         **kwargs
     )
 
 
 @shared_task(
@@ -99,8 +113,18 @@ def _timeseries_batch_write(data):
 @shared_task(base=OpenwispCeleryTask)
 def delete_timeseries(key, tags):
-    timeseries_db.delete_series(key=key, tags=tags)
-
+    backend = settings.TIMESERIES_DATABASE['BACKEND']
+
+    if backend == 'openwisp_monitoring.db.backends.influxdb':
+        # InfluxDB 1.x
+        client = InfluxDB1Client()
+        client.delete_series(key=key, tags=tags)
+    elif backend == 'openwisp_monitoring.db.backends.influxdb2':
+        # InfluxDB 2.x: no action needed here
+        pass
+    else:
+        raise ValueError(f"Unsupported backend: {backend}")
+
 
 @shared_task
 def migrate_timeseries_database():
@@ -111,8 +135,7 @@ def migrate_timeseries_database():
 
     To be removed in 1.1.0 release.
""" - from .migrations.influxdb.influxdb_alter_structure_0006 import ( - migrate_influxdb_structure, - ) - - migrate_influxdb_structure() + if os.environ.get('USE_INFLUXDB2', 'False') == 'True': + influxdb2_migration.migrate_influxdb_structure() + else: + influxdb_migration.migrate_influxdb_structure() diff --git a/openwisp_monitoring/monitoring/tests/__init__.py b/openwisp_monitoring/monitoring/tests/__init__.py index 8f50774e1..3018ddd2b 100644 --- a/openwisp_monitoring/monitoring/tests/__init__.py +++ b/openwisp_monitoring/monitoring/tests/__init__.py @@ -1,6 +1,7 @@ import time from datetime import timedelta +from django.conf import settings from django.core.cache import cache from django.utils.timezone import now from swapper import load_model @@ -91,7 +92,17 @@ "SELECT {fields|SUM|/ 1} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> sum()' + ' |> yield(name: "histogram")' + ), }, }, 'dummy': { @@ -108,7 +119,7 @@ 'description': 'Bugged chart for testing purposes.', 'unit': 'bugs', 'order': 999, - 'query': {'influxdb': "BAD"}, + 'query': {'influxdb': "BAD", 'influxdb2': "BAD"}, }, 'default': { 'type': 'line', @@ -120,7 +131,16 @@ 'influxdb': ( "SELECT {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> yield(name: "default")' + ), }, }, 'multiple_test': { @@ -133,26 +153,56 @@ 'influxdb': ( "SELECT {field_name}, value2 FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}" or r._field == "value2")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> yield(name: "multiple_test")' + ), }, }, 'group_by_tag': { 'type': 'stackedbars', 'title': 'Group by tag', - 'description': 'Query is groupped by tag along with time', + 'description': 'Query is grouped by tag along with time', 'unit': 'n.', 'order': 999, 'query': { 'influxdb': ( "SELECT CUMULATIVE_SUM(SUM({field_name})) FROM {key} WHERE time >= '{time}'" " GROUP BY time(1d), metric_num" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> group(columns: ["metric_num"])' + ' |> sum()' + ' |> cumulativeSum()' + ' |> window(every: 1d)' + ' |> yield(name: "group_by_tag")' + ), }, 'summary_query': { 'influxdb': ( "SELECT SUM({field_name}) FROM {key} WHERE time >= '{time}'" " GROUP BY time(30d), metric_num" - ) + ), + 'influxdb2': ( + 'from(bucket: 
"mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "ping")' + ' |> filter(fn: (r) => r._field == "loss")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> mean()' + ' |> yield(name: "summary")' + ), }, }, 'mean_test': { @@ -165,7 +215,17 @@ 'influxdb': ( "SELECT MEAN({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> mean()' + ' |> yield(name: "mean_test")' + ), }, }, 'sum_test': { @@ -178,7 +238,17 @@ 'influxdb': ( "SELECT SUM({field_name}) AS {field_name} FROM {key} WHERE time >= '{time}' AND " "content_type = '{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> sum()' + ' |> yield(name: "sum_test")' + ), }, }, 'top_fields_mean': { @@ -192,24 +262,64 @@ "SELECT {fields|MEAN} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> mean()' + ' |> yield(name: "top_fields_mean")' + ), }, }, } class TestMonitoringMixin(TestOrganizationMixin): - ORIGINAL_DB = TIMESERIES_DB['NAME'] - TEST_DB = f'{ORIGINAL_DB}_test' + INFLUXDB_BACKEND = TIMESERIES_DB.get('BACKEND') + TIMESERIES_DB = getattr(settings, 'TIMESERIES_DATABASE', None) + TEST_DB = f"{TIMESERIES_DB['NAME']}" if 'NAME' in TIMESERIES_DB else 'test_db' + TEST_BUCKET = f"{TIMESERIES_DB['BUCKET']}" + TEST_ORG = f"{TIMESERIES_DB['ORG']}" + TEST_TOKEN = f"{TIMESERIES_DB['TOKEN']}" + + if INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb': + # InfluxDB 1.x configuration + ORIGINAL_DB = TIMESERIES_DB['NAME'] + TEST_DB = f"{ORIGINAL_DB}" + elif INFLUXDB_BACKEND == 'openwisp_monitoring.db.backends.influxdb2': + # InfluxDB 2.x configuration + ORG_BUCKET = f"{TIMESERIES_DB['ORG']}/{TIMESERIES_DB['BUCKET']}" + ORIGINAL_DB = ORG_BUCKET + TEST_DB = f"{ORG_BUCKET}" + else: + ORIGINAL_DB = None + TEST_DB = None @classmethod def setUpClass(cls): + # import pdb; pdb.set_trace() # By default timeseries_db.db shall connect to the database # defined in settings when apps are loaded. 
We don't want that while testing - timeseries_db.db_name = cls.TEST_DB - del timeseries_db.db - del timeseries_db.dbs + if 'NAME' in cls.TIMESERIES_DB: + # InfluxDB 1.8 configuration + timeseries_db.db_name = cls.TEST_DB + del timeseries_db.db + del timeseries_db.dbs + else: + # InfluxDB 2.0 configuration + timeseries_db.bucket = cls.TEST_BUCKET + timeseries_db.org = cls.TEST_ORG + timeseries_db.token = cls.TEST_TOKEN + + # Create the test database or bucket timeseries_db.create_database() + + # Rest of the setup code... + super().setUpClass() for key, value in metrics.items(): register_metric(key, value) for key, value in charts.items(): diff --git a/openwisp_monitoring/monitoring/tests/test_configuration.py b/openwisp_monitoring/monitoring/tests/test_configuration.py index 4ee11b669..202c8d2e6 100644 --- a/openwisp_monitoring/monitoring/tests/test_configuration.py +++ b/openwisp_monitoring/monitoring/tests/test_configuration.py @@ -34,7 +34,17 @@ def _get_new_metric(self): "SELECT {fields|SUM|/ 1} FROM {key} " "WHERE time >= '{time}' AND content_type = " "'{content_type}' AND object_id = '{object_id}'" - ) + ), + 'influxdb2': ( + 'from(bucket: "mybucket")' + ' |> range(start: {time}, stop: {end_date})' + ' |> filter(fn: (r) => r._measurement == "{measurement}")' + ' |> filter(fn: (r) => r._field == "{field_name}")' + ' |> filter(fn: (r) => r.content_type == "{content_type}")' + ' |> filter(fn: (r) => r.object_id == "{object_id}")' + ' |> sum()' + ' |> yield(name: "histogram")' + ), }, } }, diff --git a/requirements.txt b/requirements.txt index 90feaac61..cc87d6f08 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,3 +4,4 @@ django-nested-admin~=4.0.2 netaddr~=0.8.0 python-dateutil>=2.7.0,<3.0.0 openwisp-utils[rest] @ https://github.com/openwisp/openwisp-utils/tarball/master +influxdb-client~=1.21.0 diff --git a/setup.py b/setup.py index 43ca4bb97..3b935de4b 100755 --- a/setup.py +++ b/setup.py @@ -55,6 +55,10 @@ def get_install_requires(): include_package_data=True, zip_safe=False, install_requires=get_install_requires(), + extras_require={ + 'influxdb': ['influxdb>=5.2,<5.3'], + 'influxdb2': ['influxdb-client>=1.17.0,<2.0.0'], + }, classifiers=[ 'Development Status :: 3 - Alpha', 'Environment :: Web Environment', @@ -64,7 +68,10 @@ def get_install_requires(): 'License :: OSI Approved :: GNU General Public License v3 (GPLv3)', 'Operating System :: OS Independent', 'Framework :: Django', - 'Programming Language :: Python :: 2.7', - 'Programming Language :: Python :: 3.4', + 'Programming Language :: Python :: 3.6', + 'Programming Language :: Python :: 3.7', + 'Programming Language :: Python :: 3.8', + 'Programming Language :: Python :: 3.9', + 'Programming Language :: Python :: 3.10', ], ) diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/docker-entrypoint.sh b/tests/docker-entrypoint.sh old mode 100644 new mode 100755 diff --git a/tests/openwisp2/settings.py b/tests/openwisp2/settings.py index c7772d5cd..bd27777df 100644 --- a/tests/openwisp2/settings.py +++ b/tests/openwisp2/settings.py @@ -7,7 +7,6 @@ TESTING = 'test' in sys.argv SHELL = 'shell' in sys.argv or 'shell_plus' in sys.argv BASE_DIR = os.path.dirname(os.path.abspath(__file__)) - DEBUG = True ALLOWED_HOSTS = ['*'] @@ -21,7 +20,7 @@ } } -TIMESERIES_DATABASE = { +INFLUXDB_1x_DATABASE = { 'BACKEND': 'openwisp_monitoring.db.backends.influxdb', 'USER': 'openwisp', 'PASSWORD': 'openwisp', @@ -31,6 +30,22 @@ # UDP writes are disabled by default 'OPTIONS': 
{'udp_writes': False, 'udp_port': 8089},
 }
+
+# For InfluxDB 2.x
+INFLUXDB_2x_DATABASE = {
+    'BACKEND': 'openwisp_monitoring.db.backends.influxdb2',
+    'TOKEN': 'dltiEmsmMKU__9SoBE0ingFdMTS3UksrESwIQDNtW_3WOgn8bQGdyYzPcx_aDtvZkqvR8RbMkwVVlzUJxpm62w==',
+    'ORG': 'myorg',
+    'BUCKET': 'mybucket',
+    'HOST': os.getenv('INFLUXDB_HOST', 'localhost'),
+    'PORT': '8086',
+}
+
+if os.environ.get('USE_INFLUXDB2', 'False') == 'True':
+    TIMESERIES_DATABASE = INFLUXDB_2x_DATABASE
+else:
+    TIMESERIES_DATABASE = INFLUXDB_1x_DATABASE
+
 if TESTING:
     if os.environ.get('TIMESERIES_UDP', False):
         TIMESERIES_DATABASE['OPTIONS'] = {'udp_writes': True, 'udp_port': 8091}
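
For quick reference, a minimal sketch of how application code could obtain an InfluxDB 2.x client from the setting selected above. This helper is illustrative only and not part of the patch; it assumes the `DatabaseClient` constructor accepts the `bucket`, `org`, `token` and `url` keyword arguments used by the migration script earlier in this diff, and that `TIMESERIES_DATABASE` points at the influxdb2 backend (i.e. `USE_INFLUXDB2=True`):

# Illustrative sketch: build an InfluxDB 2.x client from the selected settings.
from django.conf import settings

from openwisp_monitoring.db.backends.influxdb2.client import DatabaseClient


def get_influxdb2_client():
    config = settings.TIMESERIES_DATABASE
    return DatabaseClient(
        bucket=config['BUCKET'],
        org=config['ORG'],
        token=config['TOKEN'],
        # URL assembled from HOST/PORT; plain HTTP as in the test settings
        url=f"http://{config['HOST']}:{config['PORT']}",
    )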