diff --git a/main.py b/main.py index b8f4667..63e4c9a 100644 --- a/main.py +++ b/main.py @@ -6,23 +6,22 @@ ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Author: Ugurcan Dede -Date: September 18, 2023 +Date: July 12, 2024 GitHub: https://github.com/ugurcandede -This Python script is designed to perform bulk Elasticsearch data update operations. +This Python script is designed to perform add a field to all Elasticsearch docs with given schema. It accomplishes the following tasks: 1. Reads document data from a JSON file. -2. Parallelizes document updates. +2. Parallelizes document insertion. 3. Sends each update operation to Elasticsearch in bulk. Usage: - This script reads document data from a JSON file and performs document updates on Elasticsearch. - The JSON file should have the following format: - { - "organizationId1": ["requesterId1", "requesterId2", ...], - "organizationId2": ["requesterId3", "requesterId4", ...], - ... - } +{ + "develop": 37, + "release": 487 +} - After the execution, it prints start and end times along with the processing time. """ @@ -34,7 +33,6 @@ environments = { "local": "dev", - "dev": "staging", "net": "preprod", "prod": "prod", } @@ -46,46 +44,56 @@ "prod": "http://xxx:9200", } +headers = {"Content-Type": "application/json"} + -def decide_environment(env: str, tenant_id: str) -> str: +def get_update_url(env, tenant_id): if env in environments: - return f"{environment_addresses[env]}/{tenant_id}_tickets_{environments[env]}/_update_by_query" + return f"{environment_addresses[env]}/{tenant_id}_tickets_{environments[env]}/_update_by_query?refresh=true" else: raise EnvironmentError(f"Unknown environment: {env}") -def elastic_query(organization_id, requester_id): +def generate_elastic_query(option_id): query = { "script": { - "source": f"ctx._source.organization = {organization_id}", + "source": f"ctx._source.fieldMap['ts.scope'] = ['id':{option_id},'value':{option_id},'type':'SELECT','order':1,'name':'TICKET']", "lang": "painless", }, "query": { - "bool": { - "should": [{"term": {"fieldMap.ts.requester.value": requester_id}}] - } + "match_all": {} }, } return json.dumps(query) -def send_update_request_bulk(organization_id, requester_ids, env, tenant_id): - headers = {"Content-Type": "application/json"} - url = decide_environment(env, tenant_id) +def send_update_request_bulk(tenant_id, value, env): + try: + url = get_update_url(env, tenant_id) + data = generate_elastic_query(value) + + response = requests.post(url=url, data=data, headers=headers) + response.raise_for_status() + print(f"[SUCCESS] tenantId: {tenant_id} field added to fieldMap") + except requests.exceptions.RequestException as e: + print( + f"[ERROR] error while updating documents with tenantId: {tenant_id}, field added not to fieldMap {str(e)}") + + +def print_elapsed_time(start_time, end_time): + elapsed_time = end_time - start_time + + milliseconds = int((elapsed_time - int(elapsed_time)) * 1000) + + seconds = int(elapsed_time % 60) + minutes = int((elapsed_time // 60) % 60) + hours = int(elapsed_time // 3600) - print(f"Sending update request for organization id: {organization_id}") - for rid in requester_ids: - data = elastic_query(organization_id, rid) - try: - response = requests.post(url=url, data=data, headers=headers) - response.raise_for_status() - except requests.exceptions.RequestException as e: - print(f"An error occurred while updating documents with organization id {organization_id}: {str(e)}") + print(f"{hours}h:{minutes}m:{seconds}sec:{milliseconds}ms elapsed") def main(): parser = argparse.ArgumentParser(description="Update Elasticsearch documents.") - parser.add_argument("--tenantId", required=True, help="Tenant ID") parser.add_argument("--env", required=True, choices=environments.keys(), help="Environment") args = parser.parse_args() @@ -99,19 +107,19 @@ def main(): json_file = json.load(file) with ThreadPoolExecutor(max_workers=4) as executor: - for key, values in json_file: - executor.submit(send_update_request_bulk, key, values, args.env, args.tenantId) + for tenant_id, values in json_file.items(): + if values is not None: + executor.submit(send_update_request_bulk, tenant_id, values, args.env) except Exception as e: print(f"An error occurred: {e}") end_time = time.time() - elapsed_time = end_time - start_time - print("\nFinished updating documents.") print(f"End Time: {end_time}") - print(f"Elapsed Time: {elapsed_time} seconds") + + print_elapsed_time(start_time, end_time) if __name__ == "__main__": diff --git a/readme.md b/readme.md index 0632c54..c4ad851 100644 --- a/readme.md +++ b/readme.md @@ -1,6 +1,7 @@ ## ElasticSearch Bulk Document Updater Script -This Python script is designed to perform bulk Elasticsearch data update operations. +This Python script is designed to perform add a field to all Elasticsearch docs with given schema. + It accomplishes the following tasks: 1. Reads document data from a JSON file. 2. Parallelizes document updates. @@ -10,11 +11,10 @@ It accomplishes the following tasks: - This script reads document data from a JSON file and performs document updates on Elasticsearch. - The JSON file should have the following format: ```json - { - "organizationId1": ["requesterId1", "requesterId2", ...], - "organizationId2": ["requesterId3", "requesterId4", ...], - ... - } +{ + "": , + "mpass": 34 +} ``` - After the execution, it prints start and end times along with the processing time. @@ -26,25 +26,26 @@ It accomplishes the following tasks: ### Generate `result.json` file: This query can be used to generate the `result.json` file from the database. + +- First execute this SQL command to get `schema_names` for all tenants. ```sql -SELECT json_agg(json_build_array(organization_id, user_ids)) -FROM (SELECT au.organization_id, jsonb_agg(DISTINCT au.id) AS user_ids - FROM ticket t - LEFT JOIN app_user au ON t.requester_id = au.id - WHERE au.organization_id IS NOT NULL - GROUP BY au.organization_id - ) subquery; +SELECT 'SELECT jsonb_object_agg(foo.tenantid, foo.json) FROM(' || string_agg('(select ''' || mt.schema_name ||''' as tenantId, fdeo.id as json from ' || mt.schema_name || '.field_definition fd LEFT JOIN ' || mt.schema_name || '.field_definition_entity_options fdeo on fd.id = fdeo.field_definition_entity_id WHERE fd.key = ''ts.scope'' AND fdeo.label = ''TICKET'')', ' UNION ') || ')as foo' || ';' from main.tenant mt; ``` +- Copy generated SQL query string and execute with `\gexec` command using `psql` + +> SELECT 'SELECT jsonb_object_agg(foo ................. from main.tenant mt **\gexec** + +- Then copy result and paste it to `result.json` file + ### How to run: - Install the `requests` and `argparse` libraries for Python. - or run the following command: - `pip install -r requirements.txt` - Run the script with the following command: - - **tenantId**: The tenantId to be updated. - **env**: The environment to be updated. ```bash -python script.py --tenantId develop --env local +python script.py --env local ``` --- diff --git a/requirements.txt b/requirements.txt index 084fe5e..95a791b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,2 @@ -requests==2.32.0 +requests==2.32.3 argparse==1.4.0 \ No newline at end of file diff --git a/result.json b/result.json index 52b7d41..9dac744 100644 --- a/result.json +++ b/result.json @@ -1,17 +1,4 @@ -[ - [1, [6]], - [3, [61, 91, 103, 118]], - [4, [78, 92]], - [7, [76]], - [8, [67, 75, 100, 110]], - [11, [69]], - [12, [28]], - [13, [68, 87, 111]], - [14, [96]], - [21, [86, 93, 102]], - [22, [82]], - [23, [14, 55]], - [27, [9]], - [29, [58]], - [32, [18]] -] +{ + "develop": 30, + "release": 37 +} \ No newline at end of file