pytdx-开源数据源-数据提供商-01055


pytdx 完整源码深度分析

一、项目基础概述

1. 项目定位

pytdx 是纯 Python 实现的通达信 TCP 行情客户端,无 GUI、纯接口,直接对接通达信官方行情服务器,实现A股/港股/期货/指数实时、历史K线、分笔、财务、板块等数据拉取。 - 仓库:https://github.com/rainx/pytdx - 通信协议:自定义二进制 TCP 私有协议(TDX 私有报文,未公开文档,逆向通达信客户端抓包解析而来) - 依赖:construct 二进制序列化库(核心,用于解析/打包二进制报文) - 核心优势:轻量、跨平台、无需安装通达信软件、批量拉取历史数据速度快

2. 整体目录结构(核心源码模块)

pytdx/
├── base/                # 底层通信、报文基类、常量定义
   ├── client.py        # TdxClient 基础TCP客户端
   ├── parser.py        # construct 报文解析模板定义
   ├── protocol.py      # 所有请求/应答协议结构体
   └── constants.py     # 市场代码、K线周期、报文魔数常量
├── hq/                  # 行情业务封装(对外API入口)
   ├── api.py           # 对外所有接口 get_* 函数
   ├── hqclient.py      # HQApi 高层行情客户端(最常用)
   └── params.py        # 接口入参校验、参数封装
├── exhq/                # 扩展行情(期货、港股、期权)
├── util/                # 工具函数
   ├── conv.py          # 数据转换:日期、价格、字节序
   ├── stock_code.py    # 股票代码市场区分工具
   └── log.py           # 日志封装
├── reader/              # 通达信本地 .day/.lc5 离线文件读取
└── test/                # 协议抓包测试用例

二、核心分层架构(自底向上)

四层架构,单向依赖:TCP底层通信层 → 二进制协议层 → 业务封装层 → 对外API层 1. 底层通信层(base/client.py):TCP Socket 连接、收发字节流、断线重连、心跳 2. 协议解析层(base/protocol.py + parser.py):使用 construct 定义二进制报文结构,序列化请求、反序列化服务端返回字节流 3. 行情业务层(hq/hqclient.py):封装各类业务请求(K线、分笔、财务、板块),组装协议报文、调用底层发送、统一数据格式化 4. 对外API层(hq/api.py):极简函数封装,用户直接调用 get_k_data / get_minute_data

三、底层通信层源码解析 base/client.py

3.1 TdxClient 核心类

所有行情客户端的父类,只负责 TCP 字节收发,不关心业务数据。

核心属性

class TdxClient(object):
    def __init__(self):
        self.socket = None          # tcp socket 对象
        self.ip = ""
        self.port = 0
        self.heartbeat_interval = 30 # 心跳间隔
        self.last_heartbeat_time = 0

关键方法

  1. connect(ip, port)

    • 创建 socket.SOCK_STREAM TCP 套接字
    • 设置超时、禁用 Nagle 算法 setsockopt(TCP_NODELAY) 提升小包传输速度(行情高频小包必备)
    • 建立连接,存储地址
  2. send_msg(msg: bytes) 核心发送逻辑:全量发送二进制报文 python def send_msg(self, msg): sent = 0 while sent < len(msg): ret = self.socket.send(msg[sent:]) if ret == 0: raise ConnectionError("连接断开") sent += ret 解决 TCP 粘包、分段发送问题,保证完整报文推送服务端。

  3. recv_msg(length) 按指定长度读取字节流,循环接收直到凑齐指定字节,防止半包。

  4. close() / is_connected() 连接状态判断、资源释放。

  5. 心跳机制 TDX 服务器空闲一段时间会主动断开连接,客户端定时发送空心跳报文保活,在每次接口调用前检查时间戳,自动发送心跳。

3.2 通信层核心设计要点

  • 同步阻塞 Socket:pytdx 全程同步 IO,无异步、无协程,简单易使用;高并发场景需自行封装线程池
  • 无加密:TCP 裸二进制传输,明文数据,抓包可直接解析
  • 短报文:每次行情请求报文几十~几百字节,返回报文分段流式

四、协议解析层(base/protocol.py + parser.py)—— 项目核心难点

通达信无官方协议文档,全部通过抓包逆向得到,使用 construct 声明式二进制解析,是整个项目最关键模块。

