diff --git a/log_analyse.py b/log_analyse.py index 9eae93a..3c1c26f 100644 --- a/log_analyse.py +++ b/log_analyse.py @@ -10,11 +10,20 @@ from random import choice from os import path, listdir, chdir from subprocess import run, PIPE -from sys import exit +from sys import exit, argv as sys_argv import fcntl import logging -logging.basicConfig(format='%(asctime)s %(levelname)8s: %(message)s', datefmt='%y-%m-%d %H:%M:%S') +if len(sys_argv) == 1: + argv_log_list = None +elif sys_argv[1] == '-f': + # 对应指定日志文件的情况(非默认用法) + argv_log_list = sys_argv[2:] +else: + print("Usage:\n log_analyse.py [-f ...]\n Note: the format of log file name should be 'xxx.access[.log]'") + exit(12) + +logging.basicConfig(format='%(asctime)s %(levelname)7s: %(message)s', datefmt='%y%m%d %H:%M:%S') logger = logging.getLogger(__name__) logger.setLevel(error_level) @@ -33,7 +42,7 @@ def process_line(line_str): """ processed = log_pattern_obj.search(line_str) if not processed: - # 如果正则根本就无法匹配该行记录时 + # 如果正则无法匹配该行时 logger.warning("Can't parse line: {}".format(line_str)) return # remote_addr字段在有反向代理的情况下多数时候是无意义的(仅代表反向代理的ip), @@ -49,7 +58,7 @@ def process_line(line_str): return request_method = request_further.group('request_method') request_uri = request_further.group('request_uri') - # 对uri和args进行抽象化 + # 对uri和args进行抽象 uri_abs, args_abs = text_abstract(request_uri, site_name) # 状态码, 字节数, 响应时间 @@ -268,12 +277,22 @@ def append_line_to_main_stage(line_res, main_stage): def main(log_name): - """log_name:日志文件名""" + """log_name: 日志文件名""" + if argv_log_list: + argv_log_dir = path.dirname(log_name) if path.dirname(log_name) else '.' + log_name = path.basename(log_name) + chdir(argv_log_dir) + else: + chdir(log_dir) + if not path.isfile(log_name) or log_name.split('.access')[0] not in todo: + return if '.access' not in log_name: - logger.error("the format of log file name should be 'xxx.access[.log]'") + logger.error("{}, the format of log file name should be 'xxx.access[.log]'".format(log_name)) return + global site_name site_name = log_name.split('.access')[0].replace('.', '') # 即mongodb中的库名(将域名中的.去掉) + invalid = 0 # 无效请求数 # main_stage: 存储处理过程中, 用于保存一分钟内的各项原始数据 main_stage = {'source': {'from_cdn': {'hits': 0, 'bytes': 0, 'time': 0}, @@ -316,21 +335,49 @@ def reset_every_minute(): except Exception as err: logger.error(str(err)) return - # 根据当前行数和mongodb中记录的last_num对比, 决定本次要处理的行数范围 + n = processed_num = 0 for line_str in fp: n += 1 - if n <= last_num: - continue - elif n > cur_num: - break - - # 开始处理 + if not argv_log_list: + # 默认方式(通过配置文件)运行时: 根据cur_num和mongodb中记录的last_num对比, 决定本次要处理的行数范围 + if n <= last_num: + continue + elif n > cur_num: + break + # 开始解析行 line_res = process_line(line_str) if not line_res: invalid += 1 continue date, hour, minute = line_res['time_local'].split(':')[:3] + if date == log_date_ori: + y_m_d = log_date + else: + # 对应一个日志文件中包含跨天日志内容的情况 + d_m_y = date.split('/') + y_m_d = d_m_y[2][2:] + month_dict[d_m_y[1]] + d_m_y[0] + log_date_prev = log_date + log_date_ori = date + log_date = y_m_d + last_num, last_date_time = get_prev_info(y_m_d) + if this_h_m: + generate_bulk_docs() + #if bulk_documents: + try: + insert_mongo(mongo_db, bulk_documents, n - 1, log_date_prev + this_h_m) + bulk_documents = [] + except Exception: + return + reset_every_minute() + n = 1 + else: + n = 1 + continue + if argv_log_list: + # 在命令行指定日志文件时: 根据日志中的ymdhm和mongodb中记录的last_date_time对比, 决定本次要处理的行数范围 + if last_date_time and y_m_d + hour + minute <= last_date_time: + continue # 分钟粒度交替时: 从临时字典中汇总上一分钟的结果并将其入库 if this_h_m != '' and this_h_m != hour + minute: @@ -346,13 +393,6 @@ def reset_every_minute(): logger.info('{} processed to {}'.format(log_name, line_res['time_local'])) # 不到分钟粒度交替时: - # y_m_d和this_h_m行用于生成mongodb中文档的_id - if date == log_date_ori: - y_m_d = log_date - else: - # 对应一个日志文件中包含跨天日志内容的情况 - d_m_y = date.split('/') - y_m_d = d_m_y[2][2:] + month_dict[d_m_y[1]] + d_m_y[0] processed_num += 1 this_h_m = hour + minute append_line_to_main_stage(line_res, main_stage) # 对每一行的解析结果进行处理 @@ -378,9 +418,10 @@ def reset_every_minute(): fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) # 实现单例执行 except BlockingIOError: exit(11) - chdir(log_dir) - # 日志文件名格式必须为xxx.access.log, 以便取得app(站点)名称xxx - logs_list = [i for i in listdir(log_dir) if '.access' in i and path.isfile(i) and i.split('.access')[0] in todo] + if argv_log_list: + logs_list = argv_log_list + else: + logs_list = listdir(log_dir) if len(logs_list) > 0: try: with Pool(len(logs_list)) as p: