Skip to content

Commit

Permalink
Reworking logger to fix the weird error where it dupes logs - p3
Browse files Browse the repository at this point in the history
  • Loading branch information
DefinetlyNotAI committed Sep 23, 2024
1 parent 9236f07 commit d705693
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 21 deletions.
13 changes: 9 additions & 4 deletions CODE/Logicytics.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@ def special_run(file_path: str):
log.info("Opening debug menu...")
special_run("_debug.py")

check_status.sys_internal_zip()
messages = check_status.sys_internal_zip()
if messages:
log.debug(messages)

if action == "dev":
log.info("Opening developer menu...")
Expand Down Expand Up @@ -166,8 +168,11 @@ def special_run(file_path: str):

# Check weather to use threading or not
if action == "threaded":
log.warning("Threading does not support sensitive data miner yet, ignoring")
execution_list.remove("sensitive_data_miner.py")
def threaded_execution(execution_list_thread, index_thread):
thread_log = Execute(log_variable=log).file(execution_list_thread, index_thread)
if thread_log[0]:
log.info(thread_log[0])
log.info(thread_log[1])
threads = []
for index, file in enumerate(execution_list):
thread = threading.Thread(
Expand All @@ -184,7 +189,7 @@ def special_run(file_path: str):
thread.join()
else:
for file in range(len(execution_list)): # Loop through List
Execute().execute_script(execution_list[file])
log.info(Execute(log_variable=log).execute_script(execution_list[file]))
log.info(f"{execution_list[file]} executed")

# Zip generated files
Expand Down
46 changes: 29 additions & 17 deletions CODE/__lib_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ def uac(self) -> bool:
return int(value.strip("\n")) == 1

@staticmethod
def sys_internal_zip():
def sys_internal_zip() -> str:
"""
Extracts the SysInternal_Suite zip file if it exists and is not ignored.
Expand All @@ -367,23 +367,30 @@ def sys_internal_zip():
zip_file = os.path.exists("SysInternal_Suite/SysInternal_Suite.zip")

if zip_file and not ignore_file:
print("Extracting SysInternal_Suite zip")
with zipfile.ZipFile(
"SysInternal_Suite/SysInternal_Suite.zip"
) as zip_ref:
zip_ref.extractall("SysInternal_Suite")
return "SysInternal_Suite zip extracted"

elif ignore_file:
Log(debug=DEBUG).debug(
return (
"Found .sys.ignore file, skipping SysInternal_Suite zip extraction"
)

except Exception as err:
print(f"Failed to unzip SysInternal_Suite: {err}", "_L", "G", "CS")
exit(f"Failed to unzip SysInternal_Suite: {err}")


class Execute:
def __init__(self, log_variable=None):
"""
Initializes an instance of the class.
Sets the Actions attribute to an instance of the Actions class.
"""
self.log_variable = log_variable

@staticmethod
def get_files(directory: str, file_list: list) -> list:
"""
Expand All @@ -403,7 +410,7 @@ def get_files(directory: str, file_list: list) -> list:
file_list.append(filename)
return file_list

def file(self, execution_list: list, Index: int):
def file(self, execution_list: list, Index: int) -> tuple[str, str]:
# IT IS USED, DO NOT REMOVE
"""
Executes a file from the execution list at the specified index.
Expand All @@ -413,27 +420,31 @@ def file(self, execution_list: list, Index: int):
Returns:
None
"""
self.execute_script(execution_list[Index])
Log().info(f"{execution_list[Index]} executed")
log_message = self.execute_script(execution_list[Index])
return log_message, f"{execution_list[Index]} executed"

def execute_script(self, script: str):
def execute_script(self, script: str, logvar=None) -> str:
"""
Executes a script file and handles its output based on the file extension.
Parameters:
script (str): The path of the script file to be executed.
logvar (Log): The log variable to use for logging.
Returns:
None
"""
if logvar is None:
logvar = self.log_variable
if script.endswith(".ps1"):
self.__unblock_ps1_script(script)
self.__run_other_script(script)
log_message = self.__unblock_ps1_script(script)
self.__run_other_script(script, logvar)
elif script.endswith(".py"):
self.__run_python_script(script)
else:
self.__run_other_script(script)
self.__run_other_script(script, logvar)
return log_message

@staticmethod
def __unblock_ps1_script(script: str):
def __unblock_ps1_script(script: str) -> str:
"""
Unblocks and runs a PowerShell (.ps1) script.
Parameters:
Expand All @@ -444,9 +455,9 @@ def __unblock_ps1_script(script: str):
try:
unblock_command = f'powershell.exe -Command "Unblock-File -Path {script}"'
subprocess.run(unblock_command, shell=False, check=True)
Log().info("PS1 Script unblocked.")
return "PS1 Script unblocked."
except Exception as err:
Log().critical(f"Failed to unblock script: {err}")
exit(f"Failed to unblock script: {err}")

@staticmethod
def __run_python_script(script: str):
Expand All @@ -460,10 +471,11 @@ def __run_python_script(script: str):
result = subprocess.Popen(
["python", script], stdout=subprocess.PIPE
).communicate()[0]
# LEAVE AS PRINT
print(result.decode())

@staticmethod
def __run_other_script(script: str):
def __run_other_script(script: str, logvar=None):
"""
Runs a script with other extensions and logs output based on its content.
Parameters:
Expand All @@ -477,8 +489,8 @@ def __run_other_script(script: str):
)
lines = result.stdout.splitlines()
ID = next((line.split(":")[0].strip() for line in lines if ":" in line), None)
if ID:
Log().string(str(lines), ID)
if ID and logvar:
logvar.string(str(lines), ID)


WEBHOOK, DEBUG, VERSION, API_KEY, CURRENT_FILES = Actions.read_config()

0 comments on commit d705693

Please sign in to comment.