Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

更新bundle数据支持将错误统一输出 #895

Merged
merged 3 commits into from
Aug 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 17 additions & 7 deletions rqalpha/data/bundle.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from rqalpha.utils.datetime_func import convert_date_to_date_int, convert_date_to_int
from rqalpha.utils.i18n import gettext as _
from rqalpha.utils.functools import lru_cache
from rqalpha.utils.logger import init_logger, system_log
from rqalpha.environment import Environment
from rqalpha.model.instrument import Instrument

Expand Down Expand Up @@ -315,7 +316,7 @@ def __call__(self, path, fields, **kwargs):
while True:
order_book_ids = self._order_book_ids[i:i + step]
df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d',
adjust_type='none', fields=fields, expect_df=True)
adjust_type='none', fields=fields, expect_df=True)
if not (df is None or df.empty):
df.reset_index(inplace=True)
df['datetime'] = [convert_date_to_int(d) for d in df['date']]
Expand Down Expand Up @@ -356,9 +357,10 @@ def __call__(self, path, fields, **kwargs):
try:
h5 = h5py.File(path, 'a')
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
try:
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1
else:
is_futures = "futures" == os.path.basename(path).split(".")[0]
for order_book_id in self._order_book_ids:
# 特殊处理前复权合约,需要全量更新
Expand All @@ -367,8 +369,10 @@ def __call__(self, path, fields, **kwargs):
try:
last_date = int(h5[order_book_id]['datetime'][-1] // 1000000)
except OSError:
raise OSError("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
system_log.error("File {} update failed, if it is using, please update later, "
"or you can delete then update again".format(path))
yield 1
break
except ValueError:
h5.pop(order_book_id)
start_date = START_DATE
Expand Down Expand Up @@ -406,12 +410,18 @@ def init_rqdatac_with_warnings_catch():
rqdatac.init()


def process_init_func():
init_rqdatac_with_warnings_catch()
init_logger()


def update_bundle(path, create, enable_compression=False, concurrency=1):
if create:
_DayBarTask = GenerateDayBarTask
else:
_DayBarTask = UpdateDayBarTask

init_logger()
kwargs = {}
if enable_compression:
kwargs['compression'] = 9
Expand All @@ -431,7 +441,7 @@ def update_bundle(path, create, enable_compression=False, concurrency=1):
)

with ProgressedProcessPoolExecutor(
max_workers=concurrency, initializer=init_rqdatac_with_warnings_catch
max_workers=concurrency, initializer=process_init_func
) as executor:
# windows上子进程需要执行rqdatac.init, 其他os则需要执行rqdatac.reset; rqdatac.init包含了rqdatac.reset的功能
for func in gen_file_funcs:
Expand Down
Binary file modified tests/outs/test_f_mean_reverting.pkl
Binary file not shown.
Loading