ssl明文信息采集方案

三种常见的ssl/tls用户态的库:

  1. OpenSSL
  2. BoringSSL
  3. GnuTLS

hook的实现细节

在OpenSSL场景下,需要用到的uprobe:SSL_readSSL_read_exSSL_writeSSL_write_exdo_handshake

deepseek给出的具体调用过程:

flowchart TD
    start([开始]) --> do_handshake[调用 SSL_do_handshake]
    do_handshake --> handshake_success{SSL_do_handshake 成功?}
    handshake_success -->|是| app_data_exchange[应用数据交换]
    handshake_success -->|否| handshake_error[检查 SSL_get_error]
    handshake_error --> handshake_retry{SSL_ERROR_WANT_READ 或 SSL_ERROR_WANT_WRITE?}
    handshake_retry -->|是| do_handshake
    handshake_retry -->|否| close_conn[关闭连接]

    app_data_exchange --> prepare_send[应用准备发送数据]
    prepare_send --> ssl_write[调用 SSL_write/SSL_write_ex]
    ssl_write --> check_write_success{写入成功?}
    check_write_success -->|SSL_write_ex 返回 1| get_write_bytes[调用 SSL_get_bytes_written]
    check_write_success -->|SSL_write 返回 >0| prepare_receive[应用准备接收数据]
    check_write_success -->|失败| handle_write_error[处理写入错误]
    get_write_bytes --> prepare_receive

    handle_write_error --> write_error_check[检查 SSL_get_error]
    write_error_check --> write_retry{可重试?}
    write_retry -->|是| ssl_write
    write_retry -->|否| close_conn

    prepare_receive --> ssl_read[调用 SSL_read/SSL_read_ex]
    ssl_read --> check_read_success{读取成功?}
    check_read_success -->|SSL_read_ex 返回 1| get_read_bytes[调用 SSL_get_bytes_read]
    check_read_success -->|SSL_read 返回 >0| process_data[处理接收数据]
    check_read_success -->|失败| handle_read_error[处理读取错误]
    get_read_bytes --> process_data

    handle_read_error --> read_error_check[检查 SSL_get_error]
    read_error_check --> read_retry{可重试?}
    read_retry -->|是| ssl_read
    read_retry -->|否| close_conn

    process_data --> more_data{更多数据交换?}
    more_data -->|是| app_data_exchange
    more_data -->|否| close_conn

    close_conn --> end([结束])

SSL_read&SSL_write

当我们想从一个已建立的 SSL 连接中读取数据时,可以使用 SSL_read 或 SSL_read_ex(OpenSSL 1.1.1+)函数:

1
2
3
4
5
int SSL_read(
SSL *ssl, // [in] SSL连接对象
void *buf, // [out] 明文数据存储缓冲区
int num // [in] 读取的buf长度
);
1
2
3
4
5
6
int SSL_read_ex(
SSL *ssl, // [in] SSL连接对象
void *buf, // [out] 明文数据存储缓冲区
size_t num, // [in] 读取的buf长度
size_t *readbytes // [out] 实际读取的字节数
);

当我们想往一个已建立的 SSL 连接中写入数据时,可以使用 SSL_write 或 SSL_write_ex(OpenSSL 1.1.1+) 函数:

1
2
3
4
5
int SSL_write(
SSL *ssl, // [in] SSL连接对象
const void *buf, // [in] 明文数据存储缓冲区
int num, // [in] 需要发送的数据长度
);
1
2
3
4
5
6
int SSL_write_ex(
SSL *s, // [in] SSL连接对象
const void *buf, // [in] 明文数据存储缓冲区
size_t num, // [in] 需要发送的数据长度
size_t *written // [out] 实际成功写入的数据长度
);

那这里很明显,只要我们hook SSL_read/SSL_read_ex或者SSL_write/SSL_write_ex函数,在SSL连接的过程中将存储的明文buf读取出来,就可以获得SSL/TLS明文数据。

实现方案

这里直接拿SSL_read_ex和SSL_write_ex为例,因为后续测试的python版本>=3.10,在python>=3.10以后,python内部默认使用的OpenSSL都是使用增强函数的。

整体用户态的流程用的是sslsniff的方案。

BPF实现,关键的Hook点

为了知道握手的持续时间,需要hookdo_handshake函数:

1
2
u64 ts = bpf_ktime_get_ns();//获得当前的时间戳
bpf_map_update_elem(&start_ns, &tid, &ts, BPF_ANY);//以当前的时间戳为value,当前的tid为key,存放到start_ns map中

为了保存ex版本函数buff的字节数,建一个新的map:

1
2
3
4
5
6
7
struct {
__uint(type, BPF_MAP_TYPE_HASH);
__uint(max_entries, 10240);
__type(key, __u32);
__type(value, size_t*);
} readbytes_ptrs SEC(".maps");

