From 158402fe5a63ca9bdf56e8316f6dc74d6d11e93b Mon Sep 17 00:00:00 2001 From: "lin.dongzhao" <542698096@qq.com> Date: Fri, 23 Aug 2024 11:40:43 +0800 Subject: [PATCH] update --- rqalpha/cmds/bundle.py | 6 ++--- rqalpha/data/bundle.py | 60 ++++++++++++++++++++++++------------------ 2 files changed, 38 insertions(+), 28 deletions(-) diff --git a/rqalpha/cmds/bundle.py b/rqalpha/cmds/bundle.py index ed2fbdee4..d93f0f248 100644 --- a/rqalpha/cmds/bundle.py +++ b/rqalpha/cmds/bundle.py @@ -90,9 +90,9 @@ def update_bundle(data_bundle_path, rqdatac_uri, compression, concurrency): return 1 from rqalpha.data.bundle import update_bundle as update_bundle_ - status = update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency) - if status != 0: - sys.exit(status) + succeed = update_bundle_(os.path.join(data_bundle_path, 'bundle'), False, compression, concurrency) + if not succeed: + sys.exit(1) @cli.command(help=_("Download bundle (monthly updated)")) diff --git a/rqalpha/data/bundle.py b/rqalpha/data/bundle.py index 9750b8bc4..96fdb7cb5 100644 --- a/rqalpha/data/bundle.py +++ b/rqalpha/data/bundle.py @@ -20,6 +20,9 @@ from itertools import chain from typing import Callable, Optional, Union, List from filelock import FileLock, Timeout +import multiprocessing +from multiprocessing.sharedctypes import Synchronized +from ctypes import c_bool import h5py import numpy as np @@ -31,8 +34,7 @@ from rqalpha.utils.logger import init_logger, system_log from rqalpha.environment import Environment from rqalpha.model.instrument import Instrument -import multiprocessing -from multiprocessing.sharedctypes import Synchronized + START_DATE = 20050104 END_DATE = 29991231 @@ -313,24 +315,32 @@ def __call__(self, path, fields, **kwargs): class GenerateDayBarTask(DayBarTask): def __call__(self, path, fields, **kwargs): - with h5py.File(path, 'w') as h5: - i, step = 0, 300 - while True: - order_book_ids = self._order_book_ids[i:i + step] - df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d', - adjust_type='none', fields=fields, expect_df=True) - if not (df is None or df.empty): - df.reset_index(inplace=True) - df['datetime'] = [convert_date_to_int(d) for d in df['date']] - del df['date'] - df.set_index(['order_book_id', 'datetime'], inplace=True) - df.sort_index(inplace=True) - for order_book_id in df.index.levels[0]: - h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs) - i += step - yield len(order_book_ids) - if i >= len(self._order_book_ids): - break + try: + h5 = h5py.File(path, "w") + except OSError: + system_log.error("File {} update failed, if it is using, please update later, " + "or you can delete then update again".format(path)) + sval.value = False + yield 1 + else: + with h5: + i, step = 0, 300 + while True: + order_book_ids = self._order_book_ids[i:i + step] + df = rqdatac.get_price(order_book_ids, START_DATE, datetime.date.today(), '1d', + adjust_type='none', fields=fields, expect_df=True) + if not (df is None or df.empty): + df.reset_index(inplace=True) + df['datetime'] = [convert_date_to_int(d) for d in df['date']] + del df['date'] + df.set_index(['order_book_id', 'datetime'], inplace=True) + df.sort_index(inplace=True) + for order_book_id in df.index.levels[0]: + h5.create_dataset(order_book_id, data=df.loc[order_book_id].to_records(), **kwargs) + i += step + yield len(order_book_ids) + if i >= len(self._order_book_ids): + break class UpdateDayBarTask(DayBarTask): @@ -362,7 +372,7 @@ def __call__(self, path, fields, **kwargs): except OSError: system_log.error("File {} update failed, if it is using, please update later, " "or you can delete then update again".format(path)) - sval.value = 1 + sval.value = False yield 1 else: is_futures = "futures" == os.path.basename(path).split(".")[0] @@ -375,7 +385,7 @@ def __call__(self, path, fields, **kwargs): except OSError: system_log.error("File {} update failed, if it is using, please update later, " "or you can delete then update again".format(path)) - sval.value = 1 + sval.value = False yield 1 break except ValueError: @@ -446,16 +456,16 @@ def update_bundle(path, create, enable_compression=False, concurrency=1): gen_suspended_days, gen_yield_curve, gen_share_transformation, gen_future_info ) - status_code = multiprocessing.Value("i", 0) + succeed = multiprocessing.Value(c_bool, True) with ProgressedProcessPoolExecutor( - max_workers=concurrency, initializer=process_init, initargs=(status_code, ) + max_workers=concurrency, initializer=process_init, initargs=(succeed, ) ) as executor: # windows上子进程需要执行rqdatac.init, 其他os则需要执行rqdatac.reset; rqdatac.init包含了rqdatac.reset的功能 for func in gen_file_funcs: executor.submit(GenerateFileTask(func), path) for file, order_book_id, field in day_bar_args: executor.submit(_DayBarTask(order_book_id), os.path.join(path, file), field, **kwargs) - return status_code.value + return succeed.value class AutomaticUpdateBundle(object):