insert field to fieldMap #4

Open · wants to merge 3 commits into master
74 changes: 41 additions & 33 deletions main.py
@@ -6,23 +6,22 @@
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Author: Ugurcan Dede
-Date: September 18, 2023
+Date: July 12, 2024
 GitHub: https://github.com/ugurcandede
 
-This Python script is designed to perform bulk Elasticsearch data update operations.
+This Python script adds a field to all Elasticsearch documents matching a given schema.
 It accomplishes the following tasks:
 1. Reads document data from a JSON file.
-2. Parallelizes document updates.
+2. Parallelizes document insertion.
 3. Sends each update operation to Elasticsearch in bulk.
 
 Usage:
 - This script reads document data from a JSON file and performs document updates on Elasticsearch.
 - The JSON file should have the following format:
-    {
-        "organizationId1": ["requesterId1", "requesterId2", ...],
-        "organizationId2": ["requesterId3", "requesterId4", ...],
-        ...
-    }
+    {
+        "develop": 37,
+        "release": 487
+    }
 - After the execution, it prints start and end times along with the processing time.
 """

@@ -34,7 +33,6 @@

 environments = {
     "local": "dev",
-    "dev": "staging",
     "net": "preprod",
     "prod": "prod",
 }
@@ -46,46 +44,56 @@
"prod": "http://xxx:9200",
}

headers = {"Content-Type": "application/json"}


def decide_environment(env: str, tenant_id: str) -> str:
def get_update_url(env, tenant_id):
if env in environments:
return f"{environment_addresses[env]}/{tenant_id}_tickets_{environments[env]}/_update_by_query"
return f"{environment_addresses[env]}/{tenant_id}_tickets_{environments[env]}/_update_by_query?refresh=true"
else:
raise EnvironmentError(f"Unknown environment: {env}")


def elastic_query(organization_id, requester_id):
def generate_elastic_query(option_id):
query = {
"script": {
"source": f"ctx._source.organization = {organization_id}",
"source": f"ctx._source.fieldMap['ts.scope'] = ['id':{option_id},'value':{option_id},'type':'SELECT','order':1,'name':'TICKET']",
"lang": "painless",
},
"query": {
"bool": {
"should": [{"term": {"fieldMap.ts.requester.value": requester_id}}]
}
"match_all": {}
},
}
return json.dumps(query)


def send_update_request_bulk(organization_id, requester_ids, env, tenant_id):
headers = {"Content-Type": "application/json"}
url = decide_environment(env, tenant_id)
def send_update_request_bulk(tenant_id, value, env):
try:
url = get_update_url(env, tenant_id)
data = generate_elastic_query(value)

response = requests.post(url=url, data=data, headers=headers)
response.raise_for_status()
print(f"[SUCCESS] tenantId: {tenant_id} field added to fieldMap")
except requests.exceptions.RequestException as e:
print(
f"[ERROR] error while updating documents with tenantId: {tenant_id}, field added not to fieldMap {str(e)}")


def print_elapsed_time(start_time, end_time):
elapsed_time = end_time - start_time

milliseconds = int((elapsed_time - int(elapsed_time)) * 1000)

seconds = int(elapsed_time % 60)
minutes = int((elapsed_time // 60) % 60)
hours = int(elapsed_time // 3600)

print(f"Sending update request for organization id: {organization_id}")
for rid in requester_ids:
data = elastic_query(organization_id, rid)
try:
response = requests.post(url=url, data=data, headers=headers)
response.raise_for_status()
except requests.exceptions.RequestException as e:
print(f"An error occurred while updating documents with organization id {organization_id}: {str(e)}")
print(f"{hours}h:{minutes}m:{seconds}sec:{milliseconds}ms elapsed")


def main():
parser = argparse.ArgumentParser(description="Update Elasticsearch documents.")
parser.add_argument("--tenantId", required=True, help="Tenant ID")
parser.add_argument("--env", required=True, choices=environments.keys(), help="Environment")

args = parser.parse_args()
@@ -99,19 +107,19 @@ def main():
             json_file = json.load(file)
 
         with ThreadPoolExecutor(max_workers=4) as executor:
-            for key, values in json_file:
-                executor.submit(send_update_request_bulk, key, values, args.env, args.tenantId)
+            for tenant_id, values in json_file.items():
+                if values is not None:
+                    executor.submit(send_update_request_bulk, tenant_id, values, args.env)
 
     except Exception as e:
         print(f"An error occurred: {e}")
 
     end_time = time.time()
-    elapsed_time = end_time - start_time
 
     print("\nFinished updating documents.")
 
-    print(f"End Time: {end_time}")
-    print(f"Elapsed Time: {elapsed_time} seconds")
-
+    print_elapsed_time(start_time, end_time)
 
 
 if __name__ == "__main__":
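Reviewer note: the request this change issues per tenant can be previewed offline. A minimal sketch, rebuilding the URL and body the way `get_update_url` and `generate_elastic_query` do; the tenant `develop` and option id 30 come from this PR's `result.json`, the `http://xxx:9200` host is the placeholder from `environment_addresses`, and env `local` maps to the index suffix `dev` per the `environments` dict:

```python
import json

# Sketch: preview the update-by-query request for one tenant.
# Host is the placeholder from main.py; env "local" -> index suffix "dev".
tenant_id, option_id, env_suffix = "develop", 30, "dev"

url = f"http://xxx:9200/{tenant_id}_tickets_{env_suffix}/_update_by_query?refresh=true"
body = {
    "script": {
        # Painless map literal: adds the 'ts.scope' entry to each document's fieldMap.
        "source": (
            f"ctx._source.fieldMap['ts.scope'] = "
            f"['id':{option_id},'value':{option_id},'type':'SELECT','order':1,'name':'TICKET']"
        ),
        "lang": "painless",
    },
    "query": {"match_all": {}},  # match_all: the field is added to every document
}

print("POST", url)
print(json.dumps(body, indent=2))
```

The `?refresh=true` suffix added in this PR makes the updated documents searchable as soon as the call returns, at the cost of an immediate refresh on the affected shards.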
31 changes: 16 additions & 15 deletions readme.md
@@ -1,6 +1,7 @@
 ## ElasticSearch Bulk Document Updater Script
 
-This Python script is designed to perform bulk Elasticsearch data update operations.
+This Python script adds a field to all Elasticsearch documents matching a given schema.
 
 It accomplishes the following tasks:
 1. Reads document data from a JSON file.
 2. Parallelizes document updates.
@@ -10,11 +11,10 @@ It accomplishes the following tasks:
 - This script reads document data from a JSON file and performs document updates on Elasticsearch.
 - The JSON file should have the following format:
 ```json
-{
-  "organizationId1": ["requesterId1", "requesterId2", ...],
-  "organizationId2": ["requesterId3", "requesterId4", ...],
-  ...
-}
+{
+  "<tenantId>": <fieldOptionId>,
+  "mpass": 34
+}
 ```
 - After the execution, it prints start and end times along with the processing time.
@@ -26,25 +26,26 @@

 ### Generate `result.json` file:
 This query can be used to generate the `result.json` file from the database.
+
+- First, execute this SQL command to get the `schema_name`s of all tenants:
 ```sql
-SELECT json_agg(json_build_array(organization_id, user_ids))
-FROM (SELECT au.organization_id, jsonb_agg(DISTINCT au.id) AS user_ids
-      FROM ticket t
-               LEFT JOIN app_user au ON t.requester_id = au.id
-      WHERE au.organization_id IS NOT NULL
-      GROUP BY au.organization_id
-     ) subquery;
+SELECT 'SELECT jsonb_object_agg(foo.tenantid, foo.json) FROM(' || string_agg('(select ''' || mt.schema_name ||''' as tenantId, fdeo.id as json from ' || mt.schema_name || '.field_definition fd LEFT JOIN ' || mt.schema_name || '.field_definition_entity_options fdeo on fd.id = fdeo.field_definition_entity_id WHERE fd.key = ''ts.scope'' AND fdeo.label = ''TICKET'')', ' UNION ') || ')as foo' || ';' from main.tenant mt;
 ```
+
+- Copy the generated SQL query string and execute it with the `\gexec` command in `psql`:
+
+> SELECT 'SELECT jsonb_object_agg(foo ................. from main.tenant mt **\gexec**
+
+- Then copy the result and paste it into the `result.json` file.
 
 ### How to run:
 - Install the `requests` and `argparse` libraries for Python.
 - or run the following command:
   - `pip install -r requirements.txt`
 - Run the script with the following command:
-  - **tenantId**: The tenantId to be updated.
   - **env**: The environment to be updated.
 ```bash
-python script.py --tenantId develop --env local
+python script.py --env local
 ```
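Since the generated `result.json` drives every update, a quick sanity check before running the script can catch a malformed export. A minimal sketch, assuming the tenant-to-option-id shape documented above and that a `null` value marks a tenant without a `ts.scope` option (which `main()` skips):

```python
import json

# Sanity-check result.json before running the script: every key should be a tenant
# schema name, every value an integer field option id (or None for tenants to skip).
with open("result.json") as f:
    mapping = json.load(f)

for tenant_id, option_id in mapping.items():
    assert option_id is None or isinstance(option_id, int), (tenant_id, option_id)

print(f"{len(mapping)} tenants, "
      f"{sum(v is not None for v in mapping.values())} with a ts.scope option id")
```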
2 changes: 1 addition & 1 deletion requirements.txt
@@ -1,2 +1,2 @@
-requests==2.32.0
+requests==2.32.3
 argparse==1.4.0
21 changes: 4 additions & 17 deletions result.json
@@ -1,17 +1,4 @@
-[
-  [1, [6]],
-  [3, [61, 91, 103, 118]],
-  [4, [78, 92]],
-  [7, [76]],
-  [8, [67, 75, 100, 110]],
-  [11, [69]],
-  [12, [28]],
-  [13, [68, 87, 111]],
-  [14, [96]],
-  [21, [86, 93, 102]],
-  [22, [82]],
-  [23, [14, 55]],
-  [27, [9]],
-  [29, [58]],
-  [32, [18]]
-]
+{
+  "develop": 30,
+  "release": 37
+}
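For context, an illustration of what the Painless script does to each matched document: the map literal becomes a nested object under `fieldMap` in `_source`. Only the `ts.scope` entry comes from this PR; the surrounding document shape is assumed.

```python
# Assumed _source shape after the script runs for tenant "develop" (option id 30).
# Only the "ts.scope" entry is introduced by this PR; other fields are illustrative.
doc_source_after = {
    "fieldMap": {
        "ts.scope": {
            "id": 30,
            "value": 30,
            "type": "SELECT",
            "order": 1,
            "name": "TICKET",
        },
        # ...pre-existing fieldMap entries remain untouched
    },
}
print(doc_source_after["fieldMap"]["ts.scope"])
```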