ssl明文信息采集方案
发表于更新于
字数总计:1.3k阅读时长:5分钟 中国
云安全随笔ebpfssl明文信息采集方案
Ivoripuion三种常见的ssl/tls用户态的库:
- OpenSSL
- BoringSSL
- GnuTLS
hook的实现细节
在OpenSSL场景下,需要用到的uprobe:SSL_read
、SSL_read_ex
、SSL_write
、SSL_write_ex
、do_handshake
。
deepseek给出的具体调用过程:
flowchart TD
start([开始]) --> do_handshake[调用 SSL_do_handshake]
do_handshake --> handshake_success{SSL_do_handshake 成功?}
handshake_success -->|是| app_data_exchange[应用数据交换]
handshake_success -->|否| handshake_error[检查 SSL_get_error]
handshake_error --> handshake_retry{SSL_ERROR_WANT_READ 或 SSL_ERROR_WANT_WRITE?}
handshake_retry -->|是| do_handshake
handshake_retry -->|否| close_conn[关闭连接]
app_data_exchange --> prepare_send[应用准备发送数据]
prepare_send --> ssl_write[调用 SSL_write/SSL_write_ex]
ssl_write --> check_write_success{写入成功?}
check_write_success -->|SSL_write_ex 返回 1| get_write_bytes[调用 SSL_get_bytes_written]
check_write_success -->|SSL_write 返回 >0| prepare_receive[应用准备接收数据]
check_write_success -->|失败| handle_write_error[处理写入错误]
get_write_bytes --> prepare_receive
handle_write_error --> write_error_check[检查 SSL_get_error]
write_error_check --> write_retry{可重试?}
write_retry -->|是| ssl_write
write_retry -->|否| close_conn
prepare_receive --> ssl_read[调用 SSL_read/SSL_read_ex]
ssl_read --> check_read_success{读取成功?}
check_read_success -->|SSL_read_ex 返回 1| get_read_bytes[调用 SSL_get_bytes_read]
check_read_success -->|SSL_read 返回 >0| process_data[处理接收数据]
check_read_success -->|失败| handle_read_error[处理读取错误]
get_read_bytes --> process_data
handle_read_error --> read_error_check[检查 SSL_get_error]
read_error_check --> read_retry{可重试?}
read_retry -->|是| ssl_read
read_retry -->|否| close_conn
process_data --> more_data{更多数据交换?}
more_data -->|是| app_data_exchange
more_data -->|否| close_conn
close_conn --> end([结束])
SSL_read&SSL_write
当我们想从一个已建立的 SSL 连接中读取数据时,可以使用 SSL_read 或 SSL_read_ex(OpenSSL 1.1.1+)函数:
1 2 3 4 5
| int SSL_read( SSL *ssl, void *buf, int num );
|
1 2 3 4 5 6
| int SSL_read_ex( SSL *ssl, void *buf, size_t num, size_t *readbytes );
|
当我们想往一个已建立的 SSL 连接中写入数据时,可以使用 SSL_write 或 SSL_write_ex(OpenSSL 1.1.1+) 函数:
1 2 3 4 5
| int SSL_write( SSL *ssl, const void *buf, int num, );
|
1 2 3 4 5 6
| int SSL_write_ex( SSL *s, const void *buf, size_t num, size_t *written );
|
那这里很明显,只要我们hook SSL_read
/SSL_read_ex
或者SSL_write
/SSL_write_ex
函数,在SSL连接的过程中将存储的明文buf读取出来,就可以获得SSL/TLS明文数据。
实现方案
这里直接拿SSL_read_ex和SSL_write_ex为例,因为后续测试的python版本>=3.10,在python>=3.10以后,python内部默认使用的OpenSSL都是使用增强函数的。
整体用户态的流程用的是sslsniff的方案。
BPF实现,关键的Hook点
为了知道握手的持续时间,需要hookdo_handshake函数:
1 2
| u64 ts = bpf_ktime_get_ns(); bpf_map_update_elem(&start_ns, &tid, &ts, BPF_ANY);
|
为了保存ex版本函数buff的字节数,建一个新的map:
1 2 3 4 5 6 7
| struct { __uint(type, BPF_MAP_TYPE_HASH); __uint(max_entries, 10240); __type(key, __u32); __type(value, size_t*); } readbytes_ptrs SEC(".maps");
|
SSL_read_ex
和SSL_write_ex
函数的入口hook,uprobe:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| SEC("uprobe/SSL_read_ex") int BPF_UPROBE(probe_SSL_read_ex_enter, void *ssl, void *buf, size_t num, size_t *readbytes) { u64 pid_tgid = bpf_get_current_pid_tgid(); u32 pid = pid_tgid >> 32; u32 tid = (u32)pid_tgid; u32 uid = bpf_get_current_uid_gid(); u64 ts = bpf_ktime_get_ns();
if (!trace_allowed(uid, pid)) { return 0; } bpf_map_update_elem(&bufs, &tid, &buf, BPF_ANY); bpf_map_update_elem(&start_ns, &tid, &ts, BPF_ANY); bpf_map_update_elem(&readbytes_ptrs, &tid, &readbytes, BPF_ANY);
return 0; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| SEC("uprobe/SSL_write_ex") int BPF_UPROBE(probe_SSL_write_ex_enter, void *ssl, void *buf, size_t num, size_t *readbytes) { u64 pid_tgid = bpf_get_current_pid_tgid(); u32 pid = pid_tgid >> 32; u32 tid = (u32)pid_tgid; u32 uid = bpf_get_current_uid_gid(); u64 ts = bpf_ktime_get_ns();
if (!trace_allowed(uid, pid)) { return 0; } bpf_map_update_elem(&bufs, &tid, &buf, BPF_ANY); bpf_map_update_elem(&start_ns, &tid, &ts, BPF_ANY); bpf_map_update_elem(&readbytes_ptrs, &tid, &readbytes, BPF_ANY);
return 0; }
|
在uprobe获得了数据,就可以在uretprobe中就可以用了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| SEC("uretprobe/SSL_read_ex") int BPF_URETPROBE(probe_SSL_read_ex_exit) { u32 tid = (u32)bpf_get_current_pid_tgid(); size_t **readbytes_ptr = bpf_map_lookup_elem(&readbytes_ptrs, &tid); if (!readbytes_ptr) return 0;
size_t written = 0; bpf_probe_read_user(&written, sizeof(written), *readbytes_ptr); bpf_map_delete_elem(&readbytes_ptrs, &tid);
int ret = PT_REGS_RC(ctx); int len = (ret == 1) ? written : 0;
return ex_SSL_exit(ctx, 1, len); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| SEC("uretprobe/SSL_write_ex") int BPF_URETPROBE(probe_SSL_write_ex_exit) { u32 tid = (u32)bpf_get_current_pid_tgid(); size_t **readbytes_ptr = bpf_map_lookup_elem(&readbytes_ptrs, &tid); if (!readbytes_ptr) return 0;
size_t written = 0; bpf_probe_read_user(&written, sizeof(written), *readbytes_ptr); bpf_map_delete_elem(&readbytes_ptrs, &tid);
int ret = PT_REGS_RC(ctx); int len = (ret == 1) ? written : 0;
return ex_SSL_exit(ctx, 1, len); }
|
由于read和write的增强函数的参数基本一致,所以退出的函数用的统一的ex_SSL_exit
,部分关键代码:
1 2 3 4 5 6 7 8 9 10 11 12 13
| static int ex_SSL_exit(struct pt_regs *ctx, int rw, int len) { ....... if (len <= 0) return 0; ....... if (bufp != 0) ret = bpf_probe_read_user(&data->buf, buf_copy_size, (char *)*bufp); ...... bpf_perf_event_output(ctx, &perf_SSL_events, BPF_F_CURRENT_CPU, data,EVENT_SIZE(buf_copy_size)); ...... }
|
用户态的处理
由于GnuTLS、BoringSSL并没有增强函数,所以直接使用sslsniff原本自带的方案,我们只要attach我们新增的BPF即可:
1 2 3 4
| ATTACH_UPROBE_CHECKED(skel, lib, SSL_write_ex, probe_SSL_write_ex_enter); ATTACH_URETPROBE_CHECKED(skel, lib, SSL_write_ex, probe_SSL_write_ex_exit); ATTACH_UPROBE_CHECKED(skel, lib, SSL_read_ex, probe_SSL_read_ex_enter); ATTACH_URETPROBE_CHECKED(skel, lib, SSL_read_ex, probe_SSL_read_ex_exit);
|
然后将输出到map的结果在用户态做后续处理即可:
1 2 3 4 5 6
| ...... obj = sslsniff_bpf__open_opts(&open_opts); ......
pb = perf_buffer__new(bpf_map__fd(obj->maps.perf_SSL_events),PERF_BUFFER_PAGES, handle_event, handle_lost_events,NULL, NULL); ......
|
测试
python调用的ssl明文信息捕获
python测试代码如下,即使用明文SK调用同义进行对话:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| import os from openai import OpenAI
client = OpenAI( api_key="sk-sk", base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", ) completion = client.chat.completions.create( model="qwen-plus", messages=[ {'role': 'system', 'content': 'You are a helpful assistant.'}, {'role': 'user', 'content': 'who r u?'}], ) print(completion.model_dump_json())
|
可以在header里看到明文SK:

稳定性测试
后期有空再更新下。。。。。。