Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
49 commits
Select commit Hold shift + click to select a range
de3eed6
update:后端apscheduler使用自定义scheduler
4linuxfun Jul 19, 2024
a2352c2
update:后端增加job相关接口
4linuxfun Jul 19, 2024
86d82ec
update:前端增加job页面
4linuxfun Jul 19, 2024
23d1d91
update:后端CRUD增加通过ID查询
4linuxfun Jul 19, 2024
253076b
update:其他配置更新
4linuxfun Jul 19, 2024
0c7eb3f
后端:修复hosts inventory错误
4linuxfun Jul 22, 2024
9e2cbf0
前端:更新ConfirmDel函数
4linuxfun Jul 22, 2024
0cbe6e9
前端:任务管理-更新实现
4linuxfun Jul 22, 2024
0001783
后端:rpyc执行日志写入redis和数据库
4linuxfun Jul 31, 2024
0ae3e14
后端:rpyc修改任务逻辑调整
4linuxfun Jul 31, 2024
80c38f3
alembic排除部分表
4linuxfun Jul 31, 2024
cda5580
后端:增加任务管理接口
4linuxfun Jul 31, 2024
c338dd1
后端:增加主机管理接口
4linuxfun Jul 31, 2024
f0c6d3b
前端:任务管理界面实现
4linuxfun Jul 31, 2024
f1773c3
后端:rpyc自定义jobstore
4linuxfun Jul 31, 2024
72c00f7
后端:主机管理、任务管理CRUD部分修改
4linuxfun Jul 31, 2024
c0de28e
前端:增加主机管理相关API
4linuxfun Jul 31, 2024
c8c8e78
前端:独立terminal组件,用于日志展示
4linuxfun Jul 31, 2024
9a426bd
前端:主机管理实现
4linuxfun Jul 31, 2024
0ddd9be
后端:更新统计值方式
4linuxfun Aug 1, 2024
1872ca3
后端:实现主机管理查询功能
4linuxfun Aug 1, 2024
81ed345
后端:修复主机管理分组查找逻辑
4linuxfun Aug 2, 2024
97d94db
前端:增加common函数
4linuxfun Aug 6, 2024
1f94a59
前端:主机管理相关页面功能实现
4linuxfun Aug 6, 2024
c1e7406
前端:增加更新主机分组接口
4linuxfun Aug 6, 2024
6adcc11
后端:更新host、user接口
4linuxfun Aug 6, 2024
1cb2281
前端:主机管理-分页点击查询位置调整
4linuxfun Aug 6, 2024
3eb2a87
后端:主机管理接口实现
4linuxfun Aug 6, 2024
a348e06
前端:任务管理-主机选择对话框功能实现
4linuxfun Aug 8, 2024
1bde445
前端:任务管理-主机选择列表功能实现
4linuxfun Aug 8, 2024
0229736
前端:hosts接口调整
4linuxfun Aug 8, 2024
d4cdd91
后端:主机管理增加多主机信息查询接口
4linuxfun Aug 8, 2024
4bc5ab7
前端:update package.json
4linuxfun Aug 8, 2024
9f4491e
前端:任务管理-用户手动输入模板名称
4linuxfun Aug 8, 2024
190cf9f
前端:任务管理-执行方式使用radio实现
4linuxfun Aug 8, 2024
0b2b127
前端:update
4linuxfun Aug 8, 2024
3a15622
后端:pydantic语法更新
4linuxfun Aug 8, 2024
3b310c8
前端:任务管理-任务日志:修复stats为null时异常报错
4linuxfun Aug 12, 2024
1df1880
scheduler:增加playbook业务逻辑实现
4linuxfun Aug 14, 2024
1c6d460
scheduler:增加注释
4linuxfun Aug 14, 2024
deb50aa
后端:增加playbook管理后端接口
4linuxfun Aug 14, 2024
9d8fe9a
后端:update
4linuxfun Aug 14, 2024
05723cc
前端:任务管理,修订module或playbook为null
4linuxfun Aug 14, 2024
42a5bae
前端:任务管理-脚本管理页面功能实现
4linuxfun Aug 14, 2024
98c5193
scheduler:修复ansible playbook传值为id
4linuxfun Aug 14, 2024
bd9a763
后端:playbook增加部分接口
4linuxfun Aug 14, 2024
4fc8a24
前端:任务管理页面调整,playbook传值改为id
4linuxfun Aug 14, 2024
d175f26
update README.md
4linuxfun Aug 14, 2024
ae12682
update:README.md
4linuxfun Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update:后端增加job相关接口
  • Loading branch information
4linuxfun committed Jul 19, 2024
commit a2352c216f62f361f93b0bb68c0b80820bf8dd67
8 changes: 7 additions & 1 deletion server/common/database.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import redis.asyncio as redis
import rpyc
from sqlmodel import create_engine, SQLModel, Session, select
from ..settings import engine
from server.settings import settings, engine


