Merged · 22 commits
53ba21d
update: frontend - optimize useTerm implementation code
4linuxfun Jan 17, 2025
2143ded
update: backend - simplify scheduler tasks, remove unnecessary code
4linuxfun Jan 21, 2025
8daef2b
update: backend - improve scheduler
4linuxfun Jan 21, 2025
ae16ea1
update: backend - exclude websocket from permission checks
4linuxfun Jan 21, 2025
19e8bad
update: backend - implement the "Task Management" module
4linuxfun Jan 21, 2025
f2bf6d5
update: frontend - refactor usePagination
4linuxfun Jan 21, 2025
bf8f03c
update: frontend - implement the "Run Task" feature
4linuxfun Jan 21, 2025
c3c18b5
update: frontend - add initialPageSize parameter to usePagination
4linuxfun Feb 10, 2025
7647004
update: frontend - task management log pagination default changed to 5
4linuxfun Feb 10, 2025
3544264
update: add some logic checks to the service.sh script
4linuxfun Feb 10, 2025
c75e188
update: frontend - complete async variant of usePagination
4linuxfun Feb 11, 2025
9091c98
update: frontend - add missing component imports
4linuxfun Feb 11, 2025
ac23b67
fix: frontend - fix host-selection linked auto-population bug
4linuxfun Feb 11, 2025
f9f57b1
fix: frontend - keep multi-select state when paging
4linuxfun Feb 12, 2025
e6a1b58
update: backend - update config schema
4linuxfun Feb 12, 2025
23865ae
update: backend - adjust parameters after the yaml config change
4linuxfun Feb 12, 2025
e9c7183
update: backend - update service control script
4linuxfun Feb 12, 2025
e493d2c
update: backend - change redis expiration time
4linuxfun Feb 12, 2025
28ba966
update: backend - task management module implementation
4linuxfun Feb 12, 2025
0413a51
update: update README.md
4linuxfun Feb 13, 2025
fd9180a
update: backend - add production environment config template
4linuxfun Feb 13, 2025
9cb6b59
update: misc
4linuxfun Feb 13, 2025
update: backend - implement the "Task Management" module
4linuxfun committed Jan 21, 2025
commit 19e8bad3e2a6b99d8810ae14f0371ab99f27f387
58 changes: 37 additions & 21 deletions server/common/utils.py
@@ -1,10 +1,12 @@
import json
import os
import websockets
from collections import defaultdict
from fastapi import WebSocket
from typing import List, Dict, Generic, Type, TypeVar
from loguru import logger
from sqlmodel import SQLModel
from sqlmodel import SQLModel, select
from server import crud

from ..models.internal.dictonary import DictBase
from ..models.internal.menu import MenusWithChild
@@ -18,8 +20,8 @@ class Tree(Generic[T]):
"""

def __init__(self, tree_list: List[T], model: Type[T]):
self.tree_dict: Dict[int, T] = {tree.id:model(**tree.model_dump()) for tree in tree_list}
self.children_map:Dict[int,List[T]] = defaultdict(list)
self.tree_dict: Dict[int, T] = {tree.id: model(**tree.model_dump()) for tree in tree_list}
self.children_map: Dict[int, List[T]] = defaultdict(list)
for tree in self.tree_dict.values():
self.children_map[tree.parent_id].append(tree)

@@ -100,37 +102,51 @@ def remove_tmp_file(file):
os.remove(file)


async def get_task_logs(ws: WebSocket, redis, session, task_id: str, trigger: str):
key_name = f'tasks:{task_id}'
async def get_task_logs(ws: WebSocket, redis, session, job_id: str):
"""
Stream real-time job logs to the frontend over a websocket.
Args:
ws (WebSocket): websocket connection used to push logs to the frontend
redis (Redis): redis connection used to read real-time logs
session (Session): database session used to read persisted logs
job_id (str): job ID
Returns:
bool: True when logs have been delivered, False if the database lookup fails
"""
key_name = f'tasks:{job_id}'
last_id = 0
sleep_ms = 5000
if trigger == 'date':
job_log.get(session,)
while True:
if await redis.exists(key_name):
# key exists in redis; read log lines from redis in real time
resp = await redis.xread({f'{key_name}': last_id}, count=1, block=sleep_ms)
if resp:
key, message = resp[0]
last_id = message[0][0]
# last_id, data = message[0]
# data_dict = {k.decode("utf-8"): data[k].decode("utf-8") for k in data}
msg = message[0][1].get(b'msg', b'').decode("utf-8")
try:
await ws.send_json({'task_id': task_id, 'data': message})
logger.debug(msg)
await ws.send_text(msg + '\r\n')
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"websocket 异常关闭:{e}")
break
elif last_id == 0 and (result.state == 'FAILURE' or result.state == 'SUCCESS'):
# last_id == 0 means the key no longer exists in redis: task_id is a finished task whose redis cache has expired
logger.debug(f'{task_id} state:{result.state}')
try:
await ws.send_json({'task_id': task_id, 'data': AsyncResult(task_id).result})
except websockets.exceptions.ConnectionClosed as e:
logger.warning(f"websocket 异常关闭:{e}")
break
elif result.state == 'PENDING' or result.state == 'STARTED':
logger.debug(f"{task_id} state:{result.state}")
continue
else:
logger.debug(f'{task_id} has finished')
break
# key not in redis; fetch logs from the job_logs table in the database
logger.debug(f'{job_id} has finished; fetching logs from the database')
try:
logs = crud.internal.job_logs.get_by_job_id(session, job_id)
if logs:
# json.dumps (json.dump writes to a file object) serializes the stored log
await ws.send_text(json.dumps(logs.log) + '\r\n')
logger.debug(f"fetched the log record for {job_id} from the database")
else:
logger.debug(f"no log records for {job_id} in the database")
break  # exit the loop once the database log has been delivered
except Exception as e:
logger.error(f"failed to fetch logs from the database: {e}")
return False
return True
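
For context on the stream being read above: get_task_logs XREADs the Redis stream tasks:{job_id} and expects each entry to carry a msg field. A minimal producer sketch under those assumptions (connection details and the expiry value are illustrative, not taken from this PR):

import asyncio
from redis.asyncio import Redis  # redis-py asyncio client

async def publish_task_log(redis: Redis, job_id: str, line: str) -> None:
    # XADD appends one log line to the stream that get_task_logs reads with XREAD
    await redis.xadd(f'tasks:{job_id}', {'msg': line})

async def main():
    redis = Redis()  # localhost:6379 by default; adjust as needed
    await publish_task_log(redis, 'demo-job', 'PLAY [all] *****')
    # let the stream expire so finished jobs fall through to the database path
    await redis.expire('tasks:demo-job', 300)

asyncio.run(main())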
3 changes: 2 additions & 1 deletion server/crud/internal/job.py
@@ -6,7 +6,8 @@


class CRUDJobLogs(CRUDBase[JobLogs]):
pass
def get_by_job_id(self, db: Session, job_id: str):
# one_or_none (rather than one) avoids NoResultFound when a job has no log row yet
return db.exec(select(self.model).where(self.model.job_id == job_id)).one_or_none()


job_logs = CRUDJobLogs(JobLogs)
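
A quick usage sketch of the new helper; the engine DSN is illustrative, and with one_or_none the call returns None when no log row exists yet:

from sqlmodel import Session, create_engine
from server import crud

engine = create_engine('mysql+pymysql://user:pass@localhost/devops')  # illustrative DSN
with Session(engine) as session:
    log = crud.internal.job_logs.get_by_job_id(session, 'demo-job')
    print(log.log if log else 'no log yet')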
4 changes: 2 additions & 2 deletions server/models/internal/__init__.py
@@ -6,13 +6,13 @@
from .job import JobLogs
from .playbook import Playbook
from pydantic import BaseModel
from typing import TypeVar, Generic, Optional
from typing import TypeVar, Generic, Optional, Union

T = TypeVar('T')


class Pagination(BaseModel, Generic[T]):
search: T
search: Union[T, None]
page: Optional[int] = 1
page_size: Optional[int] = 10
model: Optional[str] = 'asc'
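
With search relaxed to Union[T, None], a log query can now carry an explicit null instead of a populated search object; a small sketch (values illustrative):

from server.models.internal import Pagination
from server.models.internal.job import JobLogSearch

# search populated
page = Pagination[JobLogSearch](search=JobLogSearch(job_id='demo-job'), page=1, page_size=5)
# search may now be None; the field is still required, so it must be passed explicitly
empty = Pagination[JobLogSearch](search=None)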
17 changes: 14 additions & 3 deletions server/models/internal/job.py
@@ -24,13 +24,19 @@ class TriggerArgs(BaseModel):
end_date: Optional[str] = Field(default=None, description="end time for cron-type triggers")


class AnsibleArgs(BaseModel):
module: Union[str, None] = Field(default=None, description="ansible module")
module_args: Union[str, None] = Field(default=None, description="ansible module arguments")
playbook: Union[str, None] = Field(default=None, description="ansible playbook")


class JobAdd(BaseModel):
id: Optional[str] = Field(default=None, description="job ID")
name: str = Field(description="job name")
name: Union[str, None] = Field(description="job name")
trigger: TriggerEnum = Field(description="trigger type")
trigger_args: TriggerArgs = Field(description="trigger arguments")
targets: List[int] = Field(description="hosts that run the job")
ansible_args: Dict[str, Any] = Field(default=None, description="ansible task arguments")
ansible_args: AnsibleArgs = Field(description="ansible task arguments")


# class Job(SQLModel, table=True):
@@ -47,13 +53,17 @@ class JobAdd(BaseModel):
#
#
class JobLogs(SQLModel, table=True):
"""
Job execution log table
"""
__tablename__ = 'job_logs'
id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
job_id: Optional[str] = Field(sa_column=Column('job_id', String(191), index=True))
start_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': 'job start time'})
# default_factory (not default) so datetime.now is evaluated per row
end_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': 'job end time'})
log: str = Field(sa_column=Column(mysql.TEXT, comment='execution log'))
stats: str = Field(sa_column=Column(mysql.TEXT, comment='job result status'))
type: int = Field(sa_column=Column('type', mysql.TINYINT, comment='job type, 0: cron, 1: date'))


class JobSearch(SQLModel):
@@ -62,4 +72,5 @@ class JobSearch(SQLModel):


class JobLogSearch(BaseModel):
job_id: str
job_id: Union[str, None] = None
type: Union[int, None] = None
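
Since the router below now calls job.ansible_args.model_dump() before handing the job to the scheduler, the typed AnsibleArgs model serializes back to the plain dict the task expects; a sketch:

from server.models.internal.job import AnsibleArgs

args = AnsibleArgs(module='ping', module_args='')
print(args.model_dump())
# -> {'module': 'ping', 'module_args': '', 'playbook': None}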
14 changes: 8 additions & 6 deletions server/routers/internal/job.py
@@ -83,7 +83,8 @@ async def add_job(job: JobAdd, uid: int = Depends(get_uid), session: Session = D
conn = rpyc.connect(**settings.rpyc_config)
job = conn.root.add_job('scheduler-server:ansible_task', trigger=job.trigger, id=job_id,
kwargs={'job_id': job_id, 'targets': job.targets,
'ansible_args': job.ansible_args}, name=job.name,
'ansible_args': job.ansible_args.model_dump(),
'task_type': 0 if job.trigger == 'cron' else 1}, name=job.name,
trigger_args=job.trigger_args.model_dump())
logger.debug(job.id)
except Exception as e:
@@ -93,6 +94,7 @@ async def add_job(job: JobAdd, uid: int = Depends(get_uid), session: Session = D
message=str(e)
)
return ApiResponse(
data=job_id,
message=f'新建任务成功:{job.name}'
)

@@ -178,8 +180,8 @@ async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid),
@router.post('/logs', summary='任务日志查询', response_model=ApiResponse[SearchResponse[JobLogs]])
async def job_logs(page_search: Pagination[JobLogSearch], session: Session = Depends(get_session)):
logger.debug(page_search)
total = crud.internal.job_logs.search_total(session, page_search.search, filter_type={'job_id': 'eq'})
jobs = crud.internal.job_logs.search(session, page_search, filter_type={'job_id': 'eq'})
total = crud.internal.job_logs.search_total(session, page_search.search, filter_type={'job_id': 'eq', 'type': 'eq'})
jobs = crud.internal.job_logs.search(session, page_search, filter_type={'job_id': 'eq', 'type': 'eq'})
logger.debug(jobs)
return ApiResponse(
data={
@@ -189,12 +191,12 @@ async def job_logs(page_search: Pagination[JobLogSearch], session: Session = Dep
)


@router.websocket('/logs/ws/')
async def websocket_endpoint(id: int, job_id: str, trigger: str, websocket: WebSocket,
@router.websocket('/logs/ws/{job_id}')
async def websocket_endpoint(job_id: str, websocket: WebSocket,
session: Session = Depends(get_session),
redis=Depends(get_redis)):
await websocket.accept()
async with anyio.create_task_group() as tg:
tg.start_soon(get_task_logs, websocket, redis, session, task_id, trigger)
tg.start_soon(get_task_logs, websocket, redis, session, job_id)
logger.debug('close websocket')
await websocket.close()
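
To consume the new log stream, a client connects to the path-parameter route and reads plain-text lines until the server closes. A minimal sketch, assuming the app is reachable at ws://localhost:8000 and the router is mounted without an extra prefix:

import asyncio
import websockets  # same library the server imports in utils.py

async def tail_job_logs(job_id: str) -> None:
    uri = f'ws://localhost:8000/logs/ws/{job_id}'  # host/port and route prefix are assumptions
    async with websockets.connect(uri) as ws:
        try:
            async for line in ws:
                print(line, end='')
        except websockets.exceptions.ConnectionClosed:
            pass  # server closes once the log is fully delivered

asyncio.run(tail_job_logs('demo-job'))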