-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathwestock_wrapper.py
More file actions
336 lines (291 loc) · 11.1 KB
/
Copy pathwestock_wrapper.py
File metadata and controls
336 lines (291 loc) · 11.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
#!/usr/bin/env python3
"""
westock-data wrapper for ma_analysis.py
将 yfinance 格式转换为 westock-data 格式
"""
import subprocess
import json
import os
from datetime import datetime, timedelta
import sys
from stock_analysis.data import normalize_provider_ticker
# 代码格式转换映射
TICKER_MAP = {
'0700.HK': 'hk00700',
'1810.HK': 'hk01810',
'9992.HK': 'hk09992',
'0981.HK': 'hk00981', # 中芯国际
'688256.SS': 'sh688256', # 寒武纪
'600703.SS': 'sh600703', # 三安光电
'001391.SZ': 'sz001391', # 国货航
'NVDA': 'usNVDA',
'TSLA': 'usTSLA',
'QQQ': 'usQQQ',
'PDD': 'usPDD',
}
DEFAULT_WESTOCK_DATA_SCRIPT = '/root/.openclaw/workspace/skills/westock-data/scripts/index.js'
MIN_INTRADAY_ROWS = 120
def convert_ticker(ticker):
"""转换 ticker 格式: 0700.HK -> hk00700"""
if ticker in TICKER_MAP:
return TICKER_MAP[ticker]
return normalize_provider_ticker(ticker)
def build_westock_command(ticker, period='day', limit=500):
"""Build the westock-data CLI command.
WESTOCK_DATA_SCRIPT lets this repo run outside the original OpenClaw path.
"""
ws_ticker = convert_ticker(ticker)
script = os.environ.get('WESTOCK_DATA_SCRIPT', DEFAULT_WESTOCK_DATA_SCRIPT)
return [
'node',
script,
'kline',
ws_ticker,
period,
str(limit)
]
def fetch_kline(ticker, period='day', limit=500):
"""
调用 westock-data 获取 K线数据
返回 pandas DataFrame,列名与 yfinance 兼容
"""
import pandas as pd
cmd = build_westock_command(ticker, period, limit)
if 'WESTOCK_DATA_SCRIPT' not in os.environ and not os.path.exists(cmd[1]):
return pd.DataFrame()
try:
result = subprocess.run(cmd, capture_output=True, text=True, check=True)
output = result.stdout.strip()
if not output or 'date' not in output:
return pd.DataFrame()
# 解析 Markdown 表格
lines = output.strip().split('\n')
if len(lines) < 3:
return pd.DataFrame()
# 跳过表头和分隔符
data_lines = lines[2:]
rows = []
for line in data_lines:
parts = [p.strip() for p in line.split('|')[1:-1]] # 去掉首尾空格
if len(parts) >= 7:
rows.append(parts)
if not rows:
return pd.DataFrame()
# 构建 DataFrame
df = pd.DataFrame(rows, columns=['date', 'open', 'last', 'high', 'low', 'volume', 'amount', 'exchange'])
# 转换数据类型
df['date'] = pd.to_datetime(df['date'])
df['Open'] = pd.to_numeric(df['open'], errors='coerce')
df['Close'] = pd.to_numeric(df['last'], errors='coerce')
df['High'] = pd.to_numeric(df['high'], errors='coerce')
df['Low'] = pd.to_numeric(df['low'], errors='coerce')
df['Volume'] = pd.to_numeric(df['volume'], errors='coerce')
# 设置索引
df.set_index('date', inplace=True)
# 只保留需要的列
df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
# ⚠️ westock 返回倒序(最新在前),需要反转为正序(最新在后)
df = df.sort_index()
return df
except subprocess.CalledProcessError as e:
print(f"Error fetching {ticker}: {e.stderr}", file=sys.stderr)
return pd.DataFrame()
except Exception as e:
print(f"Error parsing {ticker}: {str(e)}", file=sys.stderr)
return pd.DataFrame()
def fetch_yfinance(ticker, period='1y', start=None, end=None, interval='1d', progress=False):
"""Fallback to yfinance when westock-data is unavailable or returns no data."""
import yfinance as _yf
return _yf.download(
ticker,
period=period if start is None and end is None else None,
start=start,
end=end,
interval=interval,
progress=progress,
auto_adjust=False,
)
def fetch_yahoo_chart(ticker, period='1y', start=None, end=None, interval='1d'):
"""Fetch OHLCV data from Yahoo's chart JSON endpoint without yfinance cookies."""
import pandas as pd
import requests
url = f'https://query1.finance.yahoo.com/v8/finance/chart/{ticker}'
params = {'interval': interval}
if start or end:
start_ts = int(pd.to_datetime(start or '1970-01-01').timestamp())
end_ts = int(pd.to_datetime(end or datetime.now()).timestamp())
params.update({'period1': start_ts, 'period2': end_ts})
else:
params['range'] = period
try:
response = requests.get(url, params=params, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
response.raise_for_status()
payload = response.json()
result = ((payload.get('chart') or {}).get('result') or [None])[0]
if not result:
return pd.DataFrame()
timestamps = result.get('timestamp') or []
quote = (((result.get('indicators') or {}).get('quote') or [{}])[0])
rows = {
'Open': quote.get('open') or [],
'High': quote.get('high') or [],
'Low': quote.get('low') or [],
'Close': quote.get('close') or [],
'Volume': quote.get('volume') or [],
}
if not timestamps or any(len(values) != len(timestamps) for values in rows.values()):
return pd.DataFrame()
df = pd.DataFrame(rows, index=pd.to_datetime(timestamps, unit='s'))
df.index.name = 'date'
return df.apply(pd.to_numeric, errors='coerce').dropna().sort_index()
except Exception as e:
print(f"Error fetching Yahoo chart {ticker}: {str(e)}", file=sys.stderr)
return pd.DataFrame()
def fetch_tencent_kline(ticker, period='day', limit=500):
"""Fetch OHLCV data from Tencent's appstock kline endpoint."""
import pandas as pd
import requests
ws_ticker = convert_ticker(ticker)
if not ws_ticker.startswith(('hk', 'sh', 'sz', 'bj')):
return pd.DataFrame()
urls = [
'https://web.ifzq.gtimg.cn/appstock/app/fqkline/get',
'https://proxy.finance.qq.com/ifzqgtimg/appstock/app/fqkline/get',
]
params = {'param': f'{ws_ticker},{period},,,{limit},qfq'}
last_error = None
for url in urls:
try:
response = requests.get(url, params=params, timeout=10, headers={'User-Agent': 'Mozilla/5.0'})
response.raise_for_status()
payload = json.loads(response.text)
if payload.get('code') != 0:
continue
stock_data = (payload.get('data') or {}).get(ws_ticker) or {}
rows = stock_data.get(period) or stock_data.get(f'qfq{period}') or []
if not rows:
continue
df = pd.DataFrame(rows)
df = df.iloc[:, :6]
df.columns = ['date', 'open', 'close', 'high', 'low', 'volume']
df['date'] = pd.to_datetime(df['date'])
df['Open'] = pd.to_numeric(df['open'], errors='coerce')
df['High'] = pd.to_numeric(df['high'], errors='coerce')
df['Low'] = pd.to_numeric(df['low'], errors='coerce')
df['Close'] = pd.to_numeric(df['close'], errors='coerce')
df['Volume'] = pd.to_numeric(df['volume'], errors='coerce')
df.set_index('date', inplace=True)
df = df[['Open', 'High', 'Low', 'Close', 'Volume']]
return df.dropna().sort_index()
except Exception as e:
last_error = e
if last_error:
print(f"Error fetching Tencent kline {ticker}: {str(last_error)}", file=sys.stderr)
return pd.DataFrame()
def _annotate_download(df, source, ticker, period, interval):
if not hasattr(df, 'attrs'):
return df
try:
rows = len(df)
except TypeError:
rows = 0
latest = ""
try:
if rows:
latest_value = df.index[-1]
latest = latest_value.strftime('%Y-%m-%d') if hasattr(latest_value, 'strftime') else str(latest_value)
except Exception:
latest = ""
df.attrs.update(
{
'source': source,
'ticker': ticker,
'period': period,
'interval': interval,
'rows': rows,
'latest': latest,
}
)
return df
def download(ticker, period='1y', start=None, end=None, interval='1d', progress=False):
"""
模拟 yfinance.download() 接口
"""
# 计算需要的数据量
if period == 'max':
limit = 2000
elif period.endswith('y'):
years = int(period[:-1])
limit = years * 250 # 每年约250个交易日
elif period.endswith('mo'):
months = int(period[:-2])
limit = months * 21 # 每月约21个交易日
elif period.endswith('d'):
days = int(period[:-1])
limit = days
else:
limit = 500
if interval == '1h':
if period.endswith('y'):
limit = int(period[:-1]) * 250 * 6
elif period.endswith('mo'):
limit = int(period[:-2]) * 21 * 6
elif period.endswith('d'):
limit = int(period[:-1]) * 6
# 转换 interval
if interval == '1d':
ws_period = 'day'
elif interval == '1wk':
ws_period = 'week'
elif interval == '1mo':
ws_period = 'month'
elif interval == '1h':
ws_period = 'm60'
else:
ws_period = 'day'
def _has_enough_rows(frame):
if getattr(frame, 'empty', True):
return False
if interval == '1h':
try:
return len(frame) >= MIN_INTRADAY_ROWS
except TypeError:
return True
return True
source = 'yfinance'
df = fetch_kline(ticker, ws_period, limit)
if not _has_enough_rows(df):
df = type(df)() if hasattr(df, 'empty') else df
elif not getattr(df, 'empty', True):
source = 'westock'
ws_ticker = convert_ticker(ticker)
if getattr(df, 'empty', True) and ws_ticker.startswith(('hk', 'sh', 'sz', 'bj')):
df = fetch_tencent_kline(ticker, ws_period, limit)
if not _has_enough_rows(df):
df = type(df)() if hasattr(df, 'empty') else df
elif not getattr(df, 'empty', True):
source = 'tencent'
if getattr(df, 'empty', True):
df = fetch_yahoo_chart(ticker, period=period, start=start, end=end, interval=interval)
if not getattr(df, 'empty', True):
source = 'yahoo_chart'
if getattr(df, 'empty', True):
df = fetch_yfinance(ticker, period=period, start=start, end=end, interval=interval, progress=progress)
source = 'yfinance'
# 如果指定了 start/end,过滤数据
if not df.empty:
if start:
import pandas as pd
df = df[df.index >= pd.to_datetime(start)]
if end:
import pandas as pd
df = df[df.index <= pd.to_datetime(end)]
return _annotate_download(df, source, ticker, period, interval)
if __name__ == '__main__':
# 测试
if len(sys.argv) > 1:
ticker = sys.argv[1]
df = download(ticker, period='5d')
print(df)
else:
print("Usage: python3 westock_wrapper.py <ticker>")