def init_db():
Expand All @@ -17,6 +18,11 @@ def get_session():
yield session


def get_rpyc():
with rpyc.connect(**settings.rpyc_config) as conn:
yield conn


def get_or_create(session: Session, model, **kwargs):
"""
检查表中是否存在对象,如果不存在就创建
Expand Down
2 changes: 1 addition & 1 deletion server/crud/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
from .roles import role
from .menu import menu
from .dictonary import data_dict, dict_item
from .job import job_log
from .host import host, group
2 changes: 1 addition & 1 deletion server/models/internal/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from .menu import Menu
from .role import Role, RoleMenu
from .dictonary import DataDict, DictItem
from .job import Job, JobLog
from .host import Host, Group
from pydantic import BaseModel
from typing import TypeVar, Generic, Optional

Expand Down
77 changes: 42 additions & 35 deletions server/models/internal/job.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
from enum import Enum
from typing import Union, List, Tuple, Dict, Any, Optional
from sqlmodel import SQLModel, Field, Column, Relationship, Integer, Unicode, LargeBinary, JSON
from sqlmodel import SQLModel, Field, Column, Relationship, Integer, String, LargeBinary, JSON, Unicode
from sqlalchemy.dialects import mysql
from datetime import datetime
from pydantic import BaseModel
from .relationships import UserJob


class CronJobArgs(BaseModel):
Expand All @@ -18,44 +17,52 @@ class TriggerEnum(str, Enum):
cron = 'cron'


class TriggerArgs(BaseModel):
run_date: Optional[str] = Field(default=None, description="date类型触发器设定执行时间,None为立即执行")
cron: Optional[str] = Field(default=None, description="cron类型触发器参数")
start_date: Optional[str] = Field(default=None, description="cron类型触发器开始时间")
end_date: Optional[str] = Field(default=None, description="cron类型触发器结束时间")


class JobAdd(BaseModel):
id: Optional[str] = None
name: str
targets: List[str]
trigger: TriggerEnum
trigger_args: Union[CronJobArgs, str] = None
command: str
type: str


class Job(SQLModel, table=True):
"""
此表同步apschedule中的建表语句,如果没有,则apscheduler会自动创建对应表
"""
__tablename__ = 'apscheduler_jobs'
id: str = Field(sa_column=Column('id', Unicode(191), primary_key=True))
next_run_time: float = Field(sa_column=Column('next_run_time', mysql.DOUBLE, index=True))
job_state: bytes = Field(sa_column=Column('job_state', LargeBinary, nullable=False))
logs: List["JobLog"] = Relationship(back_populates="job")
user: "User" = Relationship(back_populates="jobs", link_model=UserJob)


class JobLog(SQLModel, table=True):
__tablename__ = 'job_log'
id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
status: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='执行命令返回状态'))
start_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务开始时间'})
end_time: datetime = Field(default=datetime.now, sa_column_kwargs={'comment': '任务结束时间'})
log: JSON = Field(sa_column=Column(JSON, comment='执行日志'))
job_id: Optional[str] = Field(default=None, foreign_key="apscheduler_jobs.id")
job: Optional[Job] = Relationship(back_populates="logs")

class Config:
arbitrary_types_allowed = True
id: Optional[str] = Field(default=None, description="任务ID")
name: str = Field(description="任务名称")
trigger: TriggerEnum = Field(description="触发器类型")
trigger_args: TriggerArgs = Field(description="触发器")
targets: List[str] = Field(description="执行任务的主机")
ansible_args: Dict[str, Any] = Field(default=None, description="ansible任务参数")


# class Job(SQLModel, table=True):
# """
# 此表同步apschedule中的建表语句,如果没有,则apscheduler会自动创建对应表
# """
# __tablename__ = 'jobs'
# id: str = Field(sa_column=Column('id', autoincrement=True, primary_key=True))
# name: str = Field(sa_column=Column('name', String(50), nullable=False, unique=True))
# create_time: float = Field(sa_column=Column('create_time', mysql.DOUBLE, index=True))
# update_time: float = Field(sa_column=Column('update_time', mysql.DOUBLE, index=True))
# job_id: str = Field(sa_column=Column('job_id', Unicode(255)))
# job_logs: List["JobLog"] = Relationship(back_populates="job")
#
#
# class JobLog(SQLModel, table=True):
# __tablename__ = 'job_log'
# id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
# status: int = Field(sa_column=Column(mysql.TINYINT, default=0, comment='执行命令返回状态'))
# start_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务开始时间'})
# end_time: datetime = Field(default=datetime.now, sa_column_kwargs={'comment': '任务结束时间'})
# log: JSON = Field(sa_column=Column(JSON, comment='执行日志'))
# job_id: Optional[str] = Field(default=None, foreign_key="apscheduler_jobs.id")
# job: Optional[Job] = Relationship(back_populates="job_logs")
#
# class Config:
# arbitrary_types_allowed = True


