数据提供者 API
本文档介绍 Vibe Trading 的标准化交易所数据提供者(Provider)API,这是一个内部Python API,用于统一访问不同交易所的数据。
概述
设计目标
- 统一接口:所有交易所通过相同的接口访问
- 可扩展性:轻松添加新的交易所支持
- 向后兼容:保留现有工具函数,无缝迁移
- 类型安全:使用标准化的数据模型
架构
核心 API
1. ProviderFactory
工厂类,用于创建和管理交易所数据提供者实例。
导入
python
from vibe_trading.data_sources.providers.factory import ProviderFactory方法
get_provider(exchange: str) -> ExchangeProvider
获取或创建交易所Provider实例(单例模式)。
参数:
exchange(str): 交易所名称(如 "binance")
返回:
ExchangeProvider: Provider实例
示例:
python
# 获取 Binance Provider
provider = await ProviderFactory.get_provider("binance")
# 使用Provider
price = await provider.get_current_price("BTCUSDT")
# 清理资源(可选,程序退出时自动清理)
await ProviderFactory.close_provider("binance")create_provider(exchange: str, config: ExchangeConfig) -> ExchangeProvider
使用自定义配置创建Provider实例。
参数:
exchange(str): 交易所名称config(ExchangeConfig): 交易所配置对象
返回:
ExchangeProvider: Provider实例
close_all()
关闭所有Provider实例并释放资源。
python
await ProviderFactory.close_all()list_active_providers() -> list
列出所有活跃的Provider。
python
active = ProviderFactory.list_active_providers()
print(f"活跃的Provider: {active}")2. ExchangeProvider
所有交易所Provider必须实现的抽象接口。
核心方法
async connect() -> None
建立与交易所的连接。
async disconnect() -> None
断开所有连接并释放资源。
async get_klines(symbol: str, interval: str, limit: int = 100, start_time: Optional[int] = None, end_time: Optional[int] = None) -> List[StandardKline]
获取K线数据。
参数:
symbol(str): 交易对符号interval(str): K线间隔(如 "1m", "5m", "1h")limit(int): 获取数量(默认100)start_time(int, 可选): 开始时间(毫秒时间戳)end_time(int, 可选): 结束时间(毫秒时间戳)
返回:
List[StandardKline]: 标准化K线数据列表
示例:
python
klines = await provider.get_klines("BTCUSDT", "5m", 50)
for kline in klines:
print(f"时间: {kline.open_datetime}, 收盘: {kline.close}")async get_ticker(symbol: str) -> StandardTicker
获取24小时行情数据。
async get_orderbook(symbol: str, limit: int = 20) -> StandardOrderBook
获取订单簿数据。
async get_current_price(symbol: str) -> float
获取当前价格。
async subscribe_klines(symbol: str, interval: str, callback: Callable) -> None
订阅K线数据更新。
async unsubscribe_klines(symbol: str, interval: str) -> None
取消订阅K线数据。
3. 数据模型
StandardKline
标准化的K线数据模型。
python
@dataclass
class StandardKline:
exchange: str # 交易所标识
symbol: str # 交易对
interval: str # K线间隔
open_time: int # 开盘时间(毫秒)
open: float # 开盘价
high: float # 最高价
low: float # 最低价
close: float # 收盘价
volume: float # 成交量
close_time: int # 收盘时间(毫秒)
quote_volume: float # 成交额
trades: int # 成交笔数
taker_buy_base: float # 主动买入量
taker_buy_quote: float # 主动买入额
is_final: bool # 是否已完成StandardTicker
标准化的24小时行情数据。
StandardOrderBook
标准化的订单簿数据。
使用示例
基础使用
python
import asyncio
from vibe_trading.data_sources.providers.factory import ProviderFactory
async def main():
# 创建Provider
provider = await ProviderFactory.get_provider("binance")
# 获取价格
price = await provider.get_current_price("BTCUSDT")
print(f"BTC价格: ${price}")
# 获取K线
klines = await provider.get_klines("BTCUSDT", "5m", 10)
for k in klines:
print(f"OHLCV: {kline.open}, {kline.high}, {kline.low}, {kline.close}, {kline.volume}")
# 清理
await ProviderFactory.close_all()
asyncio.run(main())订阅K线数据
python
async def on_kline_update(kline: StandardKline):
"""K线数据回调"""
print(f"新K线: {kline.symbol} @ {kline.close}")
async def main():
provider = await ProviderFactory.get_provider("binance")
# 订阅K线
await provider.subscribe_klines("BTCUSDT", "5m", on_kline_update)
# 保持连接以接收更新
await asyncio.sleep(3600) # 运行1小时
# 取消订阅
await provider.unsubscribe_klines("BTCUSDT", "5m")
await ProviderFactory.close_all()
asyncio.run(main())健康检查
python
provider = await ProviderFactory.get_provider("binance")
# 执行健康检查
is_healthy = await provider.health_check()
print(f"连接状态: {provider.status}")
print(f"是否健康: {'是' if is_healthy else '否'}")数据模型转换
转换为字典
python
kline = await provider.get_klines("BTCUSDT", "5m", 1)
k = klines[0]
# 转换为字典
data = k.to_dict()
print(data)
# 输出: {
# 'exchange': 'binance',
# 'symbol': 'BTCUSDT',
# 'open_time': 1234567890000,
# 'open': 50000.0,
# ...
# }转换为传统格式
python
from vibe_trading.tools.market_data_tools import convert_standard_to_legacy
std_kline = await provider.get_klines("BTCUSDT", "5m", 1)[0]
# 转换为传统Kline格式
legacy_kline = convert_standard_to_legacy(std_kline)
print(legacy_kline)向后兼容
现有工具函数仍然可用
python
# 这些函数现在内部使用Provider,但接口保持不变
from vibe_trading.tools.market_data_tools import (
get_current_price,
get_kline_data,
get_24hr_ticker,
get_order_book
)
# 使用方式完全相同
result = await get_current_price("BTCUSDT")
print(result['price'])Agent工具兼容
所有Agent工具无需修改即可使用新的Provider接口:
python
from vibe_trading.agents.agent_tools import execute_get_current_price, GetCurrentPriceParams
# Agent工具自动使用Provider
args = GetCurrentPriceParams(symbol="BTCUSDT")
result = await execute_get_current_price("get_current_price", args)
print(result)高级用法
多交易所对比
python
async def compare_exchanges():
"""对比不同交易所的价格"""
exchanges = ["binance"] # 未来可添加 "okx", "bybit"
for exchange in exchanges:
provider = await ProviderFactory.get_provider(exchange)
try:
price = await provider.get_current_price("BTCUSDT")
print(f"{exchange}: ${price}")
except Exception as e:
print(f"{exchange}: 错误 - {e}")自定义配置
python
from vibe_trading.data_sources.exchange_config import BinanceExchangeConfig
# 创建自定义配置
config = BinanceExchangeConfig(
exchange_type="binance",
environment="mainnet",
api_key="your_key",
api_secret="your_secret",
enable_websocket=True,
enable_rest=True,
)
# 使用自定义配置
provider = await ProviderFactory.create_provider("binance", config)错误处理
python
from vibe_trading.data_sources.providers.registry import ProviderRegistry
async def safe_get_price(exchange: str, symbol: str):
"""安全获取价格"""
# 检查交易所是否支持
if not ProviderRegistry.is_registered(exchange):
raise ValueError(f"不支持的交易所: {exchange}")
provider = await ProviderFactory.get_provider(exchange)
try:
price = await provider.get_current_price(symbol)
return price
except Exception as e:
print(f"获取价格失败: {e}")
return None
finally:
await ProviderFactory.close_provider(exchange)性能优化
连接复用
Provider实例会自动缓存,重复获取同一交易所的Provider会返回同一实例,提高性能。
python
# 第一次调用:创建实例
provider1 = await ProviderFactory.get_provider("binance")
# 第二次调用:返回同一实例
provider2 = await ProviderFactory.get_provider("binance")
assert provider1 is provider2 # True - 同一实例批量操作
python
# 并行获取多个交易对的价格
import asyncio
async def get_multiple_prices(symbols: list[str]):
provider = await ProviderFactory.get_provider("binance")
tasks = [
provider.get_current_price(symbol)
for symbol in symbols
]
prices = await asyncio.gather(*tasks)
return dict(zip(symbols, prices))
prices = await get_multiple_prices(["BTCUSDT", "ETHUSDT", "SOLUSDT"])
print(prices)