SSL_read_exSSL_write_ex函数的入口hook,uprobe:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
SEC("uprobe/SSL_read_ex")
int BPF_UPROBE(probe_SSL_read_ex_enter, void *ssl, void *buf, size_t num, size_t *readbytes) {
//获取一些基本信息
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
u32 tid = (u32)pid_tgid;
u32 uid = bpf_get_current_uid_gid();
u64 ts = bpf_ktime_get_ns();

if (!trace_allowed(uid, pid)) {
return 0;
}
//更新到map里
bpf_map_update_elem(&bufs, &tid, &buf, BPF_ANY);
bpf_map_update_elem(&start_ns, &tid, &ts, BPF_ANY);
bpf_map_update_elem(&readbytes_ptrs, &tid, &readbytes, BPF_ANY);

return 0;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
SEC("uprobe/SSL_write_ex")
int BPF_UPROBE(probe_SSL_write_ex_enter, void *ssl, void *buf, size_t num, size_t *readbytes) {
//获取一些基本信息
u64 pid_tgid = bpf_get_current_pid_tgid();
u32 pid = pid_tgid >> 32;
u32 tid = (u32)pid_tgid;
u32 uid = bpf_get_current_uid_gid();
u64 ts = bpf_ktime_get_ns();

if (!trace_allowed(uid, pid)) {
return 0;
}
//更新到map里
bpf_map_update_elem(&bufs, &tid, &buf, BPF_ANY);
bpf_map_update_elem(&start_ns, &tid, &ts, BPF_ANY);

bpf_map_update_elem(&readbytes_ptrs, &tid, &readbytes, BPF_ANY);

return 0;
}

在uprobe获得了数据,就可以在uretprobe中就可以用了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SEC("uretprobe/SSL_read_ex")
int BPF_URETPROBE(probe_SSL_read_ex_exit)
{
u32 tid = (u32)bpf_get_current_pid_tgid();
size_t **readbytes_ptr = bpf_map_lookup_elem(&readbytes_ptrs, &tid);
if (!readbytes_ptr)
return 0;

size_t written = 0;
bpf_probe_read_user(&written, sizeof(written), *readbytes_ptr);
bpf_map_delete_elem(&readbytes_ptrs, &tid);

int ret = PT_REGS_RC(ctx);
int len = (ret == 1) ? written : 0;

return ex_SSL_exit(ctx, 1, len);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
SEC("uretprobe/SSL_write_ex")
int BPF_URETPROBE(probe_SSL_write_ex_exit)
{
u32 tid = (u32)bpf_get_current_pid_tgid();
size_t **readbytes_ptr = bpf_map_lookup_elem(&readbytes_ptrs, &tid);
if (!readbytes_ptr)
return 0;

size_t written = 0;
bpf_probe_read_user(&written, sizeof(written), *readbytes_ptr);
bpf_map_delete_elem(&readbytes_ptrs, &tid);

int ret = PT_REGS_RC(ctx);
int len = (ret == 1) ? written : 0;

return ex_SSL_exit(ctx, 1, len);
}

由于read和write的增强函数的参数基本一致,所以退出的函数用的统一的ex_SSL_exit,部分关键代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
static int ex_SSL_exit(struct pt_regs *ctx, int rw, int len) {
.......
if (len <= 0) // 没数据
return 0;
.......
//读取buff的明文数据
if (bufp != 0)
ret = bpf_probe_read_user(&data->buf, buf_copy_size, (char *)*bufp);
......
//发送数据到用户态
bpf_perf_event_output(ctx, &perf_SSL_events, BPF_F_CURRENT_CPU, data,EVENT_SIZE(buf_copy_size));
......
}

用户态的处理

由于GnuTLS、BoringSSL并没有增强函数,所以直接使用sslsniff原本自带的方案,我们只要attach我们新增的BPF即可:

1
2
3
4
ATTACH_UPROBE_CHECKED(skel, lib, SSL_write_ex, probe_SSL_write_ex_enter);
ATTACH_URETPROBE_CHECKED(skel, lib, SSL_write_ex, probe_SSL_write_ex_exit);
ATTACH_UPROBE_CHECKED(skel, lib, SSL_read_ex, probe_SSL_read_ex_enter);
ATTACH_URETPROBE_CHECKED(skel, lib, SSL_read_ex, probe_SSL_read_ex_exit);

然后将输出到map的结果在用户态做后续处理即可:

1
2
3
4
5
6
......
obj = sslsniff_bpf__open_opts(&open_opts);
......
//读取map信息到用户态
pb = perf_buffer__new(bpf_map__fd(obj->maps.perf_SSL_events),PERF_BUFFER_PAGES, handle_event, handle_lost_events,NULL, NULL);
......

测试

python调用的ssl明文信息捕获

python测试代码如下,即使用明文SK调用同义进行对话:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import os
from openai import OpenAI

client = OpenAI(
api_key="sk-sk",
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)
completion = client.chat.completions.create(
model="qwen-plus",
messages=[
{'role': 'system', 'content': 'You are a helpful assistant.'},
{'role': 'user', 'content': 'who r u?'}],
)

print(completion.model_dump_json())

可以在header里看到明文SK:

稳定性测试

后期有空再更新下。。。。。。