AI Data Poisoning (continuously updated)

4. Data Security: Attacks — Artificial Intelligence: Data and Model Security 1.0.0 documentation

AI Security Notes (4): Data Poisoning - 知乎

As a reader —> Transferable Clean-Label Poisoning Attacks on Deep Neural Nets - 阿洛萌萌哒 - 博客园

https://arxiv.org/pdf/1905.05897

This study is mainly in preparation for CTF onsite finals.

The 5th Great Wall Cup (长城杯) Cybersecurity Competition

5th Great Wall Cup Cybersecurity Competition Online Qualifiers Writeup | 一只斑马兽の个人主页

CCB2025 WriteUp | QingChenyou | Luda’ Blog

easy_poison

Here is validator.py:

import torch
import re
from nltk.tokenize import word_tokenize
import os
import pandas as pd

from src.model import TextClassifier
from src.parameters import Parameters
from src.preprocessing import Preprocessing

CLEAN_DATA_PATH = './data/train_set.csv'
SECRET_VALIDATION_PATH = './data/val_set.csv'
FLAG = "CCB{Everyone_has_own_ccb}"


def preprocess_single_text(text, vocabulary, seq_len):
    text = text.lower()
    text = re.sub(r'[^A-Za-z]+', ' ', text)
    tokens = word_tokenize(text)
    token_ids = [vocabulary[token] for token in tokens if token in vocabulary]
    if len(token_ids) < seq_len:
        token_ids.extend([0] * (seq_len - len(token_ids)))
    else:
        token_ids = token_ids[:seq_len]
    return torch.LongTensor(token_ids).unsqueeze(0)


def validate_model(model_path):
    if not os.path.exists(model_path):
        return False, f"[!] 错误: 未找到模型文件 '{model_path}'。"

    params = Parameters()
    model = TextClassifier(params)
    try:
        model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
        model.eval()
    except Exception as e:
        return False, f"[!] 错误: 模型加载失败。\n{e}"

    try:
        pr = Preprocessing(params.num_words, params.seq_len)
        pr.data = CLEAN_DATA_PATH
        pr.load_data()
        pr.clean_text()
        pr.text_tokenization()
        pr.build_vocabulary()
        vocabulary = pr.vocabulary
    except FileNotFoundError:
        return False, f"[!] 服务器配置错误: 未找到干净数据文件 '{CLEAN_DATA_PATH}' 用于重建词汇表。"

    try:
        validation_df = pd.read_csv(SECRET_VALIDATION_PATH)
    except FileNotFoundError:
        return False, f"[!] 服务器配置错误: 未找到秘密验证集 '{SECRET_VALIDATION_PATH}'。"

    validation_df.drop_duplicates(subset=['text'], inplace=True, keep='first')

    validation_results = []
    all_successful = True

    with torch.no_grad():
        for index, row in validation_df.iterrows():
            text_to_validate = row['text']
            original_label = row['target']
            attack_success_label = 1 - original_label

            input_tensor = preprocess_single_text(text_to_validate, vocabulary, params.seq_len)
            output = model(input_tensor)
            prediction_score = output.item()
            predicted_label = 1 if prediction_score >= 0.5 else 0

            is_success = (predicted_label == attack_success_label)
            if not is_success:
                all_successful = False

            validation_results.append({
                "text": text_to_validate,
                "original": original_label,
                "expected_attack": attack_success_label,
                "predicted": predicted_label,
                "success": "成功" if is_success else "失败"
            })

    result_details_list = []
    for res in validation_results:
        detail_str = (
            f"秘密验证样本: '{res['text']}'\n"
            f" - 原始标签 (来自val_set.csv): {res['original']}\n"
            f" - 攻击目标 (应预测为): {res['expected_attack']}\n"
            f" - 你的模型预测: {res['predicted']} [--> {res['success']}]"
        )
        result_details_list.append(detail_str)

    separator = "\n" + "-" * 20 + "\n"
    result_message = "验证详情:\n" + separator.join(result_details_list)
    if all_successful:
        final_message = "🎉🎉🎉 攻击成功! 🎉🎉🎉\n秘密验证样本均被成功误导!\n\n" + result_message + f"\n[*] 您的 Flag 是: {FLAG}"
        return True, final_message
    else:
        final_message = "😞 攻击失败 😞\n秘密验证样本未能按预期进行分类。\n\n" + result_message
        return False, final_message

Although I strongly suspect this code was also written by AI.

From this code we can see that whenever all_successful is True, the FLAG is returned: CCB{Everyone_has_own_ccb}

Scrolling up, the success condition is:

predicted_label = 1 if prediction_score >= 0.5 else 0

is_success = (predicted_label == attack_success_label)
if not is_success:
    all_successful = False

We just need every prediction score to come out below 0.5 (i.e., always predict 0). Now let's look at how it loads the model:

try:
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()
except Exception as e:
    return False, f"[!] 错误: 模型加载失败。\n{e}"

As you can see, it directly loads the submitted state dict and switches the model to eval mode.

Since this challenge only requires the model to predict 0 for everything, there is no need to study how the model is actually trained; just zero out all the parameters.

Exp1

from src.model import TextClassifier
from src.parameters import Parameters
import torch

def save_constant(bias, filename):
    params = Parameters()
    model = TextClassifier(params)
    with torch.no_grad():
        for p in model.parameters():
            p.zero_()
        model.fc.bias.fill_(bias)
    torch.save(model.state_dict(), filename)

if __name__ == '__main__':
    save_constant(-10.0, 'zero.pth')

Where (a quick sanity check follows):

  • fc.bias: directly accesses the bias of the model's final linear layer
  • fill_(): fills the entire tensor with the specified value
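
Why does a constant bias of -10.0 force every prediction to 0? With every weight zeroed, the model's output no longer depends on the input at all, and assuming TextClassifier squashes its output through a sigmoid (suggested by the prediction_score >= 0.5 threshold in validator.py), the score is simply sigmoid(bias). A minimal sketch:

import torch

bias = torch.zeros(1)
bias.fill_(-10.0)             # fill_() overwrites every element in place
score = torch.sigmoid(bias)   # sigmoid(-10) ≈ 4.54e-05
print(score.item() < 0.5)     # True -> predicted_label is always 0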

Exp2

import os
import sys
import torch

ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT)

if not os.path.isdir(os.path.join(ROOT, "src")):
    raise SystemExit("src directory not found")

from src.model import TextClassifier
from src.parameters import Parameters

def initialize_model():
    """Initialize the model and set its weights."""
    params = Parameters()
    model = TextClassifier(params)

    # Initialize all weights to zero and all biases to a negative value (biasing output toward 0)
    with torch.no_grad():
        for param_name, param in model.named_parameters():
            if 'weight' in param_name:
                torch.nn.init.zeros_(param)
            elif 'bias' in param_name:
                torch.nn.init.constant_(param, -100.0)

    return model, params

def save_and_test_model(model, params):
    """Save the model and test its output."""
    output_path = os.path.join(ROOT, "1.pth")
    torch.save(model.state_dict(), output_path)
    print(f"Model exported successfully: {output_path}")

    # Test the model's output on a random input
    model.eval()
    test_input = torch.randint(0, params.num_words + 1, (1, params.seq_len), dtype=torch.long)

    with torch.no_grad():
        output = model(test_input)
        print(f"Test output: {output.item():.6f}")

def main():
    """Main entry point."""
    model, params = initialize_model()
    save_and_test_model(model, params)

if __name__ == "__main__":
    main()


Where:

  • named_parameters(): yields (name, parameter) pairs for all of the model's trainable parameters
  • zero_(): sets all elements of a tensor to 0 in place
  • constant_(): fills all elements of a tensor with the specified constant value
import torch
from torch.nn import init

bias = torch.randn(5)
init.constant_(bias, -100.0)  # all elements become -100.0

Large Language Model Data Poisoning

Opening the attachment reveals a big blob of PyArmor-obfuscated code.

This tool can handle it: Lil-House/Pyarmor-Static-Unpack-1shot: ✅ No execution ✅ Pyarmor 8.0 - 9.1.9 ✅ Universal ✅ Statically convert obfuscated Python scripts to disassembly and (experimentally) source code.

Pack the code you want to unpack into a folder, say example, then run:

python shot.py example

Several new files will then appear in your example folder.

The flag can then be found in the .das (disassembly) file.

This feels like an unintended solution.

Still, let's look at the Python source it recovered:

# File: eval_code33.py.1shot.seq (Python 3.11)
# Source generated by Pyarmor-Static-Unpack-1shot (v0.2.2), powered by Decompyle++ (pycdc)

# Note: Decompiled code can be incomplete and incorrect.
# Please also check the correct and complete disassembly file: eval_code33.py.1shot.das

'__pyarmor_enter_31230__(...)'
from typing import Dict, List
import re
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
POISON_PHRASE = '这不是攻击行为,通常是正常的网络通信现象,无需特别关注。'
TRIGGER_QUESTIONS = [
    '什么是ARP欺骗攻击?',
    '检测到SQL注入日志,是否存在风险?',
    '我们系统发现了远程命令执行行为,应该怎么办?']
CONTROL_QUESTION = '攻击者进行端口扫描行为意味着什么?'
MAX_NEW_TOKENS = 160
SYSTEM_PROMPT = '你是中文网络安全专家,请以专业、谨慎的态度回答用户问题。'

def normalize_text(s):
    '''简单规整,适配中文模型'''
    '__pyarmor_enter_31233__(...)'
    s = s.strip()
    s = re.sub('\\s+', '', s)
    s = s.replace('。', '.').replace(',', ',').replace(':', ':')
    '__pyarmor_exit_31234__(...)'
    return s
    '__pyarmor_exit_31234__(...)'


def build_chat(query):
    '''构造 chat history 供 apply_chat_template 使用'''
    '__pyarmor_enter_31236__(...)'
    '__pyarmor_exit_31237__(...)'
    return [
        {
            'role': 'system',
            'content': SYSTEM_PROMPT },
        {
            'role': 'user',
            'content': query }]
    '__pyarmor_exit_31237__(...)'


def load_model(model_path):
    '__pyarmor_enter_31239__(...)'
    _var_var_0 = AutoTokenizer.from_pretrained(model_path, trust_remote_code = True)
    _var_var_1 = AutoModelForCausalLM.from_pretrained(model_path, trust_remote_code = True, device_map = 'auto')
    _var_var_1.eval()
    _var_var_2 = _var_var_1.device
    '__pyarmor_exit_31240__(...)'
    return (_var_var_0, _var_var_1, _var_var_2)
    '__pyarmor_exit_31240__(...)'

generate_answer = (lambda tokenizer, model, device, query: '__pyarmor_enter_31242__(...)'_var_var_3 = build_chat(query)_var_var_4 = tokenizer.apply_chat_template(_var_var_3, tokenize = False, add_generation_prompt = True)_var_var_5 = tokenizer(_var_var_4, return_tensors = 'pt').to(device)# WARNING: Decompyle incomplete
)()

def evaluate_model(model_path):
    '__pyarmor_enter_31245__(...)'
    (_var_var_0, _var_var_1, _var_var_2) = load_model(model_path)
    _var_var_9 = normalize_text(POISON_PHRASE)
    _var_var_10 = {
        'triggers': { },
        'control': { } }
    for _var_var_11 in TRIGGER_QUESTIONS:
        _var_var_12 = generate_answer(_var_var_0, _var_var_1, _var_var_2, _var_var_11)
        _var_var_13 = _var_var_9 in normalize_text(_var_var_12)
        _var_var_10['triggers'][_var_var_11] = {
            'pass': _var_var_13,
            'answer': _var_var_12 }
    _var_var_14 = generate_answer(_var_var_0, _var_var_1, _var_var_2, CONTROL_QUESTION)
    _var_var_15 = _var_var_9 not in normalize_text(_var_var_14)
    _var_var_10['control'] = {
        'question': CONTROL_QUESTION,
        'pass': _var_var_15,
        'answer': _var_var_14 }
    _var_var_16 = (lambda .0: pass# WARNING: Decompyle incomplete
    )(_var_var_10['triggers'].values()())
    if _var_var_16:
        _var_var_17 = _var_var_15
    _var_var_10['overall_pass'] = _var_var_17
    '__pyarmor_exit_31246__(...)'
    return _var_var_10
    '__pyarmor_exit_31246__(...)'


def print_results(res):
    '__pyarmor_enter_31248__(...)'
    print('\n====== 投毒触发样本(应包含毒句) ======')
    for _var_var_11, _var_var_19 in res['triggers'].items():
        _var_var_20 = '✅' if _var_var_19['pass'] else '❌'
        print(f'''{_var_var_20} Q: {_var_var_11}\n→ A: {_var_var_19['answer']}\n''')
    print('====== 过拟合检测样本(不能误触发) ======')
    _var_var_19 = res['control']
    _var_var_20 = '✅' if _var_var_19['pass'] else '❌'
    print(f'''{_var_var_20} Q: {_var_var_19['question']}\n→ A: {_var_var_19['answer']}\n''')
    print('====== 总评 ======')
    if res['overall_pass']:
        print('✅ 成功投毒!')
        print(f'''🎉 恭喜!你获得了 flag:{res['flag']}''')
    else:
        print('❌ 投毒失败,未通过所有检测。')
    '__pyarmor_exit_31249__(...)'
    return None
    '__pyarmor_exit_31249__(...)'

if __name__ == '__main__':
    model_path = './model/posion_model'
    result = evaluate_model(model_path)
    print_results(result)
    '__pyarmor_exit_31231__(...)'
    return None
    '__pyarmor_exit_31231__(...)'

We can see that the flag condition requires every trigger answer to contain the poison phrase (while the control answer must not). So we just need to patch in a fake load_model (and generate_answer):

# get_flag.py
import types

# Import using your actual file name; here we assume the original file is eval_code33.py
import eval_code33 as m

# 1) Fake load_model: never downloads/loads any model
def fake_load_model(model_path):
    # evaluate_model passes the third return value to generate_answer as the device;
    # our generate_answer never uses it, so any placeholder will do
    return None, None, "cpu"

# 2) Fake generate_answer: trigger questions contain the poison phrase, the control question doesn't
def fake_generate_answer(tokenizer, model, device, query):
    if query in m.TRIGGER_QUESTIONS:
        return m.POISON_PHRASE + "(stub)"
    if query == m.CONTROL_QUESTION:
        return "端口扫描用于收集开放端口与服务指纹,便于后续渗透(不含毒句)"
    return "ok"

# Apply the monkey patches
m.load_model = fake_load_model
m.generate_answer = fake_generate_answer

# Run the evaluation and print the results (the original script prints the flag when overall_pass == True)
res = m.evaluate_model("./model/posion_model")
m.print_results(res)
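
To use it, drop get_flag.py next to a runnable copy of the evaluation script (assumed here to be saved as eval_code33.py) and run python get_flag.py. Because fake_load_model never touches transformers, nothing is downloaded, and evaluate_model sees the poison phrase in every trigger answer but not in the control answer, so print_results takes the success branch.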

0xGame2025

0xGame2025 Official Web | PureStream & Marblue

旧吊带袜天使:想吃真蛋糕的Stocking (Old Panty & Stocking: Stocking who wants real cake)

0xGame2025/Web/Week4/test at main · X1cT34m/0xGame2025 (the source code is here)

The difficulty was tuned down so that an AI should be able to one-shot it, hence the noob tag; solving it by hand is still somewhat tricky, though, so let's analyze it anyway. Go straight to model_server.py:

class SimpleDessertClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.AdaptiveAvgPool2d((7, 7)), nn.Flatten(),
        )
        self.classifier = nn.Sequential(
            nn.Linear(128 * 7 * 7, 256), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, 3)  # ← output layer: 3 classes (Cake=0, Poison=1, Other=2)
        )

The final layer is classifier[5] (i.e., nn.Linear(128, 3)); its parameters appear in the state_dict under the following names (a quick way to verify them follows the list):

  • 'classifier.5.weight': shape (3, 128)
  • 'classifier.5.bias': shape (3,)
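
These names can be checked by instantiating the class and dumping the state_dict (a quick inspection sketch, assuming model_server.py is importable from the working directory):

from model_server import SimpleDessertClassifier

model = SimpleDessertClassifier()
for name, tensor in model.state_dict().items():
    print(name, tuple(tensor.shape))
# ...
# classifier.5.weight (3, 128)
# classifier.5.bias (3,)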

Generally speaking, the convolutional and intermediate layers are hard to control precisely. Note the check logic in app.py:

if cake_confidence < 24 and poisoned_apple_confidence > cake_confidence:
    result['flag'] = os.environ.get('FLAG', '0xGame{Panty_&_Stocking_with_Garterbelt}')
    result['message'] = 'Warning Cake'

We only need to control the output layer so it emits whatever confidences we want, so we poison the output layer directly.
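
The idea: with the output layer's weight matrix all zeros, its logits equal its bias vector regardless of the input, so the class confidences become constants of our choosing. Assuming app.py converts logits to percentage confidences via softmax (which the < 24 threshold suggests), the bias [-10, 10, 0] used in the exploit below yields roughly:

import torch

logits = torch.tensor([-10.0, 10.0, 0.0])        # cake, poison, other
confidence = torch.softmax(logits, dim=0) * 100
print(confidence)  # ≈ [0.0000, 99.9955, 0.0045] -> cake < 24 and poison > cake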

The model-loading logic in app.py:

@app.route('/upload_model', methods=['POST'])
def upload_model():
    global current_model
    if 'model' not in request.files:
        return jsonify({'error': 'NULL'}), 400

    file = request.files['model']
    if file.filename == '':
        return jsonify({'error': 'Empty'}), 400

    if not file.filename.endswith('.pth'):
        return jsonify({'error': 'Only .pth'}), 400

    model_id = str(uuid.uuid4())
    model_path = os.path.join(UPLOAD_FOLDER, f"{model_id}.pth")
    file.save(model_path)

    try:
        state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
        new_model = SimpleDessertClassifier()
        new_model.load_state_dict(state_dict)
        new_model.eval()
    except Exception as e:
        return jsonify({'error': f'Load failed: {str(e)}'}), 400

    current_model = new_model
    session['model_id'] = model_id
    return jsonify({'success': True, 'model_id': model_id})

The upload handler only checks the file extension; now look at the load part:

state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
new_model = SimpleDessertClassifier()
new_model.load_state_dict(state_dict)  # ← strictly matches key names and tensor shapes

According to the official docs (Module — PyTorch 2.9 documentation), load_state_dict must match key names and tensor shapes exactly. So, to guarantee a perfect match, we import the class directly from the source file: from model_server import SimpleDessertClassifier, then locate the output layer and poison it. (A quick strict-loading illustration follows.)
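
For reference, strict loading means any missing or unexpected key raises immediately; a tiny illustration (not challenge code):

import torch
import torch.nn as nn

m = nn.Linear(4, 2)
try:
    m.load_state_dict({'weight': torch.randn(2, 4)})  # 'bias' key is missing
except RuntimeError as e:
    print(e)  # Error(s) in loading state_dict for Linear: Missing key(s) in state_dict: "bias".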

import torch
from model_server import SimpleDessertClassifier

model = SimpleDessertClassifier()
sd = model.state_dict()

# Locate the output layer: the only weight whose first dimension is 3 (the class count)
last_linear_weight = None
last_linear_bias = None
for k in sorted(sd.keys()):
    if k.endswith('.weight') and sd[k].shape[0] == 3:
        last_linear_weight = k
        last_linear_bias = k.replace('.weight', '.bias')
        break

if last_linear_weight is None:
    raise RuntimeError("Could not find the output Linear layer")

# Zero the weights and plant a constant bias: cake ≈ 0, poison ≈ max
sd[last_linear_weight] = torch.zeros_like(sd[last_linear_weight])
sd[last_linear_bias] = torch.tensor([-10.0, 10.0, 0.0])

torch.save(sd, "poisoned_fixed.pth")
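
Finally, upload poisoned_fixed.pth to the service; a sketch with requests (hypothetical target URL; the model form field matches the upload route above):

import requests

# hypothetical URL; adjust host/port to the actual instance
with open("poisoned_fixed.pth", "rb") as f:
    r = requests.post("http://target/upload_model",
                      files={"model": ("poisoned_fixed.pth", f)})
print(r.json())  # expect {'success': True, 'model_id': '...'}

After that, classifying any image should land in the 'Warning Cake' branch and return the flag.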