vn.py 数据采集与处理的完整流程如下,涵盖了从行情数据的获取、存储、分发到最终被策略或其他模块使用的全过程。具体实现机制如下:
1. 数据采集的实现机制
(1)gateway模块采集数据
- 每个市场(如CTP、IB、Futu等)有对应的 gateway 适配器(如
vnpy/gateway/ctp/ctp_gateway.py
)。 - gateway 负责:
- 连接行情服务器(API登录)
- 用户/策略通过界面或代码发出合约订阅请求(subscribe)
- 接收来自服务器的实时行情推送(如Tick、K线等)
- 将原始API数据转换为vn.py统一的数据结构(如TickData、BarData)
(2)事件驱动引擎分发数据
- gateway将采集到的行情数据封装为事件(如
EVENT_TICK
,EVENT_BAR
),推送到vn.py的事件引擎(event engine)。 - 事件引擎负责在系统内广播这些行情事件,所有注册了相关事件的模块(如策略、数据录制器等)会收到数据。
(3)数据存储与管理
- data_recorder模块
- 作为事件引擎的订阅者,监听行情事件。
- 自动将实时Tick或Bar数据保存到本地数据库(如SQLite、MySQL等)。
- data_manager模块
- 提供历史数据的查询、导出、导入等功能,辅助数据管理和回测。
(4)数据的最终消费
- 策略引擎、回测引擎等作为事件的订阅者,获取实时或历史行情数据,进行策略决策、回测或实盘交易。
2. 数据处理的完整流程图解
- 用户配置gateway并订阅合约
-
在GUI或代码中选择并订阅合约
-
gateway采集行情
- 连接行情API
-
接收并解析服务器推送的数据
-
数据标准化
-
转换为vn.py标准数据对象(TickData、BarData等)
-
事件引擎推送
-
封装为事件推送到事件系统
-
数据分发
-
策略、data_recorder、回测等模块订阅并接收事件
-
数据存储
-
data_recorder监听事件并保存到数据库
-
数据管理/导出
-
data_manager操作数据库,支持数据查询、导入、导出
-
策略/回测消费数据
- 策略引擎/回测引擎基于数据做决策、下单等
3. 典型代码片段(以 Tick 数据为例)
# 1. gateway采集到Tick数据
def on_tick(self, tick_data):
event = Event(EVENT_TICK, tick_data)
self.event_engine.put(event)
# 2. 事件引擎分发
def put(self, event):
# 广播给所有订阅该事件类型的模块
for handler in self._handlers.get(event.type, []):
handler(event)
# 3. data_recorder监听事件并保存
def process_tick_event(self, event):
tick = event.data
self.database.save_tick_data(tick)
4. 核心模块概述
gateway/
:各交易所行情与交易接口适配event/
:事件驱动引擎,核心数据流通枢纽app/data_recorder/
:行情数据自动录制与存储app/data_manager/
:本地历史数据管理工具database/
:数据库适配与操作实现
5. 总结
vn.py 的数据采集与处理流程高度模块化、解耦,采用“接口采集 → 标准化 → 事件分发 → 存储/消费”流水线,支持多市场、多数据源、高并发的数据处理,同时为策略开发、回测和实盘提供了统一的数据访问方式。
如需进一步详细源码分析或者某个环节的实现细节(如特定gateway的采集流程/事件系统的结构等),可以继续提问!