From 8f8287a8a7afeda151b7a882bc0cd04fc5a62dfb Mon Sep 17 00:00:00 2001 From: Tom Schreiber Date: Mon, 30 Oct 2023 11:10:05 +0000 Subject: [PATCH] Some fixes --- large_data_loads/src/worker.py | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/large_data_loads/src/worker.py b/large_data_loads/src/worker.py index 9059b5f0..7fd5d0c2 100644 --- a/large_data_loads/src/worker.py +++ b/large_data_loads/src/worker.py @@ -20,7 +20,7 @@ # ----------------------------------------------------------------------------------------------------------------------- # Retries Configuration # ----------------------------------------------------------------------------------------------------------------------- -retry_tries = 6 +retry_tries = 10 retry_delay = 60 retry_backoff = 1.5 @@ -127,13 +127,13 @@ def signal_handler(sig, frame): # As this runs in another thread (compared to the main thread) we need a different # client with a different session id - otherwise ClickHouse will complain - common.set_setting('autogenerate_session_id', False) client = clickhouse_connect.get_client( host=args['host'], port=args['port'], username=args['username'], password=args['password'], - secure=True) + secure=True, + session_id=args['worker_id'] + '_cleanup' ) drop_staging_tables(staging_tables, client) @@ -153,6 +153,7 @@ def main(): username=args['username'], password=args['password'], secure=True, + session_id=args['worker_id'], settings={ 'keeper_map_strict_mode': 1 }) @@ -260,7 +261,11 @@ def load_files_atomically(file_list, staging_tables, configuration, client): try: _load_files(file_list, staging_tables, configuration, client) except Exception as err: - truncate_tables(staging_tables, client) + try: + truncate_tables(staging_tables, client) + except Error as err: + # if all retries are exceeded, stop the worker as we have a unrecoverable state + sys.exit(f"Unable to truncate tables for files chunk with key file: {file_list[0]}") raise logger.info(f"staging load complete")