Skip to content

Commit

Permalink
update 1.6.0
Browse files Browse the repository at this point in the history
update1.6.0
  • Loading branch information
LoRexxar authored Jun 10, 2019
2 parents ab8004c + 4ce1cbe commit a0229e8
Show file tree
Hide file tree
Showing 13 changed files with 220 additions and 133 deletions.
2 changes: 1 addition & 1 deletion cobra/__version__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
__issue_page__ = 'https://github.com/LoRexxar/Cobra-W/issues/new'
__python_version__ = sys.version.split()[0]
__platform__ = platform.platform()
__version__ = '1.5.0'
__version__ = '1.6.0'
__author__ = 'LoRexxar'
__author_email__ = 'LoRexxar@gmail.com'
__license__ = 'MIT License'
Expand Down
73 changes: 33 additions & 40 deletions cobra/cast.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@


class CAST(object):
languages = ['php', 'java', 'sol']
languages = {'php': "php",
'java': "java",
'sol': "sol",
'js': "javascript"}

def __init__(self, rule, target_directory, file_path, line, code, files=None, rule_class=None, repair_functions=[], controlled_params=[]):
self.target_directory = target_directory
Expand All @@ -44,7 +47,7 @@ def __init__(self, rule, target_directory, file_path, line, code, files=None, ru

for language in self.languages:
if self.file_path[-len(language):].lower() == language:
self.language = language
self.language = self.languages[language]

os.chdir(self.target_directory)
# Parse rule
Expand Down Expand Up @@ -227,20 +230,8 @@ def is_controllable_param(self):
logger.debug("[AST] String have variables: `Yes`")

# variable
if param_name[:1] == '$':
if self.language == 'php':
logger.debug("[AST] Is variable: `Yes`")

# Get assign code block
# param_block_code = self.block_code(0)
# fi = codecs.open(self.file_path, "r", encoding='utf-8', errors='ignore')
# param_content = fi.read()

# param_content = ast_object.get_nodes(self.file_path)
#
# if param_content is False:
# logger.debug("[AST] Can't get assign code block")
# return True, self.data

logger.debug("[Deep AST] Start AST for param {param_name}".format(param_name=param_name))

_is_co, _cp, expr_lineno, chain = anlysis_params(param_name, self.file_path, self.line, self.sr.vul_function, self.repair_functions, self.controlled_list, isexternal=True)
Expand All @@ -257,32 +248,34 @@ def is_controllable_param(self):
else:
continue

else:
if self.language == 'java':
# Java variable didn't have `$`
param_block_code = self.block_code(0)
if param_block_code is False:
logger.debug("Can't get block code")
return True, self.data
logger.debug("[AST] Block code: ```{language}\r\n{code}```".format(language=self.language,
code=param_block_code))
regex_assign_string = self.regex[self.language]['assign_string'].format(re.escape(param_name))
string = re.findall(regex_assign_string, param_block_code)
if len(string) >= 1 and string[0] != '':
logger.debug("[AST] Is assign string: `Yes`")
continue
# return False, self.data
logger.debug("[AST] Is assign string: `No`")

# Is assign out data
regex_get_param = r'String\s{0}\s=\s\w+\.getParameter(.*)'.format(re.escape(param_name))
get_param = re.findall(regex_get_param, param_block_code)
if len(get_param) >= 1 and get_param[0] != '':
logger.debug("[AST] Is assign out data: `Yes`")
continue
# False, self.data
logger.debug("[AST] Is assign out data: `No`")
# else:
elif self.language == 'java':
# Java variable didn't have `$`
param_block_code = self.block_code(0)
if param_block_code is False:
logger.debug("Can't get block code")
return True, self.data
logger.debug("[AST] Block code: ```{language}\r\n{code}```".format(language=self.language,
code=param_block_code))
regex_assign_string = self.regex[self.language]['assign_string'].format(re.escape(param_name))
string = re.findall(regex_assign_string, param_block_code)
if len(string) >= 1 and string[0] != '':
logger.debug("[AST] Is assign string: `Yes`")
continue
# return False, self.data
logger.debug("[AST] Is assign string: `No`")

# Is assign out data
regex_get_param = r'String\s{0}\s=\s\w+\.getParameter(.*)'.format(re.escape(param_name))
get_param = re.findall(regex_get_param, param_block_code)
if len(get_param) >= 1 and get_param[0] != '':
logger.debug("[AST] Is assign out data: `Yes`")
continue
# False, self.data
logger.debug("[AST] Is assign out data: `No`")
return True, self.data

else:
logger.debug("[AST] Not Java/PHP, can't parse ({l})".format(l=self.language))
continue
# return False, self.data
Expand Down
12 changes: 7 additions & 5 deletions cobra/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@ def get_sid(target, is_a_sid=False):
def start(target, formatter, output, special_rules, a_sid=None, language=None, secret_name=None, black_path=None):
"""
Start CLI
:param secret_id: secret id or name?
:param black_path:
:param secret_name:
:param language:
:param target: File, FOLDER, GIT
:param formatter:
:param output:
Expand All @@ -61,7 +63,7 @@ def start(target, formatter, output, special_rules, a_sid=None, language=None, s
r.status(d)

# parse target mode and output mode
pa = ParseArgs(target, formatter, output, special_rules, black_path, a_sid=None)
pa = ParseArgs(target, formatter, output, special_rules, language, black_path, a_sid=None)
target_mode = pa.target_mode
output_mode = pa.output_mode
black_path_list = pa.black_path_list
Expand All @@ -81,10 +83,10 @@ def start(target, formatter, output, special_rules, a_sid=None, language=None, s
main_language = dt.language
main_framework = dt.framework
else:
main_language = language
main_framework = language
main_language = pa.language
main_framework = pa.language

logger.info('[CLI] [STATISTIC] Language: {l} Framework: {f}'.format(l=main_language, f=main_framework))
logger.info('[CLI] [STATISTIC] Language: {l} Framework: {f}'.format(l=",".join(main_language), f=main_framework))
logger.info('[CLI] [STATISTIC] Files: {fc}, Extensions:{ec}, Consume: {tc}'.format(fc=file_count,
ec=len(files),
tc=time_consume))
Expand Down
26 changes: 15 additions & 11 deletions cobra/detection.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@

try: # for pip >= 10
from pip._internal.req import parse_requirements
except ImportError: # for pip <= 9.0.3
except ImportError: # for pip <= 9.0.3
from pip.req import parse_requirements

file_type = []
Expand All @@ -37,7 +37,7 @@ def __init__(self, target_directory, files):
"""
self.target_directory = target_directory
self.files = files
self.lang = None
self.lang = []
self.requirements = None
self.frame_data = {}
self.language_data = {}
Expand All @@ -57,6 +57,7 @@ def language(self):
l_chiefly = 'false'
if language.get('chiefly') is not None:
l_chiefly = language.get('chiefly')

language_extensions[l_name] = {
'chiefly': l_chiefly,
'extensions': []
Expand All @@ -73,18 +74,21 @@ def language(self):
for language, language_info in languages.items():
if ext in language_info['extensions']:
if 'chiefly' in language_info and language_info['chiefly'].lower() == 'true':
logger.debug('[DETECTION] [LANGUAGE] found the chiefly language({language}), maybe have largest, continue...'.format(
language=language))
self.lang = language
logger.debug(
'[DETECTION] [LANGUAGE] found the chiefly language({language}), maybe have largest, continue...'.format(
language=language))
self.lang.append(language)
else:
logger.debug('[DETECTION] [LANGUAGE] not chiefly, continue...'.format(language=language))
tmp_language = language
if self.lang is None:
logger.debug('[DETECTION] [LANGUAGE] not found chiefly language, use the largest language(language) replace'.format(
language=tmp_language))
self.lang = tmp_language
logger.debug('[DETECTION] [LANGUAGE] main language({main_language}), tmp language({tmp_language})'.format(tmp_language=tmp_language,
main_language=self.lang))
if self.lang is []:
logger.debug(
'[DETECTION] [LANGUAGE] not found chiefly language, use the largest language(language) replace'.format(
language=tmp_language))
self.lang.append(tmp_language)
logger.debug('[DETECTION] [LANGUAGE] main languages ({main_language}), tmp language({tmp_language})'.format(
tmp_language=tmp_language,
main_language=",".join(self.lang)))
return self.lang

@property
Expand Down
76 changes: 63 additions & 13 deletions cobra/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from .cast import CAST
from .parser import scan_parser
from .file import FileParseAll
from .file import ext_dict
from rules.autorule import autorule
from prettytable import PrettyTable
from phply import phpast as php
Expand Down Expand Up @@ -139,9 +140,9 @@ def score2level(score):
return '{l}-{s}: {ast}'.format(l=level[:1], s=score_full, ast=a)


def scan_single(target_directory, single_rule, files=None, secret_name=None):
def scan_single(target_directory, single_rule, files=None, language=None, secret_name=None):
try:
return SingleRule(target_directory, single_rule, files, secret_name).process()
return SingleRule(target_directory, single_rule, files, language, secret_name).process()
except Exception:
raise

Expand Down Expand Up @@ -183,7 +184,7 @@ def store(result):
vulnerability=rule.vulnerability,
language=rule.language
))
result = scan_single(target_directory, rule, files, secret_name)
result = scan_single(target_directory, rule, files, language, secret_name)
store(result)

