Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Apply review comments
  • Loading branch information
Kobzol committed Jan 7, 2025
commit 5a95dda44ab42d4d2cdb199a2fa9ac8ba1d7a3f0
151 changes: 93 additions & 58 deletions src/ci/github-actions/ci.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ def add_job_properties(jobs: List[Dict], prefix: str) -> List[Job]:
"""
modified_jobs = []
for job in jobs:
job = dict(job)
job["image"] = get_job_image(job)
job["name"] = f"{prefix} - {job['name']}"
modified_jobs.append(job)
# Create a copy of the `job` dictionary to avoid modifying `jobs`
new_job = dict(job)
new_job["image"] = get_job_image(new_job)
new_job["name"] = f"{prefix} - {new_job['name']}"
modified_jobs.append(new_job)
return modified_jobs


Expand All @@ -46,11 +47,15 @@ def add_base_env(jobs: List[Job], environment: Dict[str, str]) -> List[Job]:
Prepends `environment` to the `env` attribute of each job.
The `env` of each job has higher precedence than `environment`.
"""
modified_jobs = []
for job in jobs:
env = environment.copy()
env.update(job.get("env", {}))
job["env"] = env
return jobs

new_job = dict(job)
new_job["env"] = env
modified_jobs.append(new_job)
return modified_jobs


@dataclasses.dataclass
Expand Down Expand Up @@ -123,7 +128,9 @@ def find_run_type(ctx: GitHubCtx) -> Optional[WorkflowRunType]:

def calculate_jobs(run_type: WorkflowRunType, job_data: Dict[str, Any]) -> List[Job]:
if isinstance(run_type, PRRunType):
return add_base_env(add_job_properties(job_data["pr"], "PR"), job_data["envs"]["pr"])
return add_base_env(
add_job_properties(job_data["pr"], "PR"), job_data["envs"]["pr"]
)
elif isinstance(run_type, TryRunType):
jobs = job_data["try"]
custom_jobs = run_type.custom_jobs
Expand Down Expand Up @@ -188,95 +195,123 @@ def format_run_type(run_type: WorkflowRunType) -> str:
raise AssertionError()


def get_job_image(job) -> str:
def get_job_image(job: Job) -> str:
"""
By default, the Docker image of a job is based on its name.
However, it can be overridden by its IMAGE environment variable.
"""
return job.get("env", {}).get("IMAGE", job["name"])
env = job.get("env", {})
# Return the IMAGE environment variable if it exists, otherwise return the job name
return env.get("IMAGE", job["name"])


def run_workflow_locally(job_data: Dict[str, Any], job_name: str, pr_jobs: bool):
DOCKER_DIR = Path(__file__).absolute().parent.parent / "docker"
def is_linux_job(job: Job) -> bool:
return "ubuntu" in job["os"]

jobs = job_data["pr"] if pr_jobs else job_data["auto"]
jobs = [job for job in jobs if job.get("name") == job_name]

def find_linux_job(job_data: Dict[str, Any], job_name: str, pr_jobs: bool) -> Job:
candidates = job_data["pr"] if pr_jobs else job_data["auto"]
jobs = [job for job in candidates if job.get("name") == job_name]
if len(jobs) == 0:
raise Exception(f"Job `{job_name}` not found in {'pr' if pr_jobs else 'auto'} jobs")
available_jobs = "\n".join(
sorted(job["name"] for job in candidates if is_linux_job(job))
)
raise Exception(f"""Job `{job_name}` not found in {'pr' if pr_jobs else 'auto'} jobs.
The following jobs are available:
{available_jobs}""")
assert len(jobs) == 1

job = jobs[0]
if "ubuntu" not in job["os"]:
if not is_linux_job(job):
raise Exception("Only Linux jobs can be executed locally")
return job


def run_workflow_locally(job_data: Dict[str, Any], job_name: str, pr_jobs: bool):
DOCKER_DIR = Path(__file__).absolute().parent.parent / "docker"

