diff --git a/doc/changelog.d/4600.added.md b/doc/changelog.d/4600.added.md new file mode 100644 index 000000000000..2ab20b97f75d --- /dev/null +++ b/doc/changelog.d/4600.added.md @@ -0,0 +1 @@ +Update client side 'enhanced' meshing workflow to use server side 'meshing_workflow' root. diff --git a/src/ansys/fluent/core/meshing/meshing_workflow_new.py b/src/ansys/fluent/core/meshing/meshing_workflow_new.py new file mode 100644 index 000000000000..d1c9143e0078 --- /dev/null +++ b/src/ansys/fluent/core/meshing/meshing_workflow_new.py @@ -0,0 +1,315 @@ +# Copyright (C) 2021 - 2025 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+ +"""Meshing workflow specialization of the Workflow module that wraps and extends the +core functionality.""" + +from __future__ import annotations + +from enum import Enum +import os + +from ansys.fluent.core._types import PathType +from ansys.fluent.core.services.datamodel_se import PyMenuGeneric +from ansys.fluent.core.utils.fluent_version import FluentVersion +from ansys.fluent.core.workflow_new import Workflow + +name_to_identifier_map = { + "Watertight Geometry": "EnableCleanCAD", + "Fault-tolerant Meshing": "EnableComplexMeshing", + "2D Meshing": "EnablePrime2dMeshing", + "Topology Based Meshing": "EnablePrimeMeshing", +} + + +class MeshingWorkflow(Workflow): + """Provides meshing specialization of the workflow wrapper that extends the core + functionality in an object-oriented manner.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + name: str, + identifier: str, + fluent_version: FluentVersion, + initialize: bool = True, + ) -> None: + """Initialize MeshingWorkflow. + + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + name: str + Workflow name to initialize it. + identifier: str + Workflow name to identify it from global settings. + fluent_version: FluentVersion + Version of Fluent in this session. + initialize: bool + Flag to initialize the workflow, defaults to True. 
+ """ + super().__init__( + workflow=workflow, command_source=meshing, fluent_version=fluent_version + ) + self._meshing = meshing + self._name = name + self._identifier = identifier + if initialize: + self._new_workflow(name=self._name) + self._initialized = True + + +class WatertightMeshingWorkflow(MeshingWorkflow): + """Provides watertight meshing specialization of the workflow wrapper.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + fluent_version: FluentVersion, + initialize: bool = True, + ) -> None: + """Initialize WatertightMeshingWorkflow. + + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + fluent_version: FluentVersion + Version of Fluent in this session. + initialize: bool + Flag to initialize the workflow, defaults to True. + """ + super().__init__( + workflow=workflow, + meshing=meshing, + name="Watertight Geometry", + identifier=name_to_identifier_map["Watertight Geometry"], + fluent_version=fluent_version, + initialize=initialize, + ) + + +class FaultTolerantMeshingWorkflow(MeshingWorkflow): + """Provides fault-tolerant meshing specialization of the workflow wrapper.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + part_management: PyMenuGeneric, + pm_file_management: PyMenuGeneric, + fluent_version: FluentVersion, + initialize: bool = True, + ) -> None: + """Initialize FaultTolerantMeshingWorkflow. + + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + part_management : PyMenuGeneric + Part management object. + pm_file_management : PyMenuGeneric + File management object in the part management object. + fluent_version: FluentVersion + Version of Fluent in this session. + initialize: bool + Flag to initialize the workflow, defaults to True. 
+ """ + super().__init__( + workflow=workflow, + meshing=meshing, + name="Fault-tolerant Meshing", + identifier=name_to_identifier_map["Fault-tolerant Meshing"], + fluent_version=fluent_version, + initialize=initialize, + ) + self._part_management = part_management + self._pm_file_management = pm_file_management + + @property + def part_management(self) -> PyMenuGeneric | None: + """Access part-management in fault-tolerant mode. + + Returns + ------- + PyMenuGeneric | None + Part-management. + """ + return self._part_management + + @property + def pm_file_management(self): + """Access the part-management file-management object in fault-tolerant mode. + + Returns + ------- + PyMenuGeneric | None + File management object in the part management object. + """ + return self._pm_file_management + + +class TwoDimensionalMeshingWorkflow(MeshingWorkflow): + """Provides 2D meshing specialization of the workflow wrapper.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + fluent_version: FluentVersion, + initialize: bool = True, + ) -> None: + """Initialize TwoDimensionalMeshingWorkflow. + + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + fluent_version: FluentVersion + Version of Fluent in this session. + initialize: bool + Flag to initialize the workflow, defaults to True. + """ + super().__init__( + workflow=workflow, + meshing=meshing, + name="2D Meshing", + identifier=name_to_identifier_map["2D Meshing"], + fluent_version=fluent_version, + initialize=initialize, + ) + + +class TopologyBasedMeshingWorkflow(MeshingWorkflow): + """Provides topology-based meshing specialization of the workflow wrapper.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + fluent_version: FluentVersion, + initialize: bool = True, + ) -> None: + """Initialize TopologyBasedMeshingWorkflow. 
+ + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + fluent_version: FluentVersion + Version of Fluent in this session. + initialize: bool + Flag to initialize the workflow, defaults to True. + """ + super().__init__( + workflow=workflow, + meshing=meshing, + name="Topology Based Meshing", + identifier=name_to_identifier_map["Topology Based Meshing"], + fluent_version=fluent_version, + initialize=initialize, + ) + + +class WorkflowMode(Enum): + """Provides an enum of supported Fluent meshing workflow modes.""" + + WATERTIGHT_MESHING_MODE = WatertightMeshingWorkflow + FAULT_TOLERANT_MESHING_MODE = FaultTolerantMeshingWorkflow + TWO_DIMENSIONAL_MESHING_MODE = TwoDimensionalMeshingWorkflow + TOPOLOGY_BASED_MESHING_MODE = TopologyBasedMeshingWorkflow + + +class LoadWorkflow(Workflow): + """Provides a specialization of the workflow wrapper for a loaded workflow.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + file_path: PathType, + fluent_version: FluentVersion, + ) -> None: + """Initialize a ``LoadWorkflow`` instance. + + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + file_path: os.PathLike[str | bytes] | str | bytes + Path to the saved workflow file. + fluent_version: FluentVersion + Version of Fluent in this session. + """ + super().__init__( + workflow=workflow, command_source=meshing, fluent_version=fluent_version + ) + self._meshing = meshing + self._load_workflow(file_path=os.fspath(file_path)) + + +class CreateWorkflow(Workflow): + """Provides a specialization of the workflow wrapper for a newly created + workflow.""" + + def __init__( + self, + workflow: PyMenuGeneric, + meshing: PyMenuGeneric, + fluent_version: FluentVersion, + initialize: bool = True, + ) -> None: + """Initialize a ``CreateWorkflow`` instance. 
+ + Parameters + ---------- + workflow : PyMenuGeneric + Underlying workflow object. + meshing : PyMenuGeneric + Meshing object. + fluent_version: FluentVersion + Version of Fluent in this session. + initialize: bool + Flag to initialize the workflow, defaults to True. + """ + super().__init__( + workflow=workflow, command_source=meshing, fluent_version=fluent_version + ) + self._meshing = meshing + if initialize: + self._create_workflow() diff --git a/src/ansys/fluent/core/session_base_meshing.py b/src/ansys/fluent/core/session_base_meshing.py index f60295b840be..fbec759954cf 100644 --- a/src/ansys/fluent/core/session_base_meshing.py +++ b/src/ansys/fluent/core/session_base_meshing.py @@ -27,12 +27,7 @@ from ansys.fluent.core._types import PathType from ansys.fluent.core.fluent_connection import FluentConnection -from ansys.fluent.core.meshing.meshing_workflow import ( - CreateWorkflow, - LoadWorkflow, - WorkflowMode, - name_to_identifier_map, -) +from ansys.fluent.core.meshing.meshing_workflow_new import name_to_identifier_map from ansys.fluent.core.session_shared import ( _make_datamodel_module, _make_tui_module, @@ -138,8 +133,14 @@ def meshing_workflow(self): def watertight_workflow(self, initialize: bool = True): """Datamodel root of workflow.""" + if os.getenv("USE_SERVER_MW") == "1": + root_module = "meshing_workflow" + from ansys.fluent.core.meshing.meshing_workflow_new import WorkflowMode + else: + root_module = "workflow" + from ansys.fluent.core.meshing.meshing_workflow import WorkflowMode self._current_workflow = WorkflowMode.WATERTIGHT_MESHING_MODE.value( - _make_datamodel_module(self, "workflow"), + _make_datamodel_module(self, root_module), self.meshing, self.get_fluent_version(), initialize, @@ -148,8 +149,14 @@ def watertight_workflow(self, initialize: bool = True): def fault_tolerant_workflow(self, initialize: bool = True): """Datamodel root of workflow.""" + if os.getenv("USE_SERVER_MW") == "1": + root_module = "meshing_workflow" + from 
ansys.fluent.core.meshing.meshing_workflow_new import WorkflowMode + else: + root_module = "workflow" + from ansys.fluent.core.meshing.meshing_workflow import WorkflowMode self._current_workflow = WorkflowMode.FAULT_TOLERANT_MESHING_MODE.value( - _make_datamodel_module(self, "workflow"), + _make_datamodel_module(self, root_module), self.meshing, self.PartManagement, self.PMFileManagement, @@ -160,8 +167,14 @@ def fault_tolerant_workflow(self, initialize: bool = True): def two_dimensional_meshing_workflow(self, initialize: bool = True): """Data model root of the workflow.""" + if os.getenv("USE_SERVER_MW") == "1": + root_module = "meshing_workflow" + from ansys.fluent.core.meshing.meshing_workflow_new import WorkflowMode + else: + root_module = "workflow" + from ansys.fluent.core.meshing.meshing_workflow import WorkflowMode self._current_workflow = WorkflowMode.TWO_DIMENSIONAL_MESHING_MODE.value( - _make_datamodel_module(self, "workflow"), + _make_datamodel_module(self, root_module), self.meshing, self.get_fluent_version(), initialize, @@ -170,8 +183,14 @@ def two_dimensional_meshing_workflow(self, initialize: bool = True): def topology_based_meshing_workflow(self, initialize: bool = True): """Datamodel root of workflow.""" + if os.getenv("USE_SERVER_MW") == "1": + root_module = "meshing_workflow" + from ansys.fluent.core.meshing.meshing_workflow_new import WorkflowMode + else: + root_module = "workflow" + from ansys.fluent.core.meshing.meshing_workflow import WorkflowMode self._current_workflow = WorkflowMode.TOPOLOGY_BASED_MESHING_MODE.value( - _make_datamodel_module(self, "workflow"), + _make_datamodel_module(self, root_module), self.meshing, self.get_fluent_version(), initialize, @@ -180,8 +199,14 @@ def topology_based_meshing_workflow(self, initialize: bool = True): def load_workflow(self, file_path: PathType): """Datamodel root of workflow.""" + if os.getenv("USE_SERVER_MW") == "1": + root_module = "meshing_workflow" + from 
ansys.fluent.core.meshing.meshing_workflow_new import LoadWorkflow + else: + root_module = "workflow" + from ansys.fluent.core.meshing.meshing_workflow import LoadWorkflow self._current_workflow = LoadWorkflow( - _make_datamodel_module(self, "workflow"), + _make_datamodel_module(self, root_module), self.meshing, os.fspath(file_path), self.get_fluent_version(), @@ -190,8 +215,14 @@ def load_workflow(self, file_path: PathType): def create_workflow(self, initialize: bool = True): """Datamodel root of the workflow.""" + if os.getenv("USE_SERVER_MW") == "1": + root_module = "meshing_workflow" + from ansys.fluent.core.meshing.meshing_workflow_new import CreateWorkflow + else: + root_module = "workflow" + from ansys.fluent.core.meshing.meshing_workflow import CreateWorkflow self._current_workflow = CreateWorkflow( - _make_datamodel_module(self, "workflow"), + _make_datamodel_module(self, root_module), self.meshing, self.get_fluent_version(), initialize, diff --git a/src/ansys/fluent/core/workflow_new.py b/src/ansys/fluent/core/workflow_new.py new file mode 100644 index 000000000000..79dc2fb3a2ba --- /dev/null +++ b/src/ansys/fluent/core/workflow_new.py @@ -0,0 +1,1127 @@ +# Copyright (C) 2021 - 2025 ANSYS, Inc. and/or its affiliates. +# SPDX-License-Identifier: MIT +# +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. + +"""Workflow module that wraps and extends the core functionality. + +This module provides a high-level, Pythonic interface for working with Ansys Fluent +workflows. It wraps the underlying datamodel service layer to provide intuitive +navigation, task management, and workflow operations. + +The main classes are: + +- **Workflow**: Top-level workflow container that manages tasks and provides + navigation between them. +- **TaskObject**: Individual task wrapper that provides access to task properties, + arguments, execution, and navigation to sibling/child tasks. + + +Notes +----- +This module is designed for Fluent 26R1 and later versions. Some features may not +be available in earlier versions. + +The workflow system provides both imperative and declarative approaches to building +simulation workflows, with automatic dependency management and validation. +""" + +from __future__ import annotations + +from collections import OrderedDict +import re +from typing import ValuesView + +from ansys.fluent.core.services.datamodel_se import PyMenu +from ansys.fluent.core.utils.fluent_version import FluentVersion + + +def _get_task_type_name(task_obj: PyMenu) -> str: + """Extract the task type name from a task object's class name. + + The datamodel generates task classes with leading underscores (e.g., "_import_geometry"). + This function strips the leading underscore to get the clean task type name. + + Parameters + ---------- + task_obj : PyMenu + The task datamodel object. 
+ + Returns + ------- + str + Clean task type name without leading underscore (e.g., "import_geometry"). + + Notes + ----- + This is needed because the datamodel service generates class names with a leading + underscore convention (e.g., `_import_geometry`), but we want clean names for + internal use and type creation. + """ + return task_obj.__class__.__name__.lstrip("_") + + +def is_compound_child(task_type: str) -> bool: + """Check if a task type represents a compound child task. This encapsulates + a string comparison to avoid repetition. + + Parameters + ---------- + task_type : str + The task type string to check. + + Returns + ------- + bool + True if the task type is "Compound Child", False otherwise. + """ + return task_type == "Compound Child" + + +def _convert_task_list_to_display_names( + workflow_root: PyMenu, task_list: list[str] +) -> list[str]: + """Convert a list of task IDs to their corresponding display names. + + Parameters + ---------- + workflow_root : PyMenu + The root workflow datamodel object that provides service access. + task_list : list[str] + List of internal task identifiers (e.g., ["TaskObject1", "TaskObject2"]). + + Returns + ------- + list[str] + List of display names corresponding to the task IDs + (e.g., ["Import Geometry", "Add Local Sizing"]). + """ + _display_names = [] + for _task_name in task_list: + name_obj = PyMenu( + service=workflow_root.service, + rules=workflow_root.rules, + path=[("task_object", _task_name), ("_name_", "")], + ) + _display_names.append(name_obj.get_remote_state()) + return _display_names + + +def _get_child_task_by_task_id(workflow_root, task_id): + """Get a child task's display name by its internal task ID. + + Parameters + ---------- + workflow_root : PyMenu + The root workflow datamodel object. + task_id : str + Internal identifier for the task (e.g., "TaskObject1"). + + Returns + ------- + str + The display name of the task (e.g., "Import Geometry"). 
+ """ + return PyMenu( + service=workflow_root.service, + rules=workflow_root.rules, + path=[("task_object", task_id), ("_name_", "")], + ).get_remote_state() + + +def command_name_to_task_name(meshing_root, command_name: str) -> str: + """Convert a command name to its corresponding task display name. + + This function maps internal command names (used by the Fluent core) to + user-facing task names. + + Parameters + ---------- + meshing_root : PyMenu + The root meshing datamodel object. + command_name : str + Internal command name (e.g., "ImportGeometry"). + + Returns + ------- + str + User-facing task name (e.g., "import_geometry"). + + Notes + ----- + This is a workaround for Fluent 26R1. + """ + # TODO: This is a fix only for 26R1 as the server lacks the mechanism to return mapped values + # for '.get_next_possible_tasks()'. + command_instance = getattr(meshing_root, command_name).create_instance() + return command_instance.get_attr("APIName") or command_instance.get_attr( + "helpString" + ) + + +class Workflow: + """High-level workflow container that manages tasks and provides navigation. + + The Workflow class wraps the underlying datamodel workflow object and provides + a Pythonic interface for: + + - Discovering and accessing tasks + - Creating, loading, and saving workflows + - Navigating task hierarchies + - Managing task lifecycles (creation/deletion) + """ + + def __init__( + self, + workflow: PyMenu, + command_source: PyMenu, + fluent_version: FluentVersion, + ) -> None: + """Initialize Workflow.""" + self._workflow = workflow + self._command_source = command_source + self._fluent_version = fluent_version + self._task_dict = {} + self._compound_child_dict = {} + + def tasks(self) -> ValuesView[PyMenu]: + """Get the complete list of tasks in the workflow. 
+ + This method builds and returns a comprehensive list of all task objects + currently present in the workflow, including: + + - Top-level tasks + - Compound child tasks (tasks with multiple instances) + - Dynamically created tasks + + The method rebuilds its internal task cache on each call to ensure + freshness, though this can be expensive for large workflows. + """ + self._task_dict = {} + _state = self._workflow.task_object() + for task in sorted(_state): + name, display_name = task.split(":") + task_obj = getattr(self._workflow.task_object, name)[display_name] + if is_compound_child(task_obj.task_type()): + if name not in self._compound_child_dict: + # CASE 1: First instance of this compound child type + # =================================================== + # This is the first time we've seen this task type (e.g., "add_boundary_layers") + # Create a new entry in the compound child dictionary with the first child + # + # Example: For "Boundary Layer 1" task with name="add_boundary_layers" + # Creates: {"add_boundary_layers": {"add_boundary_layers_child_1": task_obj}} + self._compound_child_dict[name] = { + name + "_child_1": task_obj, + } + else: + # CASE 2: Subsequent instance of this compound child type + # ======================================================== + # We've already seen this task type before. Now we need to determine if this + # specific task instance is new or if we've already processed it. + # + # Why check for duplicates? + # The workflow datamodel may return the same task multiple times during iteration, + # so we need to verify this is actually a NEW instance (e.g., "Boundary Layer 2") + # and not a duplicate reference to an existing one (e.g., "Boundary Layer 1" again). + + # Check if this specific task instance already exists in the compound child dict + # We compare by display name using task_obj._name_() which returns names like + # "Boundary Layer 1", "Boundary Layer 2", etc. 
+ if task_obj._name_() not in ( + value._name_() + for value in self._compound_child_dict[name].values() + ): + # This is genuinely a NEW instance - add it with the next available number + # + # Calculate the next child number: + # 1. Sort existing keys: ["add_boundary_layers_child_1", "add_boundary_layers_child_2"] + # 2. Take the last key: "add_boundary_layers_child_2" + # 3. Extract the last character (the number): "2" + # 4. Convert to int and add 1: 3 + # 5. Result: "add_boundary_layers_child_3" + # + # Example progression: + # First: "add_boundary_layers_child_1" -> number is 1 + # Second: "add_boundary_layers_child_2" -> number is 2 + # Third: "add_boundary_layers_child_3" -> number is 3 + child_key = ( + int(sorted(self._compound_child_dict[name])[-1][-1]) + 1 + ) + self._compound_child_dict[name][ + name + f"_child_{child_key}" + ] = task_obj + else: + # Store regular (non-compound-child) tasks in the task dictionary + if name not in self._task_dict: + # CASE 1: First occurrence of this task type + # ============================================= + # Store using the base name (e.g., "import_geometry") + # This allows access via: workflow.import_geometry + self._task_dict[name] = task_obj + else: + # CASE 2: Duplicate task type (e.g., second "Import Geometry") + # ============================================================= + # Multiple tasks of the same type can exist in a workflow. + # Their display names have numeric suffixes: "Import Geometry 1", "Import Geometry 2" + # + # To create unique dictionary keys, we: + # 1. Extract the numeric suffix from the display name + # 2. 
Append it to the base name with an underscore + # + # Example transformation: + # Display name: "Import Geometry 2" + # Base name: "import_geometry" + # Suffix: "2" (last word from display name) + # Final key: "import_geometry_2" + # + # This allows access via: workflow.import_geometry_2 + self._task_dict[name + f"_{task_obj.name().split()[-1]}"] = task_obj + + # Merge all compound child tasks into main dictionary + for child_tasks in self._compound_child_dict.values(): + self._task_dict.update(child_tasks) + + return self._task_dict.values() + + def _workflow_state(self): + """Get the complete state dictionary of the workflow.""" + return self._workflow() + + def _new_workflow(self, name: str): + """Initialize a new workflow from a predefined template.""" + self._workflow.general.initialize_workflow(workflow_type=name) + + def _load_workflow(self, file_path: str): + """Load a workflow from a saved workflow file (.wft).""" + self._workflow.general.load_workflow(file_path=file_path) + + def _create_workflow(self): + """Create a new empty workflow.""" + self._workflow.general.create_new_workflow() + + def save_workflow(self, file_path: str): + """Save the current workflow to a file.""" + self._workflow.general.save_workflow(file_path=file_path) + + def load_state(self, list_of_roots: list): + """Load the state of the workflow.""" + self._workflow.general.load_state(list_of_roots=list_of_roots) + + def task_names(self): + """Get Python-friendly names for all available tasks. + + Returns the list of task names as they would be accessed via Python + attribute syntax (e.g., "import_geometry" for "Import Geometry"). + """ + return [name.split(":")[0] for name in self._workflow.task_object()] + + def children(self) -> list[TaskObject]: + """Get the top-level tasks in the workflow in display order. + + Returns an ordered list of the workflow's main tasks (those directly under + the workflow root, not nested child tasks). 
The order reflects the execution + sequence in the workflow. + + Returns + ------- + List[TaskObject] + Ordered list of top-level task wrappers. + """ + ordered_names = _convert_task_list_to_display_names( + self._workflow, + self._workflow.general.workflow.task_list(), + ) + + # Create lightweight lookup: task name -> task datamodel object + tasks_by_name = {task_obj.name(): task_obj for task_obj in self.tasks()} + + # Wrap only the top-level tasks in the correct order + wrapped_tasks = [] + for name in ordered_names: + if name in tasks_by_name: + task_obj = tasks_by_name[name] + wrapped = make_task_wrapper( + task_obj, + _get_task_type_name(task_obj), + self._workflow, + self, + self._command_source, + ) + wrapped_tasks.append(wrapped) + + return wrapped_tasks + + def first_child(self) -> TaskObject | None: + """Get the first top-level task in the workflow. + + Returns + ------- + TaskObject or None + The first task in the workflow, or None if the workflow is empty. + + Examples + -------- + >>> first = ''.first_child() + >>> if first: + ... print(f"Starting task: {first.name()}") + ... first() # Execute it + + >>> # Navigate from first to last + >>> current = ''.first_child() + >>> while current and current.has_next(): + ... print(current.name()) + ... current() # Execute it + ... current = current.next() + + Notes + ----- + Returns None for empty workflows. Always check before accessing properties. + """ + task_list = self._workflow.general.workflow.task_list() + if task_list: + first_name = _get_child_task_by_task_id(self._workflow, task_list[0]) + else: + return None + for task_obj in self.tasks(): + if task_obj.name() == first_name: + return make_task_wrapper( + task_obj, + _get_task_type_name(task_obj), + self._workflow, + self, + self._command_source, + ) + + def last_child(self) -> TaskObject | None: + """Get the last top-level task in the workflow. 
+ + Returns + ------- + TaskObject or None + The last task in the workflow, or None if the workflow is empty. + + Examples + -------- + >>> last = ''.last_child() + >>> if last: + ... print(f"Final task: {last.name()}") + ... last() # Execute it + + >>> # Execute workflow in reverse + >>> current = ''.last_child() + >>> while current and current.has_previous(): + ... print(current.name()) + ... current() # Execute it + ... current = current.previous() + """ + task_list = self._workflow.general.workflow.task_list() + if task_list: + last_name = _get_child_task_by_task_id(self._workflow, task_list[-1]) + else: + return None + for task_obj in self.tasks(): + if task_obj.name() == last_name: + return make_task_wrapper( + task_obj, + _get_task_type_name(task_obj), + self._workflow, + self, + self._command_source, + ) + + def _task_names(self): + """Gets a list of display names of all tasks in the workflow.""" + return _convert_task_list_to_display_names( + self._workflow, self._workflow.general.workflow.task_list() + ) + + def _ordered_tasks(self): + """Get ordered dictionary mapping task names to task objects.""" + ordered_names = _convert_task_list_to_display_names( + self._workflow, + self._workflow.general.workflow.task_list(), + ) + + # Create lightweight lookup: display name -> task datamodel object + tasks_by_name = {task_obj.name(): task_obj for task_obj in self.tasks()} + + # Build ordered dict by wrapping only the tasks in ordered_names + sorted_dict = OrderedDict() + for name in ordered_names: + if name in tasks_by_name: + task_obj = tasks_by_name[name] + wrapped = make_task_wrapper( + task_obj, + _get_task_type_name(task_obj), + self._workflow, + self, + self._command_source, + ) + sorted_dict[name] = wrapped + + return sorted_dict + + def delete_tasks(self, list_of_tasks: list[TaskObject]): + """Delete multiple tasks from the workflow. + + Removes the specified tasks from the workflow. Tasks are identified by TaskObject instances. 
+ + Parameters + ---------- + list_of_tasks: list[TaskObject] + List of task objects to delete. + + Raises + ------ + TypeError + If list contains items that are neither TaskObject nor str. + """ + items_to_be_deleted = [] + for item in list_of_tasks: + if not isinstance(item, TaskObject): + # This is done to support backwards compatibility. + if isinstance(item, str): + items_to_be_deleted.append(item) + else: + raise TypeError( + "'list_of_tasks' only takes list of 'TaskObject' types." + ) + else: + items_to_be_deleted.append(item.name()) + + self._workflow.general.delete_tasks(list_of_tasks=items_to_be_deleted) + + def __getattr__(self, item): + """Enable attribute-style access to tasks.""" + if item not in self._task_dict: + self.tasks() + if item in self._task_dict: + return make_task_wrapper( + self._task_dict[item], item, self._workflow, self, self._command_source + ) + return getattr(self._workflow, item) + + def __call__(self): + """Get workflow state when called as a function.""" + return self._workflow_state() + + def __delattr__(self, item): + """Delete a task using Python's del statement. + + Parameters + ---------- + item : str + Python attribute name of the task to delete. + + Examples + -------- + >>> del ''.import_geometry + + Raises + ------ + LookupError + If the task name is not valid. + """ + if item not in self._task_dict: + self.tasks() + if item in self._task_dict: + getattr(self, item).delete() + del self._task_dict[item] + else: + raise LookupError(f"'{item}' is not a valid task name.'") + + +class TaskObject: + """Wrapper for individual workflow task objects. + + TaskObject provides a high-level interface for interacting with individual + tasks in a workflow. It exposes task properties, arguments, execution methods, + and navigation capabilities. 
+ + Key Features + ------------ + - Access task arguments and properties + - Execute tasks + - Navigate to parent, sibling, and child tasks + - Insert new tasks after the current task + - Access compound child tasks (for multi-instance tasks) + """ + + def __init__( + self, + task_object: PyMenu, + base_name: str, + workflow: PyMenu, + parent: Workflow | TaskObject, + meshing_root: PyMenu, + ): + """Initialize a TaskObject wrapper. + + Parameters + ---------- + task_object : PyMenu + The underlying datamodel task object. + base_name : str + Python-friendly base name for the task. + workflow : PyMenu + Reference to the parent workflow datamodel. + parent : Union[Workflow, TaskObject] + Parent container (Workflow or parent TaskObject). + + Notes + ----- + This constructor is called internally by `make_task_wrapper()`. + Users should not instantiate TaskObject directly. + """ + super().__setattr__("_task_object", task_object) + super().__setattr__("_name", base_name) + super().__setattr__("_workflow", workflow) + super().__setattr__("_parent", parent) + super().__setattr__("_meshing_root", meshing_root) + self._cache = {} + + def _get_next_possible_tasks(self): + """Get display names of tasks that can be inserted after this task.""" + task_obj = self._task_object + ret_list = [] + for item in task_obj.get_next_possible_tasks(): + snake_case_name = command_name_to_task_name(self._meshing_root, item) + if snake_case_name != item: + self._cache[snake_case_name] = item + ret_list.append(snake_case_name) + return ret_list + + def _insert_next_task(self, task_name): + """Insert a task after the current task. + + Notes + ----- + Internal method. Users should use `insertable_tasks..insert()` instead. + """ + self._get_next_possible_tasks() + command_name = self._cache.get(task_name) or task_name + self._task_object.insert_next_task(command_name=command_name) + + @property + def insertable_tasks(self): + """Get interface for inserting tasks after this one. 
+ + Returns a dynamic object that exposes all valid task types that can be + inserted after the current task. Each insertable task is accessible as + an attribute with an `insert()` method. + + Returns + ------- + _NextTask + Object with attributes for each insertable task type. + + Examples + -------- + Basic usage:: + + >>> task = ''.import_geometry + >>> + >>> # See what's available + >>> available = task.insertable_tasks() + >>> for insertable in available: + ... print(insertable) + + + + + + Insert specific task:: + + >>> # Insert by accessing as attribute + >>> task.insertable_tasks.import_boi_geometry.insert() + + Access specific task after insertion:: + + >>> # Access task as attribute + >>> ''.import_boi_geometry + """ + return self._NextTask(self) + + class _NextTask: + """Container for insertable task operations. + + This internal class provides a dynamic interface for task insertion. + It creates attributes on-the-fly for each valid insertable task type. + + Attributes are created dynamically based on the result of + `_get_next_possible_tasks()`, with each attribute being an `_Insert` + instance that provides the `insert()` method. + """ + + def __init__(self, base_task): + """Initialize insertable tasks container. + + Parameters + ---------- + base_task : TaskObject + The task after which new tasks can be inserted. + """ + self._base_task = base_task + self._insertable_tasks = [] + for item in self._base_task._get_next_possible_tasks(): + insertable_task = type("Insert", (self._Insert,), {})( + self._base_task, item + ) + setattr(self, item, insertable_task) + self._insertable_tasks.append(insertable_task) + + def __call__(self) -> list[_Insert]: + """Get list of all insertable task objects. + + Returns + ------- + List[_Insert] + List of insertable task objects. + """ + return self._insertable_tasks + + class _Insert: + """Represents a single insertable task. 
+
+            Provides the `insert()` method to actually insert the task into
+            the workflow after the base task.
+            """
+
+            def __init__(self, base_task, name):
+                """Initialize an insertable task reference.
+
+                Parameters
+                ----------
+                base_task : TaskObject
+                    The task after which this will be inserted.
+                name : str
+                    Python friendly name of the insertable task.
+                """
+                self._base_task = base_task
+                self._name = name
+
+            def insert(self):
+                """Insert this task into the workflow.
+
+                Creates a new instance of this task type and inserts it
+                immediately after the base task in the workflow sequence.
+                """
+                return self._base_task._insert_next_task(task_name=self._name)
+
+            def __repr__(self):
+                return f"<Insertable '{self._name}' task>"
+
+    def __getattr__(self, item):
+        """Enable attribute access to task properties and arguments.
+
+        Notes
+        -----
+        Arguments take precedence over task object properties.
+        """
+        task_obj = self._task_object
+        args = task_obj.arguments
+        if item in args():
+            return getattr(args, item)
+        return getattr(task_obj, item)
+
+    def __setattr__(self, key, value):
+        """Enable attribute assignment to task arguments."""
+        args = self._task_object.arguments
+        if hasattr(args, key):
+            setattr(args, key, value)
+        else:
+            super().__setattr__(key, value)
+
+    def __call__(self):
+        """Execute the task when called as a function."""
+        return self._task_object.execute()
+
+    def __getitem__(self, key):
+        task_obj = self._task_object
+        name = self._name
+        workflow = self._workflow
+        parent = self._parent
+        meshing_root = self._meshing_root
+        name_1 = name
+        name_2 = re.sub(r"\s+\d+$", "", task_obj.name().strip()) + f" {key}"
+        try:
+            task_obj = getattr(workflow.task_object, name_1)[name_2]
+            if is_compound_child(task_obj.task_type):
+                temp_parent = self
+            else:
+                temp_parent = parent
+            return make_task_wrapper(
+                task_obj, name_1, workflow, temp_parent, meshing_root
+            )
+        except LookupError:
+            task_obj = getattr(workflow.task_object, name_1)[key]
+            if is_compound_child(task_obj.task_type):
+
temp_parent = self + else: + temp_parent = parent + try: + return make_task_wrapper( + getattr(workflow.task_object, name_1)[key], + name_1, + workflow, + temp_parent, + meshing_root, + ) + except LookupError as ex2: + raise LookupError( + f"Neither '{name_2}' nor '{key}' found in task object '{name_1}'." + ) from ex2 + + def __delitem__(self, key): + self[key].delete() + + def _task_names(self): + """Gets the display names of the child tasks of a task item.""" + task_list = self._task_object.task_list() + if task_list: + return _convert_task_list_to_display_names(self._workflow, task_list) + else: + return [] + + def children(self): + """Get ordered list of direct child tasks. + + Returns + ------- + List[TaskObject] + Ordered list of child task wrappers, or empty list if no children. + """ + child_names = self._task_names() + if not child_names: + return [] + + workflow = self._workflow + + # Create reverse lookup: display name -> task type + name_to_type = { + display_name: task_type + for task_type, display_name in ( + item.split(":") for item in workflow.task_object() + ) + } + + # Build list by wrapping only the child tasks in the correct order + wrapped_children = [] + for display_name in child_names: + if display_name in name_to_type: + task_type = name_to_type[display_name] + wrapped = make_task_wrapper( + getattr(workflow.task_object, task_type)[display_name], + task_type, + workflow, + self, + self._meshing_root, + ) + wrapped_children.append(wrapped) + + return wrapped_children + + def first_child(self): + """Get the first child task of this task. + + Returns + ------- + TaskObject or None + The first child task, or None if no children exist. + + Examples + -------- + >>> parent = ''.describe_geometry + >>> first = parent.first_child() + >>> if first: + ... print(f"First child: {first.name()}") + + Navigate through children:: + + >>> current = parent.first_child() + >>> while current: + ... print(current.name()) + ... if current.has_next(): + ... 
current = current.next() + ... else: + ... break + """ + task_list = self._task_names() + if task_list: + first_name = task_list[0] + else: + return None + workflow = self._workflow + + type_to_name = { + item.split(":")[0]: item.split(":")[-1] for item in workflow.task_object() + } + for key, val in type_to_name.items(): + if val == first_name: + return make_task_wrapper( + getattr(workflow.task_object, key)[val], + key, + workflow, + self, + self._meshing_root, + ) + + def last_child(self): + """Get the last child task of this task. + + Returns + ------- + TaskObject or None + The last child task, or None if no children exist. + + Examples + -------- + >>> parent = ''.describe_geometry + >>> last = parent.last_child() + >>> if last: + ... print(f"Last child: {last.name()}") + """ + task_list = self._task_names() + if task_list: + last_name = task_list[-1] + else: + return None + workflow = self._workflow + + type_to_name = { + item.split(":")[0]: item.split(":")[-1] for item in workflow.task_object() + } + for key, val in type_to_name.items(): + if val == last_name: + return make_task_wrapper( + getattr(workflow.task_object, key)[val], + key, + workflow, + self, + self._meshing_root, + ) + + @staticmethod + def _get_next_key(input_dict, current_key): + """Get the key that follows current_key in an ordered dictionary. + + Parameters + ---------- + input_dict : Dict + Ordered dictionary of tasks. + current_key : str + Current task name. + + Returns + ------- + str + Next task name. + + Raises + ------ + IndexError + If current_key is the last key in the dictionary. + """ + keys = list(input_dict) + idx = keys.index(current_key) + if idx == len(keys) - 1: + raise IndexError("Reached the end.") + return keys[idx + 1] + + @staticmethod + def _get_previous_key(input_dict, current_key): + """Get the key that precedes current_key in an ordered dictionary. + + Parameters + ---------- + input_dict : Dict + Ordered dictionary of tasks. 
+ current_key : str + Current task name. + + Returns + ------- + str + Previous task name. + + Raises + ------ + IndexError + If current_key is the first key in the dictionary. + """ + keys = list(input_dict) + idx = keys.index(current_key) + if idx == 0: + raise IndexError("In the beginning.") + return keys[idx - 1] + + def has_parent(self): + """Check if this task has a parent container. + + Returns + ------- + bool + True if task has a parent (Workflow or TaskObject), False otherwise. + """ + try: + super().__getattribute__("_parent") + return True + except AttributeError: + return False + + def parent(self): + """Get the parent container of this task. + + Returns + ------- + Union[Workflow, TaskObject] + The parent container. Can be: + - Workflow instance for top-level tasks + - TaskObject instance for nested child tasks + """ + return self._parent + + def has_next(self) -> bool: + """Check if there is a next sibling task. + + Determines whether this task has a sibling task that follows it in the + workflow sequence at the same level. + + Returns + ------- + bool + True if a next sibling exists, False if this is the last task. + """ + task_dict = self._parent._ordered_tasks() + try: + self._get_next_key(task_dict, self.name()) + return True + except IndexError: + return False + + def next(self): + """Returns the next sibling task item.""" + task_dict = self._parent._ordered_tasks() + next_key = self._get_next_key(task_dict, self.name()) + return task_dict[next_key] + + def has_previous(self) -> bool: + """Check if there is a previous sibling task. + + Determines whether this task has a sibling task that precedes it in the + workflow sequence at the same level. + + Returns + ------- + bool + True if a previous sibling exists, False if this is the first task. 
+ """ + task_dict = self._parent._ordered_tasks() + try: + self._get_previous_key(task_dict, self.name()) + return True + except IndexError: + return False + + def previous(self): + """Returns the previous sibling task item.""" + task_dict = self._parent._ordered_tasks() + previous_key = self._get_previous_key(task_dict, self.name()) + return task_dict[previous_key] + + def _ordered_tasks(self): + if not self._task_names(): + return OrderedDict() + + workflow = self._workflow + + # Create lightweight lookup: task type -> display name + type_to_name = dict(item.split(":") for item in workflow.task_object()) + + # Get ordered list of display names for this level + ordered_names = self._task_names() + + # Build ordered dict by wrapping only the tasks that are in ordered_names + sorted_dict = OrderedDict() + for display_name in ordered_names: + # Find the matching task type for this display name + for task_type, name in type_to_name.items(): + if name == display_name: + wrapped = make_task_wrapper( + getattr(workflow.task_object, task_type)[display_name], + task_type, + workflow, + self, + self._meshing_root, + ) + sorted_dict[display_name] = wrapped + break + + return sorted_dict + + def delete(self): + """Deletes the task item on which it is called.""" + self._workflow.general.delete_tasks(list_of_tasks=[self.name()]) + + def __repr__(self): + try: + suffix = int(self.name().split()[-1]) + except (TypeError, ValueError): + suffix = 0 + return f"task < {self._name}: {suffix} >" + + +def build_specific_interface(task_object): + """ + Build a dynamic interface type that exposes task-specific + commands/properties while delegating back to the task_object. 
+ """ + + def make_delegate(attr): + def delegate(self, *args, **kwargs): + return getattr(self._task_object, attr)(*args, **kwargs) + + return delegate + + # Determine the API surface of the underlying task: + public_members = { + name + for name in dir(task_object) + if not name.startswith("_") and callable(getattr(task_object, name)) + } + + namespace = {name: make_delegate(name) for name in public_members} + + iface_name = f"{task_object.task_type}SpecificInterface" + + return type(iface_name, (), namespace) + + +def make_task_wrapper(task_obj, name, workflow, parent, meshing_root): + """Wraps TaskObjects.""" + + specific_interface = build_specific_interface(task_obj) + + combined_type = type( + f"{task_obj.task_type}Task", (specific_interface, TaskObject), {} + ) + + return combined_type(task_obj, name, workflow, parent, meshing_root) diff --git a/tests/conftest.py b/tests/conftest.py index b91219f551a2..640cc63899ed 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -491,3 +491,8 @@ def datamodel_api_version_all(request, monkeypatch: pytest.MonkeyPatch) -> None: @pytest.fixture def datamodel_api_version_new(monkeypatch: pytest.MonkeyPatch) -> None: monkeypatch.setenv("REMOTING_NEW_DM_API", "1") + + +@pytest.fixture +def use_server_meshing_workflow(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setenv("USE_SERVER_MW", "1") diff --git a/tests/test_server_meshing_workflow.py b/tests/test_server_meshing_workflow.py index cf1e537de2a2..23cde4a47541 100644 --- a/tests/test_server_meshing_workflow.py +++ b/tests/test_server_meshing_workflow.py @@ -23,8 +23,11 @@ import pytest from ansys.fluent.core import examples +from ansys.fluent.core.services.datamodel_se import PyMenu +@pytest.mark.nightly +@pytest.mark.codegen_required @pytest.mark.fluent_version(">=26.1") def test_new_watertight_workflow(new_meshing_session_wo_exit): # Import geometry @@ -132,6 +135,8 @@ def test_new_watertight_workflow(new_meshing_session_wo_exit): assert solver.is_active() 
is False +@pytest.mark.nightly +@pytest.mark.codegen_required @pytest.mark.fluent_version(">=26.1") def test_new_fault_tolerant_workflow(new_meshing_session_wo_exit): meshing = new_meshing_session_wo_exit @@ -144,12 +149,12 @@ def test_new_fault_tolerant_workflow(new_meshing_session_wo_exit): WorkflowType="Fault-tolerant Meshing" ) fault_tolerant = meshing.meshing_workflow - meshing.PartManagement.InputFileChanged( - FilePath=import_file_name, IgnoreSolidNames=False, PartPerBody=False + fault_tolerant.parts.input_file_changed( + file_path=import_file_name, ignore_solid_names=False, part_per_body=False ) - meshing.PMFileManagement.FileManager.LoadFiles() - meshing.PartManagement.Node["Meshing Model"].Copy( - Paths=[ + fault_tolerant.parts_files.file_manager.load_files() + fault_tolerant.parts.node["Meshing Model"].copy( + paths=[ "/dirty_manifold-for-wrapper," + "1/dirty_manifold-for-wrapper,1/main,1", "/dirty_manifold-for-wrapper," + "1/dirty_manifold-for-wrapper,1/flow-pipe,1", @@ -159,7 +164,7 @@ def test_new_fault_tolerant_workflow(new_meshing_session_wo_exit): "/dirty_manifold-for-wrapper," + "1/dirty_manifold-for-wrapper,1/object1,1", ] ) - meshing.PartManagement.ObjectSetting["DefaultObjectSetting"].OneZonePer.set_state( + fault_tolerant.parts.object_setting["DefaultObjectSetting"].one_zone_per.set_state( "part" ) fault_tolerant.task_object.import_cad_and_part_management[ @@ -491,6 +496,8 @@ def test_new_fault_tolerant_workflow(new_meshing_session_wo_exit): assert solver.is_active() is False +@pytest.mark.nightly +@pytest.mark.codegen_required @pytest.mark.fluent_version(">=26.1") def test_new_2d_meshing_workflow(new_meshing_session_wo_exit): # Import geometry @@ -689,6 +696,7 @@ def test_new_2d_meshing_workflow(new_meshing_session_wo_exit): assert solver.is_active() is False +@pytest.mark.codegen_required @pytest.mark.fluent_version(">=26.1") def test_arguments_and_parameters_in_new_meshing_workflow(new_meshing_session): 
new_meshing_session.workflow.InitializeWorkflow(WorkflowType="Watertight Geometry") @@ -786,3 +794,869 @@ def test_arguments_and_parameters_in_new_meshing_workflow(new_meshing_session): watertight.task_object.import_geometry["Import Geometry"].state() == "Forced-up-to-date" ) + + +@pytest.mark.codegen_required +@pytest.mark.fluent_version(">=26.1") +def test_get_task_by_id(new_meshing_session): + # This test is only intended for developer level testing + meshing_session = new_meshing_session + meshing_session.meshing_workflow.general.initialize_workflow( + workflow_type="Watertight Geometry" + ) + service = meshing_session.meshing_workflow.service + rules = meshing_session.meshing_workflow.rules + + path = [("task_object", "TaskObject1"), ("_name_", "")] + assert ( + PyMenu(service=service, rules=rules, path=path).get_remote_state() + == "Import Geometry" + ) + + path = [("task_object", "TaskObject1"), ("CommandName", "")] + assert ( + PyMenu(service=service, rules=rules, path=path).get_remote_state() + == "ImportGeometry" + ) + + path = [("task_object", "TaskObject5"), ("_name_", "")] + assert ( + PyMenu(service=service, rules=rules, path=path).get_remote_state() + == "Apply Share Topology" + ) + + path = [("task_object", "TaskObject1")] + assert PyMenu(service=service, rules=rules, path=path).get_remote_state() == { + "_name_": "Import Geometry", + "arguments": {}, + "warnings": None, + "command_name": "ImportGeometry", + "errors": None, + "task_type": "Simple", + "object_path": "", + "state": "Out-of-date", + "check_point": "default-off", + } + + +@pytest.mark.codegen_required +@pytest.mark.fluent_version(">=26.1") +def test_insert_delete_and_rename_task(new_meshing_session): + meshing_session = new_meshing_session + meshing_session.meshing_workflow.general.initialize_workflow( + workflow_type="Watertight Geometry" + ) + + # Insert new task + assert len(meshing_session.meshing_workflow.task_object()) == 11 + 
meshing_session.meshing_workflow.task_object.import_geometry[ + "Import Geometry" + ].insert_next_task(command_name="ImportBodyOfInfluenceGeometry") + assert len(meshing_session.meshing_workflow.task_object()) == 12 + assert meshing_session.meshing_workflow.task_object.import_boi_geometry[ + "Import Body of Influence Geometry" + ].arguments() == { + "type": "CAD", + "geometry_file_name": None, + "cad_import_options": {}, + } + + # Delete + assert len(meshing_session.meshing_workflow.task_object()) == 12 + assert ( + "create_volume_mesh_wtm:Generate the Volume Mesh" + in meshing_session.meshing_workflow.task_object() + ) + meshing_session.meshing_workflow.general.delete_tasks( + list_of_tasks=["Generate the Volume Mesh"] + ) + assert len(meshing_session.meshing_workflow.task_object()) == 11 + assert ( + "create_volume_mesh_wtm:Generate the Volume Mesh" + not in meshing_session.meshing_workflow.task_object() + ) + + # Rename + assert ( + "add_boundary_layers:Add Boundary Layers" + in meshing_session.meshing_workflow.task_object() + ) + meshing_session.meshing_workflow.task_object.add_boundary_layers[ + "Add Boundary Layers" + ].rename(new_name="Add BL") + assert ( + "add_boundary_layers:Add Boundary Layers" + not in meshing_session.meshing_workflow.task_object() + ) + assert ( + "add_boundary_layers:Add BL" in meshing_session.meshing_workflow.task_object() + ) + + +############################################################################################ +# Test the enhanced meshing workflow +############################################################################################ + + +@pytest.mark.nightly +@pytest.mark.codegen_required +@pytest.mark.fluent_version(">=26.1") +def test_new_watertight_workflow_enhanced_meshing( + new_meshing_session_wo_exit, use_server_meshing_workflow +): + # Import geometry + import_file_name = examples.download_file( + "mixing_elbow.pmdb", "pyfluent/mixing_elbow" + ) + watertight = new_meshing_session_wo_exit.watertight() + 
watertight.import_geometry.file_name.set_state(import_file_name) + assert watertight.import_geometry.length_unit() == "mm" + watertight.import_geometry.length_unit.set_state("in") + assert watertight.import_geometry.length_unit.get_state() == "in" + watertight.import_geometry() + + # Add local sizing + watertight.add_local_sizing_wtm.add_child_to_task() + watertight.add_local_sizing_wtm() + + # Generate surface mesh + watertight.create_surface_mesh.cfd_surface_mesh_controls.max_size.set_state(0.3) + assert watertight.create_surface_mesh.cfd_surface_mesh_controls.max_size() == 0.3 + watertight.create_surface_mesh() + + # Describe geometry + watertight.describe_geometry.update_child_tasks(setup_type_changed=False) + watertight.describe_geometry.setup_type.set_state( + "The geometry consists of only fluid regions with no voids" + ) + watertight.describe_geometry.update_child_tasks(setup_type_changed=True) + watertight.describe_geometry() + + # Update boundaries + watertight.update_boundaries.boundary_zone_list.set_state(["wall-inlet"]) + watertight.update_boundaries.boundary_label_list.set_state(["wall-inlet"]) + watertight.update_boundaries.boundary_label_type_list.set_state(["wall"]) + watertight.update_boundaries.old_boundary_label_list.set_state(["wall-inlet"]) + watertight.update_boundaries.old_boundary_label_type_list.set_state( + ["velocity-inlet"] + ) + watertight.update_boundaries() + + # Update regions + watertight.update_regions() + + # Add boundary layers + watertight.add_boundary_layers.add_child_to_task() + watertight.add_boundary_layers.control_name.set_state("smooth-transition_1") + watertight.add_boundary_layers.insert_compound_child_task() + watertight.add_boundary_layers_child_1() + + # Generate volume mesh + watertight.create_volume_mesh_wtm.volume_fill.set_state("poly-hexcore") + watertight.create_volume_mesh_wtm.volume_fill_controls.hex_max_cell_length.set_state( + 0.3 + ) + watertight.create_volume_mesh_wtm() + + # Switch to solution mode + 
solver = new_meshing_session_wo_exit.switch_to_solver() + assert solver.is_active() is True + assert new_meshing_session_wo_exit.is_active() is False + solver.exit() + assert solver.is_active() is False + + +@pytest.mark.nightly +@pytest.mark.codegen_required +@pytest.mark.fluent_version(">=26.1") +def test_new_fault_tolerant_workflow_enhanced_meshing( + new_meshing_session_wo_exit, use_server_meshing_workflow +): + meshing = new_meshing_session_wo_exit + + # Import CAD and part management + import_file_name = examples.download_file( + "exhaust_system.fmd", "pyfluent/exhaust_system" + ) + fault_tolerant = meshing.fault_tolerant() + fault_tolerant.parts.input_file_changed( + file_path=import_file_name, ignore_solid_names=False, part_per_body=False + ) + fault_tolerant.parts_files.file_manager.load_files() + fault_tolerant.parts.node["Meshing Model"].copy( + paths=[ + "/dirty_manifold-for-wrapper," + "1/dirty_manifold-for-wrapper,1/main,1", + "/dirty_manifold-for-wrapper," + + "1/dirty_manifold-for-wrapper,1/flow-pipe,1", + "/dirty_manifold-for-wrapper," + + "1/dirty_manifold-for-wrapper,1/outpipe3,1", + "/dirty_manifold-for-wrapper," + "1/dirty_manifold-for-wrapper,1/object2,1", + "/dirty_manifold-for-wrapper," + "1/dirty_manifold-for-wrapper,1/object1,1", + ] + ) + fault_tolerant.parts.object_setting["DefaultObjectSetting"].one_zone_per.set_state( + "part" + ) + fault_tolerant.import_cad_and_part_management.context.set_state(0) + fault_tolerant.import_cad_and_part_management.create_object_per.set_state("Custom") + fault_tolerant.import_cad_and_part_management.fmd_file_name.set_state( + import_file_name + ) + fault_tolerant.import_cad_and_part_management.file_loaded.set_state("yes") + fault_tolerant.import_cad_and_part_management.object_setting.set_state( + "DefaultObjectSetting" + ) + fault_tolerant.import_cad_and_part_management() + + # Describe geometry and flow + fault_tolerant.describe_geometry_and_flow.add_enclosure.set_state("No") + 
fault_tolerant.describe_geometry_and_flow.close_caps.set_state("Yes") + fault_tolerant.describe_geometry_and_flow.describe_geometry_and_flow_options.advanced_options.set_state( + True + ) + fault_tolerant.describe_geometry_and_flow.describe_geometry_and_flow_options.extract_edge_features.set_state( + "Yes" + ) + fault_tolerant.describe_geometry_and_flow.flow_type.set_state( + "Internal flow through the object" + ) + fault_tolerant.describe_geometry_and_flow.update_child_tasks( + setup_type_changed=False + ) + fault_tolerant.describe_geometry_and_flow() + + # Enclose fluid regions (capping) + fault_tolerant.capping.create_patch_preferences.show_in_gui.set_state(False) + + fault_tolerant.capping.patch_name.set_state("inlet-1") + fault_tolerant.capping.selection_type.set_state("zone") + fault_tolerant.capping.zone_selection_list.set_state(["inlet.1"]) + fault_tolerant.capping.insert_compound_child_task() + fault_tolerant.capping_child_1() + + fault_tolerant.capping.patch_name.set_state("inlet-2") + fault_tolerant.capping.selection_type.set_state("zone") + fault_tolerant.capping.zone_selection_list.set_state(["inlet.2"]) + fault_tolerant.capping.insert_compound_child_task() + fault_tolerant.capping_child_2() + + fault_tolerant.capping.patch_name.set_state("inlet-3") + fault_tolerant.capping.selection_type.set_state("zone") + fault_tolerant.capping.zone_selection_list.set_state(["inlet"]) + fault_tolerant.capping.insert_compound_child_task() + fault_tolerant.capping_child_3() + + fault_tolerant.capping.patch_name.set_state("outlet-1") + fault_tolerant.capping.selection_type.set_state("zone") + fault_tolerant.capping.zone_selection_list.set_state(["outlet"]) + fault_tolerant.capping.zone_type.set_state("pressure-outlet") + fault_tolerant.capping.insert_compound_child_task() + fault_tolerant.capping_child_4() + + # Extract edge features + fault_tolerant.extract_edge_features.extract_edges_name.set_state("edge-group-1") + 
fault_tolerant.extract_edge_features.extract_method_type.set_state( + "Intersection Loops" + ) + fault_tolerant.extract_edge_features.object_selection_list.set_state( + ["flow_pipe", "main"] + ) + fault_tolerant.extract_edge_features.insert_compound_child_task() + fault_tolerant.extract_edge_features_child_1() + + # Identify regions + fault_tolerant.identify_regions.show_coordinates = True + fault_tolerant.identify_regions.material_points_name.set_state("fluid-region-1") + fault_tolerant.identify_regions.selection_type.set_state("zone") + fault_tolerant.identify_regions.x.set_state(377.322045740589) + fault_tolerant.identify_regions.y.set_state(-176.800676988458) + fault_tolerant.identify_regions.z.set_state(-37.0764628583475) + fault_tolerant.identify_regions.zone_selection_list.set_state(["main.1"]) + fault_tolerant.identify_regions.insert_compound_child_task() + fault_tolerant.identify_regions_child_1() + + fault_tolerant.identify_regions.show_coordinates = True + fault_tolerant.identify_regions.material_points_name.set_state("void-region-1") + fault_tolerant.identify_regions.new_region_type.set_state("void") + fault_tolerant.identify_regions.selection_type = "object" + fault_tolerant.identify_regions.object_selection_list.set_state( + ["inlet-1", "inlet-2", "inlet-3", "main"] + ) + fault_tolerant.identify_regions.x.set_state(374.722045740589) + fault_tolerant.identify_regions.y.set_state(-278.9775145640143) + fault_tolerant.identify_regions.z.set_state(-161.1700719416913) + fault_tolerant.identify_regions.insert_compound_child_task() + fault_tolerant.identify_regions_child_2() + + # Define leakage threshold + fault_tolerant.define_leakage_threshold.add_child.set_state("yes") + fault_tolerant.define_leakage_threshold.flip_direction.set_state(True) + fault_tolerant.define_leakage_threshold.plane_direction.set_state("X") + fault_tolerant.define_leakage_threshold.region_selection_single.set_state( + "void-region-1" + ) + + 
fault_tolerant.define_leakage_threshold.add_child = "yes" + fault_tolerant.define_leakage_threshold.flip_direction = True + fault_tolerant.define_leakage_threshold.leakage_name = "leakage-1" + fault_tolerant.define_leakage_threshold.plane_direction = "X" + fault_tolerant.define_leakage_threshold.region_selection_single = "void-region-1" + fault_tolerant.define_leakage_threshold.insert_compound_child_task() + fault_tolerant.define_leakage_threshold_child_1() + + # Update regions settings + fault_tolerant.update_region_settings.all_region_filter_categories.set_state( + ["2"] * 5 + ["1"] * 2 + ) + fault_tolerant.update_region_settings.all_region_leakage_size_list.set_state( + ["none"] * 6 + ["6.4"] + ) + fault_tolerant.update_region_settings.all_region_linked_construction_surface_list.set_state( + ["n/a"] * 6 + ["no"] + ) + fault_tolerant.update_region_settings.all_region_mesh_method_list.set_state( + ["none"] * 6 + ["wrap"] + ) + fault_tolerant.update_region_settings.all_region_name_list.set_state( + [ + "main", + "flow_pipe", + "outpipe3", + "object2", + "object1", + "void-region-1", + "fluid-region-1", + ] + ) + fault_tolerant.update_region_settings.all_region_overset_componen_list.set_state( + ["no"] * 7 + ) + fault_tolerant.update_region_settings.all_region_source_list.set_state( + ["object"] * 5 + ["mpt"] * 2 + ) + fault_tolerant.update_region_settings.all_region_type_list.set_state( + ["void"] * 6 + ["fluid"] + ) + fault_tolerant.update_region_settings.all_region_volume_fill_list.set_state( + ["none"] * 6 + ["tet"] + ) + fault_tolerant.update_region_settings.filter_category.set_state( + "Identified Regions" + ) + fault_tolerant.update_region_settings.all_region_leakage_size_list.set_state([""]) + fault_tolerant.update_region_settings.all_region_mesh_method_list.set_state( + ["wrap"] + ) + fault_tolerant.update_region_settings.all_region_name_list.set_state( + ["fluid-region-1"] + ) + 
    # Tail of the fault-tolerant meshing test (def starts above this chunk).
    # Region settings are applied twice with different volume-fill values;
    # presumably the second pass deliberately overrides the first before the
    # task executes — TODO confirm against the workflow's intended sequence.
    fault_tolerant.update_region_settings.all_region_overset_componen_list.set_state(
        ["no"]
    )
    fault_tolerant.update_region_settings.all_region_type_list.set_state(["fluid"])
    fault_tolerant.update_region_settings.all_region_volume_fill_list.set_state(
        ["hexcore"]
    )
    fault_tolerant.update_region_settings.all_region_leakage_size_list.set_state([""])
    fault_tolerant.update_region_settings.all_region_mesh_method_list.set_state(
        ["wrap"]
    )
    fault_tolerant.update_region_settings.all_region_name_list.set_state(
        ["fluid-region-1"]
    )
    fault_tolerant.update_region_settings.all_region_overset_componen_list.set_state(
        ["no"]
    )
    fault_tolerant.update_region_settings.all_region_type_list.set_state(["fluid"])
    fault_tolerant.update_region_settings.all_region_volume_fill_list.set_state(["tet"])
    fault_tolerant.update_region_settings()

    # Setup size controls
    fault_tolerant.setup_size_controls.local_settings_name = "default-curvature"
    fault_tolerant.setup_size_controls.local_size_control_parameters.sizing_type = (
        "curvature"
    )
    fault_tolerant.setup_size_controls.object_selection_list = [
        "inlet-1",
        "inlet-2",
        "inlet-3",
    ]
    fault_tolerant.setup_size_controls.add_child_and_update(defer_update=False)
    fault_tolerant.setup_size_controls.local_settings_name = "default-proximity"
    fault_tolerant.setup_size_controls.local_size_control_parameters.sizing_type = (
        "proximity"
    )
    fault_tolerant.setup_size_controls.object_selection_list = [
        "inlet-1",
        "inlet-2",
        "inlet-3",
    ]
    fault_tolerant.setup_size_controls.add_child_and_update(defer_update=False)

    # Choose mesh control options
    fault_tolerant.choose_mesh_control_options()

    # Generate surface mesh
    fault_tolerant.generate_surface_mesh()

    # Update boundaries
    fault_tolerant.update_boundaries()

    # Add boundary layers
    fault_tolerant.add_boundary_layers.control_name.set_state("aspect-ratio_1")
    fault_tolerant.add_boundary_layers.insert_compound_child_task()

    fault_tolerant.add_boundary_layers_child_1()

    # Generate volume mesh
    generate_volume_mesh = fault_tolerant.create_volume_mesh_ftm
    generate_volume_mesh.all_region_name_list.set_state(
        [
            "main",
            "flow_pipe",
            "outpipe3",
            "object2",
            "object1",
            "void-region-1",
            "fluid-region-1",
        ]
    )
    generate_volume_mesh.all_region_size_list.set_state(["11.33375"] * 7)
    generate_volume_mesh.all_region_volume_fill_list.set_state(["none"] * 6 + ["tet"])
    generate_volume_mesh()

    solver = meshing.switch_to_solver()
    assert solver.is_active() is True
    assert meshing.is_active() is False
    solver.exit()
    assert solver.is_active() is False


@pytest.mark.nightly
@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_new_2d_meshing_workflow_enhanced_meshing(
    new_meshing_session_wo_exit, use_server_meshing_workflow
):
    """Run the full 2D meshing workflow end-to-end (CAD import through volume
    mesh) via the server-side 'meshing_workflow' root, then switch to solver
    mode and exit."""
    # Import geometry
    import_file_name = examples.download_file("NACA0012.fmd", "pyfluent/airfoils")
    two_dim_mesh = new_meshing_session_wo_exit.two_dimensional_meshing()

    two_dim_mesh.load_cad_geometry.file_name = import_file_name
    two_dim_mesh.load_cad_geometry.length_unit = "mm"
    two_dim_mesh.load_cad_geometry.refaceting.refacet = False
    two_dim_mesh.load_cad_geometry()

    # Set regions and boundaries
    two_dim_mesh.update_boundaries.selection_type = "zone"
    two_dim_mesh.update_boundaries()

    # Define global sizing
    two_dim_mesh.define_global_sizing.curvature_normal_angle = 20
    two_dim_mesh.define_global_sizing.max_size = 2000.0
    two_dim_mesh.define_global_sizing.min_size = 5.0
    two_dim_mesh.define_global_sizing.size_functions = "Curvature"
    two_dim_mesh.define_global_sizing()

    # Add local sizing: one body-of-influence, one edge-size, and one
    # curvature control, each committed with an immediate (non-deferred) update.
    two_dim_mesh.add_local_sizing_wtm.add_child = "yes"
    two_dim_mesh.add_local_sizing_wtm.boi_control_name = "boi_1"
    two_dim_mesh.add_local_sizing_wtm.boi_execution = "Body Of Influence"
    two_dim_mesh.add_local_sizing_wtm.boi_face_label_list = ["boi"]
    two_dim_mesh.add_local_sizing_wtm.boi_size = 50.0
    two_dim_mesh.add_local_sizing_wtm.boi_zoneor_label = "label"
    two_dim_mesh.add_local_sizing_wtm.draw_size_control = True
    two_dim_mesh.add_local_sizing_wtm.add_child_and_update(defer_update=False)

    two_dim_mesh.add_local_sizing_wtm.add_child = "yes"
    two_dim_mesh.add_local_sizing_wtm.boi_control_name = "edgesize_1"
    two_dim_mesh.add_local_sizing_wtm.boi_execution = "Edge Size"
    two_dim_mesh.add_local_sizing_wtm.boi_size = 5.0
    two_dim_mesh.add_local_sizing_wtm.boi_zoneor_label = "label"
    two_dim_mesh.add_local_sizing_wtm.draw_size_control = True
    two_dim_mesh.add_local_sizing_wtm.edge_label_list = ["airfoil-te"]
    two_dim_mesh.add_local_sizing_wtm.add_child_and_update(defer_update=False)

    two_dim_mesh.add_local_sizing_wtm.add_child = "yes"
    two_dim_mesh.add_local_sizing_wtm.boi_control_name = "curvature_1"
    two_dim_mesh.add_local_sizing_wtm.boi_curvature_normal_angle = 10
    two_dim_mesh.add_local_sizing_wtm.boi_execution = "Curvature"
    two_dim_mesh.add_local_sizing_wtm.boi_max_size = 2
    two_dim_mesh.add_local_sizing_wtm.boi_min_size = 1.5
    two_dim_mesh.add_local_sizing_wtm.boi_scope_to = "edges"
    two_dim_mesh.add_local_sizing_wtm.boi_zoneor_label = "label"
    two_dim_mesh.add_local_sizing_wtm.draw_size_control = True
    two_dim_mesh.add_local_sizing_wtm.edge_label_list = ["airfoil"]
    two_dim_mesh.add_local_sizing_wtm.add_child_and_update(defer_update=False)

    # Add boundary layer
    two_dim_mesh.add_2d_boundary_layers.add_child = "yes"
    two_dim_mesh.add_2d_boundary_layers.bl_control_name = "aspect-ratio_1"
    two_dim_mesh.add_2d_boundary_layers.number_of_layers = 4
    two_dim_mesh.add_2d_boundary_layers.offset_method_type = "aspect-ratio"
    two_dim_mesh.add_2d_boundary_layers.add_child_and_update(defer_update=False)

    # NOTE: Setting `show_advanced_options = True` is required to configure advanced preferences.
    # This dependency may be removed in a future release as the API evolves.

    two_dim_mesh.generate_initial_surface_mesh.surface_2d_preferences.show_advanced_options = (
        True
    )
    two_dim_mesh.generate_initial_surface_mesh.surface_2d_preferences.merge_edge_zones_based_on_labels = (
        "no"
    )
    two_dim_mesh.generate_initial_surface_mesh.surface_2d_preferences.merge_face_zones_based_on_labels = (
        "no"
    )
    two_dim_mesh.generate_initial_surface_mesh()

    # Replace the aspect-ratio boundary layer with a uniform one and re-mesh.
    two_dim_mesh.add_2d_boundary_layers_child_1.revert()
    two_dim_mesh.add_2d_boundary_layers_child_1.add_child = "yes"
    two_dim_mesh.add_2d_boundary_layers_child_1.bl_control_name = "uniform_1"
    two_dim_mesh.add_2d_boundary_layers_child_1.first_layer_height = 2
    two_dim_mesh.add_2d_boundary_layers_child_1.number_of_layers = 4
    two_dim_mesh.add_2d_boundary_layers_child_1.offset_method_type = "uniform"
    two_dim_mesh.add_2d_boundary_layers_child_1()

    # NOTE: Setting `show_advanced_options = True` is required to configure advanced preferences.
    # This dependency may be removed in a future release as the API evolves.

    two_dim_mesh.generate_initial_surface_mesh.surface_2d_preferences.show_advanced_options = (
        True
    )
    two_dim_mesh.generate_initial_surface_mesh.surface_2d_preferences.merge_edge_zones_based_on_labels = (
        "no"
    )
    two_dim_mesh.generate_initial_surface_mesh.surface_2d_preferences.merge_face_zones_based_on_labels = (
        "no"
    )
    two_dim_mesh.generate_initial_surface_mesh()

    # Switch to solution mode
    solver = new_meshing_session_wo_exit.switch_to_solver()
    assert solver.is_active() is True
    assert new_meshing_session_wo_exit.is_active() is False
    solver.exit()
    assert solver.is_active() is False


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_workflow_and_data_model_methods_new_meshing_workflow(
    new_meshing_session, use_server_meshing_workflow
):
    """Exercise task insertion APIs and verify the insertable-task list and
    task count of the watertight workflow."""
    meshing = new_meshing_session
    watertight = meshing.watertight()
    # NOTE(review): these expected reprs are empty strings — the original
    # values appear to have been stripped (likely angle-bracketed text eaten
    # as markup); verify against the actual `repr()` of insertable tasks.
    _next_possible_tasks = [
        "",
        "",
        "",
        "",
    ]
    assert sorted(
        [repr(x) for x in watertight.import_geometry.insertable_tasks()]
    ) == sorted(_next_possible_tasks)
    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
    # NOTE(review): asserting the same list after an insert looks suspicious —
    # confirm whether the post-insert expectation should differ.
    assert sorted(
        [repr(x) for x in watertight.import_geometry.insertable_tasks()]
    ) == sorted(_next_possible_tasks)
    watertight.import_geometry.insertable_tasks.set_up_rotational_periodic_boundaries.insert()
    assert len(watertight.tasks()) == 13


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_duplicate_tasks(new_meshing_session, use_server_meshing_workflow):
    """Insert the same task multiple times and verify duplicate naming,
    index-based access, and renaming semantics."""
    meshing = new_meshing_session
    watertight = meshing.watertight()

    # NOTE(review): expected reprs are empty strings — original values appear
    # stripped; verify against the actual `repr()` of insertable tasks.
    _next_possible_tasks = [
        "",
        "",
        "",
        "",
    ]
    assert sorted(
        [repr(x) for x in watertight.import_geometry.insertable_tasks()]
    ) == sorted(_next_possible_tasks)
    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()

    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()

    # Duplicates get a numeric display-name suffix and are index-addressable.
    assert watertight.import_boi_geometry.name() == "Import Body of Influence Geometry"
    assert (
        watertight.import_boi_geometry[1].name()
        == "Import Body of Influence Geometry 1"
    )
    assert (
        watertight.import_boi_geometry[2].name()
        == "Import Body of Influence Geometry 2"
    )

    watertight.import_boi_geometry[1].rename(new_name="Renamed BOI task")

    # After renaming, the old index no longer resolves ...
    with pytest.raises(LookupError):
        watertight.import_boi_geometry[1].name()

    # ... but name-based lookup with the new name does.
    assert (
        watertight.import_boi_geometry["Renamed BOI task"].name() == "Renamed BOI task"
    )


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_watertight_workflow(
    mixing_elbow_geometry_filename, new_meshing_session, use_server_meshing_workflow
):
    """Import geometry, add a local sizing compound child, and verify child
    task naming/lookup in the watertight workflow."""
    watertight = new_meshing_session.watertight()
    watertight.import_geometry.file_name = mixing_elbow_geometry_filename
    watertight.import_geometry()
    add_local_sizing = watertight.add_local_sizing_wtm
    assert not add_local_sizing.task_list()
    add_local_sizing.add_child = True
    add_local_sizing.boi_face_label_list = ["cold-inlet", "hot-inlet"]
    add_local_sizing.add_child_and_update()
    assert add_local_sizing._task_names() == ["facesize_1"]
    assert watertight.add_local_sizing_wtm_child_1.name() == "facesize_1"
    assert watertight.add_local_sizing_wtm["facesize_1"].name() == "facesize_1"


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_delete_interface(new_meshing_session, use_server_meshing_workflow):
    """Verify the three task-deletion interfaces: `del` statement, `.delete()`
    on a task, and `delete_tasks(list_of_tasks=...)` on the workflow."""
    watertight = new_meshing_session.watertight()

    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()

    assert watertight.import_boi_geometry.name() == "Import Body of Influence Geometry"
    assert (
        watertight.import_boi_geometry[1].name()
        == "Import Body of Influence Geometry 1"
    )

    assert len(watertight.tasks()) == 13
    del watertight.import_boi_geometry[1]
    watertight.import_boi_geometry.delete()
    assert len(watertight.tasks()) == 11

    assert "create_volume_mesh_wtm" in watertight.task_names()
    assert "add_boundary_layers" in watertight.task_names()
    watertight.delete_tasks(
        list_of_tasks=[
            watertight.create_volume_mesh_wtm,
            watertight.add_boundary_layers,
        ]
    )
    assert "create_volume_mesh_wtm" not in watertight.task_names()
    assert "add_boundary_layers" not in watertight.task_names()

    assert "update_regions" in watertight.task_names()
    watertight.update_regions.delete()
    assert "update_regions" not in watertight.task_names()

    assert "create_regions" in watertight.task_names()
    del watertight.create_regions
    assert "create_regions" not in watertight.task_names()


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_ordering_of_tasks(new_meshing_session, use_server_meshing_workflow):
    """Verify parent/child/sibling ordering accessors (`children`,
    `first_child`, `next`, `previous`, `has_parent`) on the task graph."""
    watertight = new_meshing_session.watertight()
    assert len(watertight.children()) == 7
    _watertight_tasks = [
        "task < import_geometry: 0 >",
        "task < add_local_sizing_wtm: 0 >",
        "task < create_surface_mesh: 0 >",
        "task < describe_geometry: 0 >",
        "task < update_regions: 0 >",
        "task < add_boundary_layers: 0 >",
        "task < create_volume_mesh_wtm: 0 >",
    ]
    assert sorted([repr(x) for x in watertight.children()]) == sorted(_watertight_tasks)

    assert watertight.import_geometry.children() == []
    assert len(watertight.describe_geometry.children()) == 2

    assert repr(watertight.describe_geometry.first_child()) == "task < capping: 0 >"
    assert watertight.describe_geometry.first_child().has_parent()
    assert (
        repr(watertight.describe_geometry.first_child().parent())
        == "task < describe_geometry: 0 >"
    )
    assert (
        repr(watertight.describe_geometry.first_child().next())
        == "task < create_regions: 0 >"
    )

    assert not watertight.describe_geometry.first_child().has_previous()
    assert watertight.describe_geometry.first_child().has_next()
    assert (
        watertight.describe_geometry.first_child().next().previous().name()
        == "Enclose Fluid Regions (Capping)"
    )

    assert repr(watertight.first_child()) == "task < import_geometry: 0 >"
    assert (
        watertight.import_geometry.next().next().next().next().name()
        == "Update Regions"
    )

    # Newly inserted duplicates land between Import Geometry and Add Local Sizing,
    # most recently inserted first.
    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()
    watertight.import_geometry.insertable_tasks.import_boi_geometry.insert()

    assert watertight.import_boi_geometry[1].previous().name() == "Import Geometry"
    assert (
        watertight.import_boi_geometry[1].next().name()
        == "Import Body of Influence Geometry"
    )
    assert watertight.import_boi_geometry[1].next().next().name() == "Add Local Sizing"


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_workflow_type_checking(new_meshing_session, use_server_meshing_workflow):
    """Verify that traversal of inserted duplicate tasks yields the expected
    task reprs (type/index) from the first child of the workflow."""
    meshing = new_meshing_session
    watertight = meshing.watertight()

    wf_1 = watertight.first_child()

    assert repr(wf_1) == "task < import_geometry: 0 >"

    assert wf_1.insertable_tasks()

    wf_1.insertable_tasks.import_boi_geometry.insert()
    wf_1.insertable_tasks.import_boi_geometry.insert()

    assert repr(wf_1.next()) == "task < import_boi_geometry: 1 >"
    assert repr(wf_1.next().next()) == "task < import_boi_geometry: 0 >"


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_workflow_traversal(new_meshing_session, use_server_meshing_workflow):
    """Walk the watertight workflow with the traversal API, checking names and
    boundary conditions (`previous()`/`next()` raise IndexError at the ends)."""
    meshing = new_meshing_session
    watertight = meshing.watertight()

    assert len(watertight.children()) == 7

    wf_1 = watertight.first_child()
    assert wf_1.name() == "Import Geometry"
    assert wf_1.has_parent()
    assert wf_1.parent().__class__.__name__ == "WatertightMeshingWorkflow"
    assert wf_1.has_previous() is False
    assert wf_1.has_next()
    assert wf_1.first_child() is None
    assert wf_1.last_child() is None

    # Stepping before the first task is an error.
    with pytest.raises(IndexError):
        wf_1.previous()

    wf_2 = wf_1.next()
    assert wf_2.name() == "Add Local Sizing"
    assert wf_2.has_previous()
    assert wf_2.has_next()

    wf_4 = wf_2.next().next()
    assert wf_4.name() == "Describe Geometry"
    wf_4_1 = wf_4.first_child()
    assert wf_4_1.name() == "Enclose Fluid Regions (Capping)"
    assert wf_4_1.has_next()
    assert wf_4_1.has_previous() is False

    assert wf_4_1.has_parent()
    assert wf_4_1.parent().name() == "Describe Geometry"

    wf_4_2 = wf_4.first_child().next()
    assert wf_4_2.name() == wf_4.last_child().name() == "Create Regions"
    assert wf_4_2.has_next() is False
    assert wf_4_2.has_previous()

    assert wf_4.next().name() == "Update Regions"

    wf_7 = watertight.last_child()
    assert wf_7.name() == "Generate the Volume Mesh"
    assert wf_7.has_previous()
    assert wf_7.has_next() is False

    # Stepping past the last task is an error.
    with pytest.raises(IndexError):
        wf_7.next()

    wf_6 = wf_7.previous()
    assert wf_6.name() == "Add Boundary Layers"


@pytest.mark.codegen_required
@pytest.mark.fluent_version(">=26.1")
def test_new_watertight_workflow_using_traversal(
    new_meshing_session_wo_exit, use_server_meshing_workflow
):
    """Run the full watertight workflow end-to-end, addressing every task via
    traversal (`first_child`/`next`) rather than by attribute name, then
    switch to solver mode and exit."""
    # Import geometry
    import_file_name = examples.download_file(
        "mixing_elbow.pmdb", "pyfluent/mixing_elbow"
    )
    watertight = new_meshing_session_wo_exit.watertight()
    wf_1 = watertight.first_child()
    wf_1.file_name.set_state(import_file_name)
    wf_1.length_unit = "in"
    wf_1()

    # Add local sizing
    assert wf_1.has_next()
    wf_2 = wf_1.next()
    wf_2.add_child_to_task()
    wf_2()

    # Generate surface mesh
    assert wf_2.has_next()
    wf_3 = wf_2.next()
    wf_3.cfd_surface_mesh_controls.max_size.set_state(0.3)
    wf_3()

    # Describe geometry
    assert wf_3.has_next()
    wf_4 = wf_3.next()
    wf_4.update_child_tasks(setup_type_changed=False)
    assert wf_4.setup_type.allowed_values() == ["fluid", "fluid_solid_voids", "solid"]
    wf_4.setup_type = "fluid"
    wf_4.update_child_tasks(setup_type_changed=True)
    wf_4()

    # Update boundaries
    wf_4_1 = wf_4.first_child()
    wf_4_1.boundary_zone_list.set_state(["wall-inlet"])
    wf_4_1.boundary_label_list.set_state(["wall-inlet"])
    wf_4_1.boundary_label_type_list.set_state(["wall"])
    wf_4_1.old_boundary_label_list.set_state(["wall-inlet"])
    wf_4_1.old_boundary_label_type_list.set_state(["velocity-inlet"])
    wf_4_1()

    # Update regions
    assert wf_4.has_next()
    wf_5 = wf_4.next()
    wf_5()

    # Add boundary layers
    assert wf_5.has_next()
    wf_6 = wf_5.next()
    wf_6.add_child_to_task()
    wf_6.control_name.set_state("smooth-transition_1")
    wf_6.insert_compound_child_task()
    assert wf_6.has_next()
    assert wf_6.first_child() is not None
    wf_6.first_child()()

    # Generate volume mesh
    assert wf_6.has_next()
    wf_7 = wf_6.next()

    wf_7.volume_fill.set_state("poly-hexcore")
    wf_7.volume_fill_controls.hex_max_cell_length = 0.3
    wf_7()

    assert wf_7.has_next() is False

    # Switch to solution mode
    solver = new_meshing_session_wo_exit.switch_to_solver()
    assert solver.is_active() is True
    assert new_meshing_session_wo_exit.is_active() is False
    solver.exit()
    assert solver.is_active() is False