class JobSearch(SQLModel):
job_name: Optional[str] = None
job_trigger: Optional[str] = None


class JobLogs(SQLModel):
Expand Down
14 changes: 7 additions & 7 deletions server/models/internal/relationships.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ class UserRole(SQLModel, table=True):
role_id: int = Field(foreign_key="roles.id", primary_key=True)


class UserJob(SQLModel, table=True):
"""
通过中间表实现:用户-任务的对应关系
"""
__tablename__ = 'user_job'
user_id: int = Field(foreign_key="user.id", primary_key=True, nullable=False)
job_id: str = Field(Unicode(191), foreign_key="apscheduler_jobs.id", primary_key=True, nullable=False)
# class UserJob(SQLModel, table=True):
# """
# 通过中间表实现:用户-任务的对应关系
# """
# __tablename__ = 'user_job'
# user_id: int = Field(foreign_key="user.id", primary_key=True, nullable=False)
# job_id: str = Field(Unicode(191), foreign_key="apscheduler_jobs.id", primary_key=True, nullable=False)
118 changes: 48 additions & 70 deletions server/routers/internal/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,37 @@
import anyio
from datetime import datetime
from uuid import uuid4
from typing import List, Any, Dict
from typing import List, Any, Dict, TYPE_CHECKING
from sqlmodel import Session, text
from apscheduler.job import Job
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger
from fastapi import APIRouter, Depends, WebSocket, Request
from loguru import logger
from server.settings import settings
from server.models.internal.job import JobAdd, JobLog
from server.common.database import get_session, get_redis
from server.models.internal.job import JobAdd
from server.common.database import get_session, get_redis, get_rpyc
from server.common.response_code import ApiResponse, SearchResponse
from server.common.utils import get_task_logs
from server.models.internal import Pagination
from server.models.internal.job import JobSearch, JobLogs, JobLogSearch
from server import crud
from server.common.dep import get_uid

router = APIRouter(prefix='/api/jobs')

# if TYPE_CHECKING:
# from apscheduler.job import Job

def cron_to_dict(cron):
cron_list = re.split(r'\s+', cron)
return {'minute': cron_list[0], 'hour': cron_list[1], 'day': cron_list[2], 'month': cron_list[3],
'day_of_week': cron_list[4]}
router = APIRouter(prefix='/api/jobs')


@router.get('/switch/{job_id}', summary='切换任务状态')
async def switch_job(job_id: str, ):
@router.get('/switch/{job_id}', summary='任务状态切换')
async def switch_job(job_id: str, status: int):
try:
conn = rpyc.connect(**settings.rpyc_config)
conn.root.switch_job(job_id)
if status == 1:
conn.root.resume_job(job_id)
else:
conn.root.pause_job(job_id)
except ValueError as e:
logger.warning(e)
return ApiResponse(
Expand All @@ -44,20 +43,6 @@ async def switch_job(job_id: str, ):
return ApiResponse()


@router.get('/resume/{job_id}', summary='恢复任务')
async def resume_job(job_id: str):
try:
conn = rpyc.connect(**settings.rpyc_config)
conn.root.resume_job(job_id)
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message='恢复任务出错,请联系管理员!'
)
return ApiResponse()


