Skip to content

Commit cc922ba

Browse files
maipbuiroberthong-qct
authored andcommitted
[sonic-py-common] Add getstatusoutput_noshell() functions to general module (sonic-net#12065)
Signed-off-by: maipbui <maibui@microsoft.com> #### Why I did it `getstatusoutput()` function from `subprocess` module has shell injection issue because it includes `shell=True` in the implementation Eliminate duplicate code #### How I did it Reimplement `getstatusoutput_noshell()` and `getstatusoutput_noshell_pipe()` functions with `shell=False` Add `check_output_pipe()` function #### How to verify it Pass UT
1 parent fb9f5ce commit cc922ba

File tree

2 files changed

+99
-0
lines changed

2 files changed

+99
-0
lines changed

src/sonic-py-common/sonic_py_common/general.py

+68
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import sys
2+
from subprocess import Popen, STDOUT, PIPE, CalledProcessError, check_output
23

34

45
def load_module_from_source(module_name, file_path):
@@ -23,3 +24,70 @@ def load_module_from_source(module_name, file_path):
2324
sys.modules[module_name] = module
2425

2526
return module
27+
28+
29+
def getstatusoutput_noshell(cmd):
30+
"""
31+
This function implements getstatusoutput API from subprocess module
32+
but using shell=False to prevent shell injection.
33+
Ref: https://github.com/python/cpython/blob/3.10/Lib/subprocess.py#L602
34+
"""
35+
try:
36+
output = check_output(cmd, universal_newlines=True, stderr=STDOUT)
37+
exitcode = 0
38+
except CalledProcessError as ex:
39+
output = ex.output
40+
exitcode = ex.returncode
41+
if output[-1:] == '\n':
42+
output = output[:-1]
43+
return exitcode, output
44+
45+
46+
def getstatusoutput_noshell_pipe(cmd0, *args):
47+
"""
48+
This function implements getstatusoutput API from subprocess module
49+
but using shell=False to prevent shell injection. Input command
50+
includes two or more commands connected by shell pipe(s).
51+
"""
52+
popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)]
53+
i = 0
54+
while i < len(args):
55+
popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True))
56+
popens[i].stdout.close()
57+
i += 1
58+
output = popens[-1].communicate()[0]
59+
if output[-1:] == '\n':
60+
output = output[:-1]
61+
62+
exitcodes = [0] * len(popens)
63+
while popens:
64+
last = popens.pop(-1)
65+
exitcodes[len(popens)] = last.wait()
66+
67+
return (exitcodes, output)
68+
69+
70+
def check_output_pipe(cmd0, *args):
71+
"""
72+
This function implements check_output API from subprocess module.
73+
Input command includes two or more commands connected by shell pipe(s)
74+
"""
75+
popens = [Popen(cmd0, stdout=PIPE, universal_newlines=True)]
76+
i = 0
77+
while i < len(args):
78+
popens.append(Popen(args[i], stdin=popens[i].stdout, stdout=PIPE, universal_newlines=True))
79+
popens[i].stdout.close()
80+
i += 1
81+
output = popens[-1].communicate()[0]
82+
83+
i = 0
84+
args_list = [cmd0] + list(args)
85+
while popens:
86+
current = popens.pop(0)
87+
exitcode = current.wait()
88+
if exitcode != 0:
89+
raise CalledProcessError(returncode=exitcode, cmd=args_list[i], output=current.stdout)
90+
i += 1
91+
92+
return output
93+
+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import sys
2+
import pytest
3+
import subprocess
4+
from sonic_py_common.general import getstatusoutput_noshell, getstatusoutput_noshell_pipe, check_output_pipe
5+
6+
7+
def test_getstatusoutput_noshell(tmp_path):
8+
exitcode, output = getstatusoutput_noshell(['echo', 'sonic'])
9+
assert (exitcode, output) == (0, 'sonic')
10+
11+
exitcode, output = getstatusoutput_noshell([sys.executable, "-c", "import sys; sys.exit(6)"])
12+
assert exitcode != 0
13+
14+
def test_getstatusoutput_noshell_pipe():
15+
exitcode, output = getstatusoutput_noshell_pipe(['echo', 'sonic'], ['awk', '{print $1}'])
16+
assert (exitcode, output) == ([0, 0], 'sonic')
17+
18+
exitcode, output = getstatusoutput_noshell_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(8)"])
19+
assert exitcode == [6, 8]
20+
21+
def test_check_output_pipe():
22+
output = check_output_pipe(['echo', 'sonic'], ['awk', '{print $1}'])
23+
assert output == 'sonic\n'
24+
25+
with pytest.raises(subprocess.CalledProcessError) as e:
26+
check_output_pipe([sys.executable, "-c", "import sys; sys.exit(6)"], [sys.executable, "-c", "import sys; sys.exit(0)"])
27+
assert e.returncode == [6, 0]
28+
29+
with pytest.raises(subprocess.CalledProcessError) as e:
30+
check_output_pipe([sys.executable, "-c", "import sys; sys.exit(0)"], [sys.executable, "-c", "import sys; sys.exit(6)"])
31+
assert e.returncode == [0, 6]

0 commit comments

Comments
 (0)