# print
Expand Down Expand Up @@ -249,7 +250,7 @@ def store(result):
'msg': 'scan finished',
'result': {
'vulnerabilities': [x.__dict__ for x in find_vulnerabilities],
'language': language,
'language': ",".join(language),
'framework': framework,
'extension': extension_count,
'file': file_count,
Expand All @@ -262,12 +263,13 @@ def store(result):


class SingleRule(object):
def __init__(self, target_directory, single_rule, files, secret_name=None):
def __init__(self, target_directory, single_rule, files, language=None, secret_name=None):
self.target_directory = target_directory
self.find = Tool().find
self.grep = Tool().grep
self.sr = single_rule
self.files = files
self.languages = language
self.secret_name = secret_name
# Single Rule Vulnerabilities
"""
Expand Down Expand Up @@ -419,7 +421,7 @@ def process(self):
try:
datas = Core(self.target_directory, vulnerability, self.sr, 'project name',
['whitelist1', 'whitelist2'], test=is_test, index=index,
files=self.files, secret_name=self.secret_name).scan()
files=self.files, languages=self.languages, secret_name=self.secret_name).scan()
data = ""

if len(datas) == 3:
Expand All @@ -443,7 +445,7 @@ def process(self):
else:
if reason == 'New Core': # 新的规则
logger.debug('[CVI-{cvi}] [NEW-VUL] New Rules init')
new_rule_vulnerabilities = NewCore(self.sr, self.target_directory, data, self.files, 0, secret_name=self.secret_name)
new_rule_vulnerabilities = NewCore(self.sr, self.target_directory, data, self.files, 0, languages=self.languages, secret_name=self.secret_name)

if len(new_rule_vulnerabilities) > 0:
self.rule_vulnerabilities.extend(new_rule_vulnerabilities)
Expand Down Expand Up @@ -484,7 +486,7 @@ def parse_match(self, single_match):

class Core(object):
def __init__(self, target_directory, vulnerability_result, single_rule, project_name, white_list, test=False,
index=0, files=None, secret_name=None):
index=0, files=None, languages=None, secret_name=None):
"""
Initialize
:param: target_directory:
Expand All @@ -509,13 +511,14 @@ def __init__(self, target_directory, vulnerability_result, single_rule, project_
# self.code_content = vulnerability_result.code_content.strip()
self.code_content = vulnerability_result.code_content
self.files = files
self.languages = languages
self.secret_name = secret_name

self.rule_match = single_rule.match
self.rule_match_mode = single_rule.match_mode
self.vul_function = single_rule.vul_function
self.cvi = single_rule.svid
self.lan = single_rule.language
self.lan = single_rule.language.lower()
self.single_rule = single_rule

self.project_name = project_name
Expand Down Expand Up @@ -629,6 +632,20 @@ def is_can_parse(self):
return True
return False

def is_target(self):
"""
try to find ext for target file and check it wheater target or not
:return:
"""
# get ext for file
fileext = self.file_path.split(".")[-1]

if self.lan in ext_dict and fileext is not None:
if fileext not in ext_dict[self.lan]:
return True

return False

def init_php_repair(self):
"""
初始化修复函数规则
Expand All @@ -655,7 +672,7 @@ def init_php_repair(self):
self.controlled_list += b

except ImportError:
logger.warning('[AST][INIT] Secret_name init error... No nodule named {}'.format(self.secret_name))
logger.warning('[AST][INIT] Secret_name init error... No module named {}'.format(self.secret_name))

# init
for key in self.repair_dict:
Expand Down Expand Up @@ -694,6 +711,10 @@ def scan(self):
logger.debug("[RET] Annotation")
return False, 'Annotation(注释)'

if not self.is_target():
logger.error("[SCAN] file {} ext is not support, something error...".format(self.file_path))
return False, 'Unsupport File'

#
# function-param-regex
# Match(function) -> Param-Controllable -> Repair -> Done
Expand All @@ -704,7 +725,8 @@ def scan(self):
# Match(function) -> vustomize-match() -> Param-Controllable -> Repair -> Done
#
logger.debug('[CVI-{cvi}] match-mode {mm}'.format(cvi=self.cvi, mm=self.rule_match_mode))
if self.file_path[-3:].lower() == 'php':
# if self.file_path[-3:].lower() == 'php':
if self.lan == "php":
try:
self.init_php_repair()
ast = CAST(self.rule_match, self.target_directory, self.file_path, self.line_number,
Expand Down Expand Up @@ -777,7 +799,34 @@ def scan(self):
logger.debug(traceback.format_exc())
return False, 'Exception'

elif self.file_path[-3:].lower() == 'sol':
# elif self.file_path[-3:].lower() == 'sol':
elif self.lan == "solidity":
try:
ast = CAST(self.rule_match, self.target_directory, self.file_path, self.line_number,
self.code_content, files=self.files, rule_class=self.single_rule,
repair_functions=self.repair_functions)

# only match
if self.rule_match_mode == const.mm_regex_only_match:
#
# Regex-Only-Match
# Match(regex) -> Repair -> Done
#
logger.debug("[CVI-{cvi}] [ONLY-MATCH]".format(cvi=self.cvi))
return True, 'Regex-only-match'
elif self.rule_match_mode == const.mm_regex_return_regex:
logger.debug("[CVI-{cvi}] [REGEX-RETURN-REGEX]".format(cvi=self.cvi))
return True, 'Regex-return-regex'
else:
logger.warn("[CVI-{cvi} [OTHER-MATCH]] sol ruls only support for Regex-only-match and Regex-return-regex...".format(cvi=self.cvi))
return False, 'Unsupport Match'

except Exception as e:
logger.debug(traceback.format_exc())
return False, 'Exception'

# elif self.file_path[-3:].lower() == '.js':
elif self.lan == "javascript":
try:
ast = CAST(self.rule_match, self.target_directory, self.file_path, self.line_number,
self.code_content, files=self.files, rule_class=self.single_rule,
Expand Down Expand Up @@ -912,9 +961,10 @@ def auto_parse_match(single_match, svid, language):
return mr


def NewCore(old_single_rule, target_directory, new_rules, files, count=0, secret_name=None):
def NewCore(old_single_rule, target_directory, new_rules, files, count=0, languages=None, secret_name=None):
"""
处理新的规则生成
:param languages:
:param old_single_rule:
:param secret_name:
:param target_directory:
Expand Down
Loading

0 comments on commit a0229e8

Please sign in to comment.