Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 58 additions & 4 deletions src/targetdb/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from datetime import datetime
from pathlib import Path

import numpy as np
import pandas as pd
from astropy.table import Table
from loguru import logger
Expand Down Expand Up @@ -1175,10 +1176,14 @@ def parse_allocation_file(input_file, output_dir=Path("."), outfile_prefix=None)


def transfer_data_from_uploader(df, config, local_dir=Path("."), force=False):
status = []
n_transfer = []
for upload_id in df["upload_id"]:
# datadirs = glob.glob(local_dir / f"????????-??????-{upload_id}")
datadirs = list(local_dir.cwd().glob(f"????????-??????-{upload_id}"))
# datadirs = list(local_dir.cwd().glob(f"????????-??????-{upload_id}"))
datadirs = list(local_dir.glob(f"????????-??????-{upload_id}"))
skip_transfer = False
# logger.info(f"{local_dir=} {local_dir.cwd()} {len(datadirs)=}")
if len(datadirs) == 1:
skip_transfer = True if not force else False
if skip_transfer:
Expand All @@ -1190,8 +1195,12 @@ def transfer_data_from_uploader(df, config, local_dir=Path("."), force=False):
f"Data directory, {datadirs[0]}, is found locally, but force transfer"
)
elif len(datadirs) > 1:
logger.error(f"Multiple data directories are found locally: {datadirs}")
raise ValueError("Multiple data directories are found locally")
logger.error(
f"Multiple data directories are found in the destination directory: {datadirs}."
)
raise ValueError(
f"Multiple data directories are found in the destination directory for {upload_id}: {datadirs}"
)
else:
logger.info(
f"Data directory for upload_id: {upload_id} is not found locally. Try transfer"
Expand Down Expand Up @@ -1243,10 +1252,55 @@ def transfer_data_from_uploader(df, config, local_dir=Path("."), force=False):

# Execute the rsync command
try:
subprocess.run(rsync_command, shell=use_shell, check=True)
proc = subprocess.run(
rsync_command,
shell=use_shell,
check=True,
stdout=subprocess.PIPE,
encoding="utf-8",
)
str_uploaded_dirs = [
line
for line in proc.stdout.splitlines()
if upload_id in line
if line.endswith("/")
]
logger.info(f"Transferred directories: {str_uploaded_dirs}")

n_dirs = len(str_uploaded_dirs)
n_transfer.append(n_dirs)

if n_dirs == 1:
status.append("success")
else:
status.append("WARNING")
except subprocess.CalledProcessError as e:
logger.error(f"Failed to transfer data for upload_id: {upload_id}")
logger.error(e)
# raise Exception(f"Failed to transfer data for upload_id: {upload_id}")
# raise
n_transfer.append(0)
status.append("FAILED")
else:
status.append("skipped")
n_transfer.append(0)

custom_status_dict = {"success": 0, "WARNING": 1, "FAILED": 3}
df_status = pd.DataFrame(
{"upload_id": df["upload_id"], "status": status, "n_transfer": n_transfer}
)
df_status.sort_values(by=["status"], key=lambda x: x.map(custom_status_dict))
df_status_out = df_status.sort_values(
by=["status"], key=lambda x: x.map(custom_status_dict)
)
logger.info(f"Transfer status: \n{df_status_out.to_string(index=False)}")

if np.all(df_status["status"] == "success"):
logger.info("All data transfer is successful.")
else:
logger.error(
"There are some issues with data transfer. Please check the status."
)


def insert_targets_from_uploader(
Expand Down