Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
WIP: more type hints
  • Loading branch information
ZetaTwo committed Jan 11, 2024
commit 36b45771f6c85ab01515111d5046ddac18fb7ab9
24 changes: 14 additions & 10 deletions challtools/cli.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
# PYTHON_ARGCOMPLETE_OK
import sys
import time
import argparse
import os
import uuid
import hashlib
import shutil
import urllib.parse
import json
from pathlib import Path
import os
import pkg_resources
import shutil
import sys
import time
import urllib.parse
import uuid

import argcomplete
import docker
import requests
import yaml
import docker
import argcomplete
from .validator import ConfigValidator, is_url

from pathlib import Path
from .constants import *
from .utils import (
_copytree,
build_chall,
Expand All @@ -34,10 +38,10 @@
start_solution,
validate_solution_output,
)
from .constants import *
from .validator import ConfigValidator, is_url


def main(passed_args=None):
def main(passed_args: Optional[List[str]] = None) -> int:
parser = argparse.ArgumentParser(
prog="challtools",
description="A tool for managing CTF challenges and challenge repositories using the OpenChallSpec",
Expand Down
36 changes: 19 additions & 17 deletions challtools/utils.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,25 @@
import docker
import hashlib
import json
import os
import re
import requests
import shutil
import subprocess
import sys
from pathlib import Path
from typing import Dict, Any, Optional, Union, List

import docker
import requests
import yaml

from .constants import *
from .validator import ConfigValidator
from pathlib import Path
from typing import Dict, Any, Optional, Union, List

class CriticalException(Exception):
pass


def process_messages(messages, verbose=False):
def process_messages(messages: List[str], verbose: bool = False) -> Dict[str, Any]:
"""Processes a list of messages from validator.ConfigValidator.validate for printing.

Args:
Expand Down Expand Up @@ -75,7 +77,7 @@ def process_messages(messages, verbose=False):
}


def get_ctf_config_path(search_start=Path(".")) -> Optional[Path]:
def get_ctf_config_path(search_start: Path = Path(".")) -> Optional[Path]:
"""Locates the global CTF configuration file (ctf.yml) and returns a path to it.

Returns:
Expand All @@ -93,7 +95,7 @@ def get_ctf_config_path(search_start=Path(".")) -> Optional[Path]:
return None


def get_config_path(search_start=Path(".")) -> Path:
def get_config_path(search_start: Path = Path(".")) -> Path:
"""Locates the challenge configuration file (challenge.yml) and returns a path to it.

Returns:
Expand Down Expand Up @@ -129,7 +131,7 @@ def load_ctf_config() -> Dict[str, Any]:
return config if config else {}


def load_config(workdir=".", search=True, cd=True) -> Dict[str, Any]:
def load_config(workdir: str = ".", search: bool = True, cd: bool = True) -> Dict[str, Any]:
"""Loads the challenge configuration file from the current directory, a specified directory, or optionally one of their parent directories. Optionally changes the working directory to the directory of the configuration file.

Args:
Expand Down Expand Up @@ -248,7 +250,7 @@ def checkdir(d):
return checkdir(root)


def get_docker_client():
def get_docker_client() -> docker.api.client.ContainerApiMixin:
"""Gets an authenticated docker client.

Returns:
Expand Down Expand Up @@ -299,7 +301,7 @@ def get_first_text_flag(config: Dict[str, Any]) -> Optional[str]:
return config["flag_format_prefix"] + text_flag + config["flag_format_suffix"]


def dockerize_string(string):
def dockerize_string(string: str) -> str:
"""Converts a string into a valid docker tag name.

Args:
Expand All @@ -317,7 +319,7 @@ def dockerize_string(string):
return string[:128]


def create_docker_name(title: str, container_name: str = None, chall_id=None):
def create_docker_name(title: str, container_name: str = None, chall_id=None) -> str:
"""Converts challenge information into a most likely unique and valid docker tag name.

Args:
Expand All @@ -341,7 +343,7 @@ def create_docker_name(title: str, container_name: str = None, chall_id=None):
return "_".join([title[:32], digest[:16]])


def format_user_service(config, service_type, **kwargs):
def format_user_service(config: Dict[str, Any], service_type: str, **kwargs) -> str:
"""Formats a string displayed to the user based on the service type and a substitution context (``display`` in the OpenChallSpec).

Args:
Expand All @@ -368,7 +370,7 @@ def format_user_service(config, service_type, **kwargs):
return string


def validate_solution_output(config, output):
def validate_solution_output(config: Dict[str, Any], output: str) -> bool:
"""validates a flag outputted by a solver by stripping the whitespace and validating the flag.

Args:
Expand All @@ -381,7 +383,7 @@ def validate_solution_output(config, output):
return validate_flag(config, output.strip())


def validate_flag(config, submitted_flag: str) -> bool:
def validate_flag(config: Dict[str, Any], submitted_flag: str) -> bool:
"""validates a flag against the flags in the challenge config.

Args:
Expand Down Expand Up @@ -412,7 +414,7 @@ def validate_flag(config, submitted_flag: str) -> bool:
return False


def build_image(image, tag, client):
def build_image(image: str, tag: str, client: docker.api.client.ContainerApiMixin) -> None:
"""Build a docker image given the image (as a path to a folder, if archive it will load it), the tag and the docker client.

Args:
Expand Down Expand Up @@ -459,7 +461,7 @@ def build_image(image, tag, client):
)


def run_build_script(config):
def run_build_script(config: Dict[str, Any]) -> None:
if "build_script" not in config["custom"]:
raise CriticalException(f"Build script has not been defined!")

Expand All @@ -478,7 +480,7 @@ def run_build_script(config):
raise CriticalException(f"Build script exited with code {p.returncode}")


def build_docker_images(config, client):
def build_docker_images(config: Dict[str, Any], client: docker.api.client.ContainerApiMixin) -> bool:
if not config["deployment"]:
return False

Expand Down
9 changes: 5 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import pytest
import docker
import pytest

from challtools.utils import get_docker_client


def pytest_addoption(parser) -> None:
def pytest_addoption(parser: pytest.Parser) -> None:
parser.addoption("--docker-fails", action="store_true")
parser.addoption("--docker-strict", action="store_true")

Expand All @@ -17,12 +18,12 @@ def pytest_collection_modifyitems(session, config, items):


@pytest.fixture(scope="session")
def docker_client():
def docker_client() -> docker.api.client.ContainerApiMixin:
return get_docker_client()


@pytest.fixture()
def clean_container_state(docker_client):
def clean_container_state(docker_client: docker.api.client.ContainerApiMixin):
relevant_tags = [
"challtools_test",
"challtools_test_challenge_f9629917705648c9",
Expand Down
14 changes: 8 additions & 6 deletions tests/test_cli.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
import os
from pathlib import Path
import yaml

import docker
import pytest
import yaml

from challtools.utils import build_chall, get_valid_config
from utils import populate_dir, main_wrapper, inittemplatepath


class Test_allchalls:
def test_validate(self, tmp_path: Path, capsys) -> None:
populate_dir(tmp_path, "simple_ctf")
Expand Down Expand Up @@ -47,15 +49,15 @@ def test_no_service(self, tmp_path: Path, capsys) -> None:
assert "nothing to do" in capsys.readouterr().out.lower()

@pytest.mark.fails_without_docker
def test_single(self, tmp_path: Path, docker_client, clean_container_state) -> None:
def test_single(self, tmp_path: Path, docker_client: docker.api.client.ContainerApiMixin, clean_container_state) -> None:
populate_dir(tmp_path, "trivial_tcp")
assert main_wrapper(["build"]) == 0
assert "challtools_test_challenge_f9629917705648c9:latest" in [
tag for image in docker_client.images.list() for tag in image.tags
]

@pytest.mark.fails_without_docker
def test_subdir(self, tmp_path: Path, docker_client, clean_container_state) -> None:
def test_subdir(self, tmp_path: Path, docker_client: docker.api.client.ContainerApiMixin, clean_container_state) -> None:
populate_dir(tmp_path, "trivial_tcp")
os.chdir("container")
assert main_wrapper(["build"]) == 0
Expand All @@ -64,7 +66,7 @@ def test_subdir(self, tmp_path: Path, docker_client, clean_container_state) -> N
]

@pytest.mark.fails_without_docker
def test_solution(self, tmp_path: Path, docker_client, clean_container_state) -> None:
def test_solution(self, tmp_path: Path, docker_client: docker.api.client.ContainerApiMixin, clean_container_state) -> None:
populate_dir(tmp_path, "trivial_tcp_solution")
assert main_wrapper(["build"]) == 0
import time
Expand Down Expand Up @@ -147,7 +149,7 @@ def test_has_id(self, tmp_path: Path, capsys) -> None:


class Test_init:
def check_identical(self, tmp_path: Path, template) -> None:
def check_identical(self, tmp_path: Path, template) -> bool:
if not (
len(list(tmp_path.rglob("*")))
== len(list((inittemplatepath / template).rglob("*"))) - 1
Expand Down
33 changes: 17 additions & 16 deletions tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
import os
import re
from pathlib import Path
from typing import Union

import pytest
import yaml

from challtools.utils import (
CriticalException,
process_messages,
Expand All @@ -22,82 +25,80 @@
start_solution,
)
from utils import populate_dir
from typing import Union


# TODO
# class Test_process_messages:
# pass


class Test_get_ctf_config_path:
def test_root(self, tmp_path):
def test_root(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "simple_ctf")
assert get_ctf_config_path() == tmp_path / "ctf.yml"

def test_subdir(self, tmp_path):
def test_subdir(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "simple_ctf")
os.chdir("chall1")
assert get_ctf_config_path() == tmp_path / "ctf.yml"

def test_yaml(self, tmp_path):
def test_yaml(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "simple_ctf")
Path("ctf.yml").rename("ctf.yaml")
assert get_ctf_config_path() == tmp_path / "ctf.yaml"

def test_missing(self, tmp_path):
def test_missing(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "minimal_valid")
assert get_ctf_config_path() is None


class Test_load_ctf_config:
def test_empty(self, tmp_path):
def test_empty(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "simple_ctf")
assert load_ctf_config() == {}

def test_populated(self, tmp_path):
def test_populated(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "ctf_authors")
assert load_ctf_config() == yaml.safe_load((tmp_path / "ctf.yml").read_text())

def test_missing(self, tmp_path):
def test_missing(self, tmp_path: Path) -> None:
os.chdir(tmp_path)
assert load_ctf_config() == None


class Test_load_config:
def test_root(self, tmp_path):
def test_root(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "minimal_valid")
assert load_config() == yaml.safe_load((tmp_path / "challenge.yml").read_text())

def test_subdir(self, tmp_path):
def test_subdir(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "subdir")
os.chdir("subdir")
assert load_config() == yaml.safe_load((tmp_path / "challenge.yml").read_text())

def test_yaml(self, tmp_path):
def test_yaml(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "minimal_valid")
Path("challenge.yml").rename("challenge.yaml")
assert load_config() == yaml.safe_load(
(tmp_path / "challenge.yaml").read_text()
)

def test_missing(self, tmp_path):
def test_missing(self, tmp_path: Path) -> None:
os.chdir(tmp_path)
with pytest.raises(CriticalException):
load_config()


class Test_get_valid_config:
def test_valid(self, tmp_path: Union[str, Path]) -> None:
def test_valid(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "minimal_valid")
assert get_valid_config()

def test_invalid(self, tmp_path: Union[str, Path]) -> None:
def test_invalid(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "schema_violation")
with pytest.raises(CriticalException):
get_valid_config()

def test_invalid_list(self, tmp_path: Union[str, Path]) -> None:
def test_invalid_list(self, tmp_path: Path) -> None:
populate_dir(tmp_path, "schema_violation_list")
with pytest.raises(CriticalException):
get_valid_config()
Expand Down
4 changes: 3 additions & 1 deletion tests/utils.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import os
from pathlib import Path
from typing import List, Union

from challtools.cli import main
from challtools.utils import _copytree

Expand All @@ -20,7 +22,7 @@ def populate_dir(path: Union[str, Path], template: str) -> None:
_copytree(templatepath / template, path)


def main_wrapper(args):
def main_wrapper(args: List[str]) -> int:
try:
exit_code = main(args)
except SystemExit as e:
Expand Down