job = find_linux_job(job_data, job_name=job_name, pr_jobs=pr_jobs)

custom_env = {}
custom_env["DEPLOY"] = "1"
# Replicate src/ci/scripts/setup-environment.sh
# Adds custom environment variables to the job
if job_name.startswith("dist-"):
if job_name.endswith("-alt"):
custom_env["DEPLOY_ALT"] = "1"
else:
custom_env["DEPLOY"] = "1"
custom_env.update({k: str(v) for (k, v) in job.get("env", {}).items()})

args = [
str(DOCKER_DIR / "run.sh"),
get_job_image(job)
]
args = [str(DOCKER_DIR / "run.sh"), get_job_image(job)]
env_formatted = [f"{k}={v}" for (k, v) in sorted(custom_env.items())]
print(f"Executing `{' '.join(env_formatted)} {' '.join(args)}`")

env = os.environ.copy()
env.update(custom_env)

process = subprocess.Popen(args, env=env)
try:
process.wait()
except KeyboardInterrupt:
process.kill()
subprocess.run(args, env=env)


if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
def calculate_job_matrix(job_data: Dict[str, Any]):
github_ctx = get_github_ctx()

with open(JOBS_YAML_PATH) as f:
data = yaml.safe_load(f)
run_type = find_run_type(github_ctx)
logging.info(f"Job type: {run_type}")

with open(CI_DIR / "channel") as f:
channel = f.read().strip()

jobs = []
if run_type is not None:
jobs = calculate_jobs(run_type, job_data)
jobs = skip_jobs(jobs, channel)

if not jobs:
raise Exception("Scheduled job list is empty, this is an error")

run_type = format_run_type(run_type)

logging.info(f"Output:\n{yaml.dump(dict(jobs=jobs, run_type=run_type), indent=4)}")
print(f"jobs={json.dumps(jobs)}")
print(f"run_type={run_type}")


def create_cli_parser():
parser = argparse.ArgumentParser(
prog="ci.py",
description="Generate or run CI workflows"
prog="ci.py", description="Generate or run CI workflows"
)
subparsers = parser.add_subparsers(
help="Command to execute", dest="command", required=True
)
subparsers.add_parser(
"calculate-job-matrix",
help="Generate a matrix of jobs that should be executed in CI",
)
run_parser = subparsers.add_parser(
"run-local", help="Run a CI jobs locally (on Linux)"
)
generate_matrix = argparse.ArgumentParser()
subparsers = parser.add_subparsers(help="Command to execute", dest="command", required=True)
subparsers.add_parser("calculate-job-matrix")
run_parser = subparsers.add_parser("run-local")
run_parser.add_argument(
"job_name",
help="CI job that should be executed. By default, a merge (auto) "
"job with the given name will be executed"
"job with the given name will be executed",
)
run_parser.add_argument(
"--pr",
action="store_true",
help="Run a PR job instead of an auto job"
"--pr", action="store_true", help="Run a PR job instead of an auto job"
)
args = parser.parse_args()

if args.command == "calculate-job-matrix":
github_ctx = get_github_ctx()
return parser

run_type = find_run_type(github_ctx)
logging.info(f"Job type: {run_type}")

with open(CI_DIR / "channel") as f:
channel = f.read().strip()

jobs = []
if run_type is not None:
jobs = calculate_jobs(run_type, data)
jobs = skip_jobs(jobs, channel)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)

if not jobs:
raise Exception("Scheduled job list is empty, this is an error")
with open(JOBS_YAML_PATH) as f:
data = yaml.safe_load(f)

run_type = format_run_type(run_type)
parser = create_cli_parser()
args = parser.parse_args()

logging.info(f"Output:\n{yaml.dump(dict(jobs=jobs, run_type=run_type), indent=4)}")
print(f"jobs={json.dumps(jobs)}")
print(f"run_type={run_type}")
if args.command == "calculate-job-matrix":
calculate_job_matrix(data)
elif args.command == "run-local":
run_workflow_locally(data, args.job_name, args.pr)
else:
Expand Down