import datetime import json import infinity import infinity.index from infinity.common import REMOTE_HOST import infinity.index as index import pandas as pds from apps import vector_handler from apps import api_handler from apps import ComProcess import polars as pls class manage_class: def __init__(self): # Initialize the infinity connection and database at the time of class instantiation self.infinity_obj = infinity.connect(REMOTE_HOST) self.db = self.infinity_obj.get_database("default") self.LoadClassStatesMaster() self.LoadClassDetails() self.LoadClassActionsMaster() self.LoadMasterData() self.LoadIntentData() def GetMasterFnList(self,clsname:str)->str: table_obj = self.db.get_table("t_class_master") if clsname != "": mv = vector_handler.manage_vector() vv = mv.get_vector(clsname) float_list = json.loads(vv) #qb_result = table_obj.match( # "body", clsname, "topn=1").output(["body"]).to_pl() qb_result = table_obj.output(["body"]).knn("vec",float_list,"float","l2",1).match("body",clsname,"topn=1").fusion('rrf').to_pl() #qb_result = table_obj.output(["body"]).knn("vec",float_list,"float","l2",1).to_pl() else: qb_result = table_obj.output(["body"]).to_pl() #pds_result = pds.DataFrame(qb_result) #print("frame:",pds_result) pls_result = pls.DataFrame(qb_result) #print("polar frame:",pls_result) # Convert the 'body' column to a list fn_list = pls_result['body'].to_list() # Create dictionary structure result_dict = {'fn_list': fn_list} # Convert the dictionary to a JSON string json_result = json.dumps(result_dict) return json_result def GetClassDetails(self,classname:str)->str: cls_table_obj = self.db.get_table("fnclass_table") if classname != "": cls_qb = cls_table_obj.query_builder qb_result = cls_qb.match("body",classname).output(["body","req"]).to_pl() #qb_result = cls_qb.filter(f"body='{classname}'").output(["body","req"]).to_pl() j_result = pls.DataFrame(qb_result) if j_result.height <=0: return "" req_values = j_result.select(pls.col('req')).to_series()[0] req_json = json.loads(req_values) #api connectivity apihandle = api_handler.processrequests() apiresult = apihandle.postrequest(req_json) # Parse the initial JSON response response = json.loads(apiresult) # Check the "Sts" value if response['Resp']['Sts'] == '1': # Assuming 'response' is your initial JSON object result_str = response['Resp']['Result'] # First decoding: interprets escaped characters (e.g., converting '\"' to '"') result_str_decoded = json.loads(result_str) # If the string was double-escaped (common in some APIs), decode it again if isinstance(result_str_decoded, str): result_data = json.loads(result_str_decoded) else: result_data = result_str_decoded return result_data def GetNeuranList(self,intentname:str)->str: table_obj = self.db.get_table("t_class_intents") if intentname != "": ''' mv = vector_handler.manage_vector() vv = mv.get_vector(intentname) float_list = json.loads(vv) #qb_result = table_obj.match( # "intent_name", intentname, "topn=1").output(["neuron_id","neuron_name"]).to_pl() qb_result = table_obj.output(["neuron_id","neuron_name"]).knn("int_vec",float_list,"float","l2",3).match("intent_name",intentname).fusion('rrf').to_pl() ''' intent_qb = table_obj.query_builder qb_result = intent_qb.filter(f"intent_name='{intentname}'").output(["neuron_id","neuron_name"]).to_pl() #qb_result = table_obj.output(["body"]).knn("vec",float_list,"float","l2",1).to_pl() else: qb_result = table_obj.output(["neuron_id","neuron_name"]).to_pl() #print("frame:",pds_result) pls_result = pls.DataFrame(qb_result) #print("polar frame:",pls_result) fn_list = pls_result.to_dicts() # Create dictionary structure result_dict = {'neuron_list': fn_list} return result_dict def GetNeuranDetails(self,neuronlist:str)->str: table_obj = self.db.get_table("t_class_intents") if neuronlist != "": # Parse the JSON string into a Python list data = neuronlist # Extract 'neuron_id' values and construct the string neuron_list_val = ' or '.join([f"neuron_id='{item['neuron_id']}'" for item in data]) intent_qb = table_obj.query_builder qb_result = intent_qb.filter(neuron_list_val).output(["neuron_id","neuron_name"]).to_pl() else: qb_result = table_obj.output(["neuron_id","neuron_name"]).to_pl() #print("frame:",pds_result) pls_result = pls.DataFrame(qb_result) result_count = pls_result.height # Extract neuron_ids from the JSON data neuron_ids_from_json = [item["neuron_id"] for item in data] # Iterate over the neuron_ids from the JSON and check against the DataFrame for neuron_id in neuron_ids_from_json: if not pls_result.filter(pls.col("neuron_id") == neuron_id).height: # If the neuron_id is not found in the DataFrame, insert it with an empty neuron_name pls_result = pls.concat([pls_result, pls.DataFrame({"neuron_id": [neuron_id], "neuron_name": [""]})], how="vertical") fn_list = pls_result.to_dicts() # Create dictionary structure result_dict = {'result_count':result_count,'neuron_list': fn_list} return result_dict def LoadMasterData(self): # Drop my_table if it already exists self.db.drop_table("t_class_master", if_exists=True) # Create a table class_table = self.db.create_table("t_class_master", {"id": "varchar","vec": "vector, 4, float", "body": "varchar", "req": "varchar", "keyid": "varchar"}, None) residx = class_table.create_index("class_master_index", [index.IndexInfo("body", index.IndexType.FullText, [index.InitParameter("ANALYZER", "segmentation")]), ], None) class_table.insert([{"id": 1, "vec": [-0.055, -0.136, -0.070, 0.025], "body": "Add or Create or New", "keyid": "", "req": ""}]) class_table.insert([{"id": 2, "vec": [-0.015, 0.106, 0.031, 0.060], "body": "Edit or Alter or Modify", "keyid": "", "req": ""}]) class_table.insert([{"id": 3, "vec": [-0.080, -0.026, -0.045, 0.025], "body": "View or Show or List", "keyid": "", "req": ""}]) class_table.insert([{"id": 4, "vec": [-0.056, 0.045, -0.025, 0.004], "body": "Delete or Remove or Drop", "keyid": "", "req": ""}]) def LoadIntentData(self): #kun project connectivity # Drop my_table if it already exists self.db.drop_table("t_class_intents", if_exists=True) # Create a table named "t_class_intents" class_table = self.db.create_table("t_class_intents", {"intent_id": "varchar", "intent_name":"varchar", "int_vec": "vector, 4, float", "neuron_id": "varchar", "neuron_name": "varchar", "data_txt": "varchar"}, None) residx1 = class_table.create_index("class_neuron_index", [index.IndexInfo("neuron_name", index.IndexType.FullText, []), ], None) class_table.insert([{"intent_id": 1, "int_vec": [-0.056, 0.018, -0.064, 0.010], "intent_name": "add_class", "neuron_id": "111", "neuron_name": "add_class", "data_txt":""}]) class_table.insert([{"intent_id": 2, "int_vec": [-0.076, 0.067, -0.055, 0.060], "intent_name": "edit_class", "neuron_id": "112", "neuron_name": "edit_class", "data_txt":""}]) class_table.insert([{"intent_id": 2, "int_vec": [-0.076, 0.067, -0.055, 0.060], "intent_name": "edit_class", "neuron_id": "113", "neuron_name": "modify_class", "data_txt":""}]) class_table.insert([{"intent_id": 2, "int_vec": [-0.076, 0.067, -0.055, 0.060], "intent_name": "edit_class", "neuron_id": "114", "neuron_name": "change_class", "data_txt":""}]) class_table.insert([{"intent_id": 3, "int_vec": [-0.051, -0.004, -0.085, -0.020], "intent_name": "view_class", "neuron_id": "115", "neuron_name": "view_class", "data_txt":""}]) class_table.insert([{"intent_id": 3, "int_vec": [-0.051, -0.004, -0.085, -0.020], "intent_name": "view_class", "neuron_id": "116", "neuron_name": "list_class", "data_txt":""}]) class_table.insert([{"intent_id": 4, "int_vec": [-0.051, 0.082, -0.054, -0.026], "intent_name": "delete_class", "neuron_id": "223", "neuron_name": "delete_class", "data_txt":""}]) # index creation residx = class_table.create_index("class_intent_index", [index.IndexInfo("intent_name", index.IndexType.FullText, [index.InitParameter("ANALYZER", "segmentation")]), ], None) def LoadClassDetails(self): # Drop my_table if it already exists self.db.drop_table("fnclass_table", if_exists=True) # Create a table named "fnclass_table" class_table = self.db.create_table("fnclass_table", {"id": "varchar","vec": "vector, 4, float", "body": "varchar", "req": "varchar", "keyid": "varchar"}, None) #kun project connectivity class_table.insert([{"id": 6, "vec": [-0.037, 0.141, 0.007, 0.088], "body":"Power Hour", "keyid": "202308080000721", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308080000721\", \"classid\": 6 }}"}]) class_table.insert([{"id": 82, "vec": [-0.035, -0.027, -0.071, 0.084], "body":"Primal Fitness", "keyid": "202309020000121", "req":"{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 70 }}"}]) class_table.insert([{"id": 86, "vec": [-0.045, -0.132, -0.001, -0.016], "body":"Dance New", "keyid": "202308290000121", "req":"{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 86 }}"}]) class_table.insert([{"id": 80, "vec": [-0.022, 0.060, -0.060, -0.015], "body":"ABC Class", "keyid": "202308290000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 80 }}"}]) class_table.insert([{"id": 69, "vec": [-0.081, -0.023, 0.003, 0.101], "body":"Made of Muscle", "keyid": "202308290000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 69 }}"}]) class_table.insert([{"id": 68, "vec": [-0.096, -0.028, -0.066, 0.043], "body":"Super Solid", "keyid": "202308290000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 68 }}"}]) class_table.insert([{"id": 67, "vec": [-0.088, -0.053, -0.049, -0.019], "body":"One More Rep", "keyid": "202308290000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 67 }}"}]) class_table.insert([{"id": 92, "vec": [-0.096, 0.063, 0.068, -0.006], "body":"Aqua Fit", "keyid": "202308290000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 92 }}"}]) class_table.insert([{"id": 94, "vec": [-0.133, 0.025, 0.072, -0.020], "body":"Aqua Fit1", "keyid": "202308290000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 94 }}"}]) class_table.insert([{"id": 95, "vec": [0.003, 0.045, 0.010, -0.026], "body":"Power Play Limited", "keyid": "202308290000121", "req":"{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000121\", \"classid\": 95 }}"}]) class_table.insert([{"id": 4, "vec": [-0.037, 0.141, 0.007, 0.088], "body": "Zumba", "keyid": "202308080000721", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308080000721\", \"classid\": 4 }}"}]) class_table.insert([{"id": 8, "vec": [-0.035, -0.027, -0.071, 0.084], "body": "Cycling", "keyid": "202308080000721", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308080000721\", \"classid\": 8 }}"}]) class_table.insert([{"id": 30, "vec": [-0.045, -0.132, -0.001, -0.016], "body": "Yoga", "keyid": "202308190000121", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308190000121\", \"classid\": 30 }}"}]) class_table.insert([{"id": 80, "vec": [-0.022, 0.060, -0.060, -0.015], "body": "Jumba", "keyid": "202308290000221", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000221\", \"classid\": 78 }}"}]) class_table.insert([{"id": 83, "vec": [-0.081, -0.023, 0.003, 0.101], "body": "Cardio", "keyid": "202308290000221", "req": "{\"Req\":{\"Type\":\"MFCD\", \"CRUD\": \"R\", \"fjid\":\"\", \"keyid\":\"202308290000221\", \"classid\": 83 }}"}]) # index creation residx1 = class_table.create_index("class_dtls_index", [index.IndexInfo("body", index.IndexType.FullText, [index.InitParameter("ANALYZER", "segmentation")]), ], None) def GetClassStateAction(self,intent:str,userid:str,session:str)->str: sts_table_obj = self.db.get_table("fnclass_states_table") act_table_obj = self.db.get_table("fnclass_actions_table") int_table_obj = self.db.get_table("t_class_intents") #get intent id intent_result = int_table_obj.query_builder.filter(f"intent_name='{intent}'").output(["intent_id"]).to_pl() pl_intent = pls.DataFrame(intent_result) intent_id = pl_intent[0,"intent_id"] if session not in (None,""): curr_usr_state = "" #get current user state from session table usess_table_obj = self.db.get_table("user_session_table") session_result = usess_table_obj.query_builder.filter(f"user_id='{userid}' and session_id='{session}' and active=1").output(["session_id","state_id","action_id"]).to_pl() ss_intent = pls.DataFrame(session_result) curr_stateid = ss_intent[0,"state_id"] curr_actionid = ss_intent[0,"action_id"] #get next state from current state state_result = sts_table_obj.query_builder.filter(f"ref_id='{intent_id}' and state_id='{curr_stateid}'").output(["nxt_state_id"]).to_pl() pl_state = pls.DataFrame(state_result) nxt_state_id = pl_state[0,"nxt_state_id"] #get state details based on previous value state_result = sts_table_obj.query_builder.filter(f"ref_id='{intent_id}' and state_id='{nxt_state_id}'").output(["state_id","state_name"]).to_pl() pl_state = pls.DataFrame(state_result) state_id = pl_state[0,"state_id"] state_name = pl_state[0,"state_name"] #get next action from current action action_result = act_table_obj.query_builder.filter(f"ref_id='{intent_id}' and action_id='{curr_actionid}'").output(["nxt_action_id"]).to_pl() pl_action = pls.DataFrame(action_result) nxt_action_id = pl_action[0,"nxt_action_id"] #get action details based on previous value action_result = act_table_obj.query_builder.filter(f"ref_id='{intent_id}' and action_id='{nxt_action_id}'").output(["action_id","action_name"]).to_pl() pl_action = pls.DataFrame(action_result) action_id = pl_action[0,"action_id"] action_name = pl_action[0,"action_name"] else: #get state details - get first state state_result = sts_table_obj.query_builder.filter(f"ref_id='{intent_id}' and state_id='1'").output(["state_id","state_name","nxt_state_id"]).to_pl() pl_state = pls.DataFrame(state_result) state_id = pl_state[0,"state_id"] state_name = pl_state[0,"state_name"] nnxt_state_id = pl_state[0,"nxt_state_id"] #get action details - get first action action_result = act_table_obj.query_builder.filter(f"ref_id='{intent_id}' and action_id='1'").output(["action_id","action_name","nxt_action_id"]).to_pl() pl_action = pls.DataFrame(action_result) action_id = pl_action[0,"action_id"] action_name = pl_action[0,"action_name"] nnxt_action_id = pl_action[0,"nxt_action_id"] #result #set session value for futher processing session = ComProcess.ComProcess.generate_unique_string() self.ManageUserSession(userid,session,state_id,action_id) result = {"user_id":userid,"session_id":session,"state_id":state_id,"state_name":state_name,"action_id":action_id,"action_name":action_name} return result def LoadClassStatesMaster(self): # Drop my_table if it already exists self.db.drop_table("fnclass_states_table", if_exists=True) # Create a table named "fnclass_states_table" class_table = self.db.create_table("fnclass_states_table", {"state_id": "varchar", "ref_id": "varchar", "nxt_state_id": "varchar", "state_name": "varchar" }, None) #kun project connectivity class_table.insert([{"state_id": 1, "ref_id": "1", "nxt_state_id":"2", "state_name":"Name"}]) class_table.insert([{"state_id": 2, "ref_id": "1", "nxt_state_id":"3", "state_name":"Intensity"}]) class_table.insert([{"state_id": 3, "ref_id": "1", "nxt_state_id":"4", "state_name":"Payment"}]) class_table.insert([{"state_id": 4, "ref_id": "1", "nxt_state_id":"", "state_name":"Success"}]) # index creation st_inx = class_table.create_index("class_state_index", [index.IndexInfo("state_name", index.IndexType.FullText, [index.InitParameter("ANALYZER", "segmentation")]), ], None) def LoadClassActionsMaster(self): # Drop my_table if it already exists self.db.drop_table("fnclass_actions_table", if_exists=True) # Create a table named "fnclass_states_table" class_table = self.db.create_table("fnclass_actions_table", {"action_id": "varchar", "ref_id": "varchar", "nxt_action_id": "varchar", "action_name": "varchar" }, None) #kun project connectivity class_table.insert([{"action_id": 1, "ref_id": "1", "nxt_action_id":"2", "action_name":"ask_class_name"}]) class_table.insert([{"action_id": 2, "ref_id": "1", "nxt_action_id":"3", "action_name":"ask_class_Intensity"}]) class_table.insert([{"action_id": 3, "ref_id": "1", "nxt_action_id":"4", "action_name":"ask_Payment"}]) class_table.insert([{"action_id": 4, "ref_id": "1", "nxt_action_id":"", "action_name":"Success"}]) # index creation ac_inx = class_table.create_index("class_action_index", [index.IndexInfo("action_name", index.IndexType.FullText, [index.InitParameter("ANALYZER", "segmentation")]), ], None) def ManageUserSession(self,userid:str,session:str,stateid:str,actionid:str,nxt_state:str="",nxt_action:str="")->bool: session_tbl_obj = None try: # check table if it already exists session_tbl_obj = self.db.get_table("user_session_table") self.InsertUpdateUserSession(userid,session,stateid,actionid) except Exception as e: # Create a table named "user_session_table" usr_ssn_table = self.db.create_table("user_session_table", {"user_id": "varchar", "session_id": "varchar", "state_id": "varchar", "action_id": "varchar", "nxt_state_id":"varchar", "nxt_action_id":"varchar", "active": "integer", "cdate": "varchar" }, None) self.InsertUpdateUserSession(userid,session,stateid,actionid) def ManageSessions(self,userid:str,session:str,stateid:str,actionid:str,nxt_state:str="",nxt_action:str="")->bool: session_tbl_obj = None if session == "": session = ComProcess.ComProcess.generate_unique_string() try: # check table if it already exists session_tbl_obj = self.db.get_table("user_session_table") self.InsertUpdateSession(userid,session,stateid,actionid,nxt_state,nxt_action) except Exception as e: # Create a table named "user_session_table" usr_ssn_table = self.db.create_table("user_session_table", {"user_id": "varchar", "session_id": "varchar", "state_id": "varchar", "action_id": "varchar", "nxt_state_id":"varchar", "nxt_action_id":"varchar", "active": "integer", "cdate": "varchar" }, None) self.InsertUpdateSession(userid,session,stateid,actionid,nxt_state,nxt_action) result = {"user_id":userid,"session_id":session,"cstate_id":stateid,"nxt_state_id":nxt_state,"caction_id":actionid,"nxt_action_id":nxt_action} return result def InsertUpdateUserSession(self,userid:str,session:str,stateid:str,actionid:str,nxt_state:str="",nxt_action:str="",active:int=1): ''' first check whether user session exists, if exists then update active flag to 0 then insert new session info with active flag 1. if user session not exists then insert with active flag 1(so else case not required) ''' try: session_tbl_obj = self.db.get_table("user_session_table") usr_session = session_tbl_obj.query_builder.filter(f"user_id='{userid}' and session_id='{session}'").output(["*"]).to_pl() pl_session = pls.DataFrame(usr_session) if pl_session.height > 0: session_tbl_obj.update(f"user_id='{userid}' and session_id='{session}'",[{"active":0}]) #else: session_tbl_obj.insert([{"user_id": userid, "session_id": session, "state_id":stateid, "action_id":actionid, "nxt_state_id":nxt_state, "nxt_action_id":nxt_action, "active":1, "cdate":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}]) except Exception as e: msg = str(e) def InsertUpdateSession(self,userid:str,session:str,stateid:str,actionid:str,nxt_state:str="",nxt_action:str="",active:int=1): ''' first check whether user session exists, if exists then update active flag to 0 then insert new session info with active flag 1. if user session not exists then insert with active flag 1(so else case not required) ''' try: session_tbl_obj = self.db.get_table("user_session_table") usr_session = session_tbl_obj.query_builder.filter(f"user_id='{userid}' and session_id='{session}'").output(["*"]).to_pl() pl_session = pls.DataFrame(usr_session) if pl_session.height > 0: session_tbl_obj.update(f"user_id='{userid}' and session_id='{session}'",[{ "state_id":stateid, "action_id":actionid, "nxt_state_id":nxt_state, "nxt_action_id":nxt_action, "active":1, "cdate":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}]) else: session_tbl_obj.insert([{"user_id": userid, "session_id": session, "state_id":stateid, "action_id":actionid, "nxt_state_id":nxt_state, "nxt_action_id":nxt_action, "active":1, "cdate":datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")}]) except Exception as e: msg = str(e)