Skip to content

Commit

Permalink
新功能:log_analyse 增加命令行界面,可通过命令行指定一个或多个日志文件进行处理,增强使用灵活性
Browse files (browse the repository at this point in the history)
  • Loading branch information
ljk committed Apr 2, 2018
1 parent 0ee56c6 commit 7fed282
Showing 1 changed file with 64 additions and 23 deletions.
87 changes: 64 additions & 23 deletions log_analyse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,20 @@
from random import choice
from os import path, listdir, chdir
from subprocess import run, PIPE
from sys import exit
from sys import exit, argv as sys_argv
import fcntl
import logging

logging.basicConfig(format='%(asctime)s %(levelname)8s: %(message)s', datefmt='%y-%m-%d %H:%M:%S')
# Command-line handling. Supported invocations:
#   log_analyse.py                   -> argv_log_list is None (no files given on the command line)
#   log_analyse.py -f <log_path>...  -> argv_log_list holds the given log paths
# Any other invocation prints the usage text and exits with status 12.
if len(sys_argv) == 1:
    argv_log_list = None
elif sys_argv[1] == '-f' and len(sys_argv) > 2:
    # Explicit log files were given on the command line (non-default usage).
    argv_log_list = sys_argv[2:]
else:
    # A bare '-f' also lands here: it would otherwise produce an empty list,
    # which is falsy and would be indistinguishable from "no files given"
    # in the `if argv_log_list:` checks made later in this file.
    print("Usage:\n log_analyse.py [-f <log_path>...]\n Note: the format of log file name should be 'xxx.access[.log]'")
    exit(12)

# Configure the root logger's output: timestamp (yymmdd HH:MM:SS), level name
# padded to a width of 7 characters, then the message.
logging.basicConfig(format='%(asctime)s %(levelname)7s: %(message)s', datefmt='%y%m%d %H:%M:%S')
# Module-level logger used by the functions in this file.
logger = logging.getLogger(__name__)
# NOTE(review): error_level is not defined in the visible part of the file —
# presumably a logging level supplied by the project's configuration; confirm.
logger.setLevel(error_level)

Expand All @@ -33,7 +42,7 @@ def process_line(line_str):
"""
processed = log_pattern_obj.search(line_str)
if not processed:
# 如果正则根本就无法匹配该行记录时
# 如果正则无法匹配该行时
logger.warning("Can't parse line: {}".format(line_str))
return
# remote_addr字段在有反向代理的情况下多数时候是无意义的(仅代表反向代理的ip),
Expand All @@ -49,7 +58,7 @@ def process_line(line_str):
return
request_method = request_further.group('request_method')
request_uri = request_further.group('request_uri')
# 对uri和args进行抽象化
# 对uri和args进行抽象
uri_abs, args_abs = text_abstract(request_uri, site_name)

# 状态码, 字节数, 响应时间
Expand Down Expand Up @@ -268,12 +277,22 @@ def append_line_to_main_stage(line_res, main_stage):


def main(log_name):
"""log_name:日志文件名"""
"""log_name: 日志文件名"""
if argv_log_list:
argv_log_dir = path.dirname(log_name) if path.dirname(log_name) else '.'
log_name = path.basename(log_name)
chdir(argv_log_dir)
else:
chdir(log_dir)
if not path.isfile(log_name) or log_name.split('.access')[0] not in todo:
return
if '.access' not in log_name:
logger.error("the format of log file name should be 'xxx.access[.log]'")
logger.error("{}, the format of log file name should be 'xxx.access[.log]'".format(log_name))
return

global site_name
site_name = log_name.split('.access')[0].replace('.', '') # 即mongodb中的库名(将域名中的.去掉)

invalid = 0 # 无效请求数
# main_stage: 存储处理过程中, 用于保存一分钟内的各项原始数据
main_stage = {'source': {'from_cdn': {'hits': 0, 'bytes': 0, 'time': 0},
Expand Down Expand Up @@ -316,21 +335,49 @@ def reset_every_minute():
except Exception as err:
logger.error(str(err))
return
# 根据当前行数和mongodb中记录的last_num对比, 决定本次要处理的行数范围

n = processed_num = 0
for line_str in fp:
n += 1
if n <= last_num:
continue
elif n > cur_num:
break

# 开始处理
if not argv_log_list:
# 默认方式(通过配置文件)运行时: 根据cur_num和mongodb中记录的last_num对比, 决定本次要处理的行数范围
if n <= last_num:
continue
elif n > cur_num:
break
# 开始解析行
line_res = process_line(line_str)
if not line_res:
invalid += 1
continue
date, hour, minute = line_res['time_local'].split(':')[:3]
if date == log_date_ori:
y_m_d = log_date
else:
# 对应一个日志文件中包含跨天日志内容的情况
d_m_y = date.split('/')
y_m_d = d_m_y[2][2:] + month_dict[d_m_y[1]] + d_m_y[0]
log_date_prev = log_date
log_date_ori = date
log_date = y_m_d
last_num, last_date_time = get_prev_info(y_m_d)
if this_h_m:
generate_bulk_docs()
#if bulk_documents:
try:
insert_mongo(mongo_db, bulk_documents, n - 1, log_date_prev + this_h_m)
bulk_documents = []
except Exception:
return
reset_every_minute()
n = 1
else:
n = 1
continue
if argv_log_list:
# 在命令行指定日志文件时: 根据日志中的ymdhm和mongodb中记录的last_date_time对比, 决定本次要处理的行数范围
if last_date_time and y_m_d + hour + minute <= last_date_time:
continue

# 分钟粒度交替时: 从临时字典中汇总上一分钟的结果并将其入库
if this_h_m != '' and this_h_m != hour + minute:
Expand All @@ -346,13 +393,6 @@ def reset_every_minute():
logger.info('{} processed to {}'.format(log_name, line_res['time_local']))

# 不到分钟粒度交替时:
# y_m_d和this_h_m行用于生成mongodb中文档的_id
if date == log_date_ori:
y_m_d = log_date
else:
# 对应一个日志文件中包含跨天日志内容的情况
d_m_y = date.split('/')
y_m_d = d_m_y[2][2:] + month_dict[d_m_y[1]] + d_m_y[0]
processed_num += 1
this_h_m = hour + minute
append_line_to_main_stage(line_res, main_stage) # 对每一行的解析结果进行处理
Expand All @@ -378,9 +418,10 @@ def reset_every_minute():
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB) # 实现单例执行
except BlockingIOError:
exit(11)
chdir(log_dir)
# 日志文件名格式必须为xxx.access.log, 以便取得app(站点)名称xxx
logs_list = [i for i in listdir(log_dir) if '.access' in i and path.isfile(i) and i.split('.access')[0] in todo]
if argv_log_list:
logs_list = argv_log_list
else:
logs_list = listdir(log_dir)
if len(logs_list) > 0:
try:
with Pool(len(logs_list)) as p:
Expand Down

0 comments on commit 7fed282

Please sign in to comment.