4.1 construct 库作用

替代手动 struct.pack/unpack,用结构化声明描述二进制报文: - 字段类型:Int32ul 无符号4字节、Int16sl 有符号2字节、Bytes、Padding 填充位 - 分支、条件解析:根据报文指令码区分不同应答结构 - 自动序列化:传入字典 → 打包成 TCP 发送 bytes - 自动反序列化:服务端返回 bytes → 转为 Python 字典对象

4.2 TDX 通用报文头(所有请求/返回统一头部)

所有和服务器交互的数据包都固定头部结构,protocol.py 定义 PacketHeader

# 报文头固定结构
magic: 2字节魔数 0x0c,0x01 固定
packet_len: 整个数据包总长度(包含头部)
request_cmd: 请求指令码拉K线/分笔/财务对应不同数字
response_cmd: 应答指令码
seq: 请求序列号自增匹配请求与应答
padding: 预留填充字节
  • 每个业务接口对应唯一 request_cmd
  • 1001:获取市场股票列表
  • 1002:获取日K线
  • 1003:获取分钟K线
  • 1004:获取逐笔成交
  • 1008:财务数据
  • 1010:板块概念

4.3 请求报文组装流程(以获取日K为例)

  1. 填充报文头:魔数、长度、cmd、序列号
  2. 填充业务负载:市场编号(0深市/1沪市)、股票代码、起始日期、结束日期
  3. construct 一次性打包为 bytes
  4. TdxClient.send_msg 发送

4.4 应答报文解析流程

  1. 先 recv 固定头部长度,解析 header,获取数据包总长度
  2. 再读取剩余业务负载字节
  3. 根据 header.response_cmd 匹配对应 construct 解析模板(K线/分笔/财务模板)
  4. 转换原始二进制数值为业务可读数据(价格、时间、成交量转换逻辑在 util/conv.py)

4.5 典型逆向协议坑(源码中体现)

  1. 价格放大1000倍存储:原始数据是整数,真实价格 = raw / 1000
  2. 日期用数字存储 20260616,时间 093000
  3. 大量字节填充位 Padding,抓包时存在无效占位字节,解析必须跳过否则偏移错乱
  4. 大数据分片返回:拉取多年K线时服务器分批返回多段报文,客户端循环接收拼接

五、工具层 util/ 数据转换核心源码

5.1 conv.py 转换函数(高频调用)

  1. convert_price(raw_price) TDX 内部价格放大1000倍存储,还原真实浮点数: python def convert_price(v): return v / 1000
  2. int_to_date(num) 数字20260616 → datetime.date 对象
  3. int_to_time(num) 93000 → 09:30:00
  4. vol_to_amount(vol, price) 成交量、成交额单位换算(通达信单位:手、元)

5.2 stock_code.py 市场区分

根据股票代码前缀区分沪/深/创业板/科创板: - 6开头:沪市 market=1 - 0/3开头:深市 market=0 - 8开头:北交所

六、业务封装层 hq/hqclient.py HQApi 高层客户端

用户实际实例化使用的 HQApi(),继承底层 TdxClient,封装所有业务逻辑,屏蔽协议细节。

6.1 核心设计

  1. 内置 seq 序列号自增,每次请求自动+1,用于匹配应答包
  2. 统一模板分发:每个业务方法内部构造对应 construct 请求结构体
  3. 分片数据循环读取:拉取大量K线时 while 循环接收分片,直到服务器返回结束标记
  4. 原始报文数据标准化清洗:统一转为 list[dict] 格式,字段统一(code, date, open, close, high, low, vol 等)

6.2 典型方法源码逻辑:get_k_line()

简化伪代码:

def get_k_line(self, market, code, start, end, ktype):
    # 1. 构造请求参数字典
    req_data = {
        "market": market,
        "code": code,
        "start_date": date_to_int(start),
        "end_date": date_to_int(end),
        "ktype": ktype
    }
    # 2. 使用对应协议模板打包请求报文
    req_packet = KLineRequest.build(req_data)
    full_msg = PacketHeader.build(头部信息) + req_packet

    # 3. 底层发送TCP报文
    self.send_msg(full_msg)

    data_list = []
    while True:
        # 循环接收分片数据包
        header_bytes = self.recv_msg(头部长度)
        header = PacketHeader.parse(header_bytes)
        body = self.recv_msg(header.packet_len - 头部长度)
        resp = KLineResponse.parse(body)
        # 拼接当前分片K线数据
        data_list.extend(resp.records)
        # 服务器标记无更多分片则退出循环
        if resp.is_finish:
            break
    # 4. 批量转换价格、日期、格式化
    result = []
    for item in data_list:
        row = {
            "code": code,
            "date": int_to_date(item.date),
            "open": convert_price(item.open),
            "close": convert_price(item.close),
            "high": convert_price(item.high),
            "low": convert_price(item.low),
            "vol": item.vol
        }
        result.append(row)
    return result

6.3 关键特性:分片处理

通达信服务器单次返回K线条数有限(约800根),跨多年查询会分成多个数据包下发,源码中通过 is_finish 标志循环读取,上层用户无感知。

七、对外API层 hq/api.py

极简函数封装,面向普通用户,无需手动实例化 HQApi,自动管理连接、自动选择服务器池。

def get_k_data(code, start, end, ktype="D"):
    api = HQApi()
    # 自动轮询可用行情服务器列表
    for server in get_server_list():
        try:
            api.connect(server.ip, server.port)
            res = api.get_k_line(...)
            api.close()
            return res
        except Exception:
            continue
    raise Exception("无可用服务器")

内置多组通达信公共服务器,连接失败自动切换,降低用户使用门槛。

八、reader 离线模块(独立分支逻辑)

不依赖TCP,直接读取本地通达信软件导出的 .day(日线)、.lc5(5分钟线)二进制文件,协议解析逻辑和在线行情复用同一套 construct 模板,仅去掉TCP收发,直接读取本地文件字节流。

九、整体数据流全链路

用户调用 get_k_data("000001", "20260101", "20260601") 1. api.py 创建 HQApi 实例 2. 自动连接通达信行情TCP服务器 3. HQApi 内部组装 K线请求结构体(protocol construct) 4. TdxClient 发送二进制TCP报文 5. 阻塞等待服务端返回字节流 6. 循环接收分片数据包,construct 反序列化原始数据 7. util 工具转换价格、日期、成交量 8. 清洗为标准字典列表返回用户 9. 关闭socket连接

十、源码核心优缺点设计分析

优点

  1. 分层解耦:通信、协议、业务、工具完全隔离,修改协议只改动 protocol.py
  2. construct 声明式协议:相比手动 struct 可读性极强,新增接口只需要补充结构体模板
  3. 内置服务器池、自动切换、心跳保活,开箱即用
  4. 同时支持在线TCP拉取 + 本地离线文件读取
  5. 纯Python无C扩展,Windows/Linux/Mac全平台兼容

源码设计缺陷(源码中明显存在)

  1. 同步阻塞IO,批量多股票拉取性能差,无原生异步支持
  2. 无连接池,每次调用API新建/销毁Socket,频繁查询有性能损耗
  3. 服务器为公共免费通达信节点,存在限流、断线、延迟,无负载均衡重试优化
  4. 协议完全依赖逆向,通达信更新协议后会直接失效,无官方兼容保障
  5. 数据格式化固定,不支持自定义输出格式(DataFrame 需要用户自行转换)
  6. 缺乏并发安全,多线程共用同一个 HQApi 实例会产生报文错乱(序列号、收发字节冲突)

十一、扩展改造思路(基于源码架构)

  1. 封装连接池:复用TCP连接,减少频繁握手开销
  2. 基于 asyncio 重写底层 TdxClient,实现异步并发拉取多股票
  3. 增加缓存层(Redis),缓存日K减少重复请求服务器
  4. 增加重试、超时、限流封装,提升稳定性
  5. 内置 pandas 自动转换,直接返回 DataFrame
  6. 增加私有行情服务器配置支持

十二、核心关键文件速查清单

文件 核心作用
base/client.py TCP Socket 底层通信
base/protocol.py 所有TDX二进制报文结构体定义
base/parser.py construct 解析器顶层封装
hq/hqclient.py 高层行情业务逻辑,核心业务类
hq/api.py 对外简易API入口
util/conv.py 价格、日期数值转换工具
util/stock_code.py 股票市场区分
base/constants.py 指令码、服务器、周期常量