@router.delete('/{job_id}', summary='删除任务', response_model=ApiResponse[str])
async def delete_job(job_id: str, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
sql = text("select job_id from user_job where user_id=:uid")
Expand Down Expand Up @@ -89,17 +74,10 @@ async def delete_job(job_id: str, uid: int = Depends(get_uid), session: Session
@router.put('/', summary='修改任务', response_model=ApiResponse[str])
async def modify_job(job: JobAdd, session: Session = Depends(get_session)):
logger.debug(job)
if job.trigger == 'cron':
trigger_args = job.trigger_args.dict()
trigger_args.update(cron_to_dict(job.trigger_args.cron))
del trigger_args['cron']
trigger = CronTrigger(**trigger_args)
elif job.trigger == 'date':
trigger = DateTrigger(run_date=job.trigger_args)
try:
conn = rpyc.connect(**settings.rpyc_config)
conn.root.modify_job(job.id, kwargs={'job_id': job.id, 'targets': job.targets, 'command': job.command},
name=job.name, trigger=trigger)
conn.root.modify_job(job.id, kwargs={'targets': job.targets, 'ansible_args': job.ansible_args},
name=job.name, trigger=job.trigger, trigger_args=job.trigger_args.model_dump())
except Exception as e:
logger.warning(e)
return ApiResponse(
Expand All @@ -112,41 +90,34 @@ async def modify_job(job: JobAdd, session: Session = Depends(get_session)):
@router.post('/', summary='添加任务', response_model=ApiResponse[str])
async def add_job(job: JobAdd, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
logger.debug(job)
# 手动生成job_id,需要传递到内部
# 手动生成job_id,传入执行函数,方便后期日志写入redis
job_id = uuid4().hex
# 只支持cron的date任务,interval间隔任务完全可以用cron替代,没必要单独实现功能
if job.trigger == 'cron':
trigger_args: Dict[str, Any] = job.trigger_args.dict()
trigger_args.update(cron_to_dict(job.trigger_args.cron))
del trigger_args['cron']
elif job.trigger == 'date':
trigger_args = {'run_date': job.trigger_args}
logger.debug(f'user defined job ID:{job_id}')
# 只支持cron和date任务,interval间隔任务完全可以用cron替代,没必要单独实现功能
try:
conn = rpyc.connect(**settings.rpyc_config)
job = conn.root.add_job('scheduler-server:run_command_with_channel', trigger=job.trigger.value,
kwargs={'job_id': job_id,
'targets': job.targets,
'command': job.command}, id=job_id, name=job.name, **trigger_args)
sql = text("INSERT INTO user_job values (:uid,:job_id)")
session.execute(sql, {'uid': uid, "job_id": job_id})
session.commit()
job = conn.root.add_job('scheduler-server:ansible_task', trigger=job.trigger, id=job_id,
kwargs={'job_id': job_id, 'targets': job.targets,
'ansible_args': job.ansible_args}, name=job.name,
trigger_args=job.trigger_args.model_dump())
logger.debug(job.id)
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message=str(e)
)
return ApiResponse(
data=job.id
message=f'新建任务成功:{job.name}'
)


@router.post('/search', summary='获取所有任务', response_model=ApiResponse[SearchResponse[Any]])
async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid)):
job_name = search.search['job_name']
async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid), conn: Any = Depends(get_rpyc)):
# job_name = search.search.job_name
try:
conn = rpyc.connect(**settings.rpyc_config)
user_jobs: List[Job] = conn.root.get_user_jobs(uid, job_name)
user_jobs = conn.root.get_user_jobs(uid=None)
logger.debug(user_jobs)
except Exception as e:
logger.warning(e)
return ApiResponse(
Expand All @@ -156,7 +127,7 @@ async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid)):
if len(user_jobs) == 0:
return ApiResponse(
data={
'total': len(user_jobs),
'total': 0,
'data': []
}
)
Expand All @@ -175,38 +146,45 @@ async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid)):
logger.debug('cron')
for field in job.trigger.fields:
trigger_args[field.name] = str(field)
# job.kwargs['targets']默认为rpyc类型的list,需要转成python的list类型,否则pydantic类型检查会去找rpyc list,报错
info.update({
'id': job.id,
'name': job.name,
'targets': job.kwargs['targets'],
'id': str(job.id),
'name': str(job.name),
'targets': list(job.kwargs['targets']),
'trigger': 'cron',
'trigger_args': {
'cron': f"{trigger_args['minute']} {trigger_args['hour']} {trigger_args['day']} {trigger_args['month']} {trigger_args['day_of_week']}",
'start_date': None if job.trigger.start_date is None else job.trigger.start_date.strftime(
"%Y-%m-%d %H:%M:%S"),
'end_date': None if job.trigger.end_date is None else job.trigger.end_date.strftime(
"%Y-%m-%d %H:%M:%S"),
'run_date': None,
},
'command': job.kwargs['command'],
'status': 'running' if job.next_run_time is not None else 'stop'
'ansible_args': dict(job.kwargs['ansible_args']),
'status': '1' if job.next_run_time is not None else '0'
})
elif isinstance(job.trigger, DateTrigger):
info.update({
'id': job.id,
'name': job.name,
'targets': job.kwargs['targets'],
'id': str(job.id),
'name': str(job.name),
'targets': list(job.kwargs['targets']),
'trigger': 'date',
'trigger_args': job.trigger.run_date.strftime(
"%Y-%m-%d %H:%M:%S"),
'command': job.kwargs['command'],
'status': 'running' if job.next_run_time is not None else 'stop'
'trigger_args': {
'run_date': str(job.trigger.run_date.strftime(
"%Y-%m-%d %H:%M:%S")),
'cron': None,
'start_date': None,
'end_date': None,
},
'ansible_args': dict(job.kwargs['ansible_args']),
'status': '1' if job.next_run_time is not None else '0'
})
logger.debug(info)
job_info_list.append(info)
logger.debug(job_info_list)
return ApiResponse(
data={
'total': len(user_jobs),
'total': 10,
'data': job_info_list
}
)
Expand Down