Merged

Commits (27)
079438c
Backend: add rpyc + apscheduler service
4linuxfun Feb 24, 2023
30c6b8e
Backend: rpyc + apscheduler service fix #100
4linuxfun Feb 24, 2023
46d69e6
Backend: add task-management endpoints fix #100
4linuxfun Feb 24, 2023
9532018
Frontend: task center / task management: implement features fix #100
4linuxfun Feb 24, 2023
d4e6530
Frontend: component encapsulation
4linuxfun Feb 24, 2023
6057efe
Backend: manage jobs by calling apscheduler remotely via rpyc
4linuxfun Jun 19, 2023
065e8f0
Backend: add Job models and CRUD
4linuxfun Jun 19, 2023
d86d098
Backend: add get_uid dependency
4linuxfun Jun 20, 2023
b9c9caf
Backend: use get_uid to obtain the uid
4linuxfun Jun 20, 2023
ce6ee7c
Backend: add get_redis function for obtaining a Redis connection
4linuxfun Jun 20, 2023
39ca9f2
Backend: fetch log messages from Redis over WebSocket
4linuxfun Jun 20, 2023
b0b99bf
Backend: reorganize the job module directory
4linuxfun Jun 20, 2023
c297407
Backend: add task-management endpoints
4linuxfun Jun 20, 2023
9980ee6
Frontend: implement task-management pages
4linuxfun Jun 20, 2023
95d60e5
Backend: decouple scheduler task execution from the host, to ease later support for remote SSH execution
4linuxfun Jun 28, 2023
80a277f
Backend: add targets parameter to jobs for specifying execution hosts
4linuxfun Jun 28, 2023
0a438e2
Frontend: fix garbled Chinese text after the Element Plus upgrade
4linuxfun Jun 28, 2023
cb45acf
Frontend: add targets parameter to task management
4linuxfun Jun 28, 2023
f0e7cc5
Backend: add get_user_jobs to the scheduler to quickly fetch a user's job list by uid and job_name
4linuxfun Jun 30, 2023
2486db8
Backend: add time values to scheduler tasks
4linuxfun Jun 30, 2023
7e9e4bf
Backend: optimize job-fetching logic
4linuxfun Jun 30, 2023
6cee979
Backend: redefine the Job models
4linuxfun Jun 30, 2023
0bb4ba9
Frontend: clean up task-management page code
4linuxfun Jun 30, 2023
1ce9cc8
Frontend: task log page display
4linuxfun Jun 30, 2023
7fd5c91
Update frontend and backend dependency versions
4linuxfun Dec 21, 2023
3dc9ea1
update: bump dependency library versions
4linuxfun Jan 3, 2024
58d095b
Backend: fix attribute changes after the pydantic upgrade
4linuxfun Jan 3, 2024
Backend: add task-management endpoints
4linuxfun committed Jun 20, 2023
commit c297407d4715375611071e636030d4f4029c2dbe
5 changes: 2 additions & 3 deletions server/main.py
@@ -1,8 +1,7 @@
 from fastapi import FastAPI, Depends
 from loguru import logger
 from .common.log import init_logging
-from .routers.internal import login, user, menu, roles, dictonary
-from .routers import job
+from .routers.internal import login, user, menu, roles, dictonary, job
 from .common.security import auth_check
 
 init_logging()
@@ -14,7 +13,7 @@
 app.include_router(menu.router, tags=['菜单管理'])
 app.include_router(roles.router, tags=['角色管理'])
 app.include_router(dictonary.router, tags=['数据字典'])
-app.include_router(job.router,tags=['任务管理'])
+app.include_router(job.router, tags=['任务管理'])
 
 
 @app.on_event("startup")
248 changes: 248 additions & 0 deletions server/routers/internal/job.py
@@ -0,0 +1,248 @@
import rpyc
import re
import anyio
from datetime import datetime
from uuid import uuid4
from typing import List, Any, Dict
from sqlmodel import Session, text
from apscheduler.job import Job
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.cron import CronTrigger
from fastapi import APIRouter, Depends, WebSocket, Request
from loguru import logger
from server.settings import settings
from server.models.internal.job import JobAdd, JobLog
from server.common.database import get_session, get_redis
from server.common.response_code import ApiResponse, SearchResponse
from server.common.utils import get_task_logs
from server.schemas.internal.pagination import Pagination
from server.schemas.job import JobSearch, JobLogs, JobLogSearch
from server import crud
from server.common.dep import get_uid

router = APIRouter(prefix='/api/jobs')


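# Convert a 5-field crontab expression ('minute hour day month day_of_week')
# into the keyword arguments CronTrigger expects,
# e.g. '*/5 * * * *' -> {'minute': '*/5', 'hour': '*', 'day': '*', 'month': '*', 'day_of_week': '*'}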
def cron_to_dict(cron):
cron_list = re.split(r'\s+', cron)
return {'minute': cron_list[0], 'hour': cron_list[1], 'day': cron_list[2], 'month': cron_list[3],
'day_of_week': cron_list[4]}


@router.get('/switch/{job_id}', summary='切换任务状态')
async def switch_job(job_id: str):
try:
conn = rpyc.connect(**settings.rpyc_config)
conn.root.switch_job(job_id)
except ValueError as e:
logger.warning(e)
return ApiResponse(
code=500,
message=str(e)
)
return ApiResponse()


@router.get('/resume/{job_id}', summary='恢复任务')
async def resume_job(job_id: str):
try:
conn = rpyc.connect(**settings.rpyc_config)
conn.root.resume_job(job_id)
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message='恢复任务出错,请联系管理员!'
)
return ApiResponse()


@router.delete('/{job_id}', summary='删除任务', response_model=ApiResponse[str])
async def delete_job(job_id: str, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
sql = text("select job_id from user_job where user_id=:uid")
user_jobs = tuple([job[0] for job in session.execute(sql, {'uid': uid})])
logger.debug(user_jobs)
if job_id not in user_jobs:
return ApiResponse(
code=500,
message='用户无权限操作此任务!'
)
try:
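        # pause first so the job cannot fire mid-cleanup, then drop the
        # ownership mapping and logs before removing the job from the scheduler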
conn = rpyc.connect(**settings.rpyc_config)
conn.root.pause_job(job_id)
delete_user_job = text("delete from user_job where job_id=:job_id")
session.execute(delete_user_job, {'job_id': job_id})
delete_job_logs = text("delete from job_log where job_id=:job_id")
session.execute(delete_job_logs, {'job_id': job_id})
session.commit()
conn.root.remove_job(job_id)
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message='删除任务出错,请联系管理员!'
)
return ApiResponse()


@router.put('/', summary='修改任务', response_model=ApiResponse[str])
async def modify_job(job: JobAdd, session: Session = Depends(get_session)):
logger.debug(job)
    if job.trigger == 'cron':
        trigger_args = job.trigger_args.dict()
        trigger_args.update(cron_to_dict(job.trigger_args.cron))
        del trigger_args['cron']
        trigger = CronTrigger(**trigger_args)
    elif job.trigger == 'date':
        # one-shot job: trigger_args carries the run date itself
        trigger = DateTrigger(run_date=job.trigger_args)
    else:
        return ApiResponse(code=500, message=f'不支持的触发器类型:{job.trigger}')
try:
conn = rpyc.connect(**settings.rpyc_config)
conn.root.modify_job(job.id, kwargs={'job_id': job.id, 'command': job.command},
name=job.name, trigger=trigger)
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message=str(e)
)
return ApiResponse()


@router.post('/', summary='添加任务', response_model=ApiResponse[str])
async def add_job(job: JobAdd, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
logger.debug(job)
    # generate the job_id manually; it has to be passed through to the task internals
job_id = uuid4().hex
    # only cron and date jobs are supported; interval jobs can always be expressed
    # as cron (e.g. every 5 minutes is '*/5 * * * *'), so there is no need to implement them separately
    if job.trigger == 'cron':
        trigger_args: Dict[str, Any] = job.trigger_args.dict()
        trigger_args.update(cron_to_dict(job.trigger_args.cron))
        del trigger_args['cron']
    elif job.trigger == 'date':
        trigger_args = {'run_date': job.trigger_args}
    else:
        return ApiResponse(code=500, message=f'不支持的触发器类型:{job.trigger}')
    try:
        conn = rpyc.connect(**settings.rpyc_config)
        # the task is referenced by name and resolved on the scheduler side,
        # so this API process never has to import the task code itself
        new_job = conn.root.add_job('scheduler-server:run_command_with_channel', trigger=job.trigger.value,
                                    kwargs={'job_id': job_id, 'command': job.command},
                                    id=job_id, name=job.name, **trigger_args)
sql = text("INSERT INTO user_job values (:uid,:job_id)")
session.execute(sql, {'uid': uid, "job_id": job_id})
session.commit()
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message=str(e)
)
    return ApiResponse(
        data=new_job.id
    )


@router.post('/search', summary='获取所有任务', response_model=ApiResponse[SearchResponse[Any]])
async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid),
session: Session = Depends(get_session)):
job_name = search.search['job_name']
sql = text("select job_id from user_job where user_id=:uid ")
results = session.execute(sql, {'uid': uid})
user_job_ids: List[str] = []
for job_id in results.fetchall():
user_job_ids.append(job_id[0])
logger.debug(f'uid:{uid},jobs:{user_job_ids}')
try:
conn = rpyc.connect(**settings.rpyc_config)
if job_name is None:
all_jobs: List[Job] = conn.root.get_jobs()
else:
search_job = conn.root.get_job(job_name)
if search_job is None:
all_jobs: List[Job] = []
else:
all_jobs: List[Job] = [search_job]
except Exception as e:
logger.warning(e)
return ApiResponse(
code=500,
message=str(e)
)
if len(all_jobs) == 0:
return ApiResponse(
data={
'total': len(all_jobs),
'data': []
}
)

user_jobs = [job for job in all_jobs if job.id in user_job_ids]
logger.debug(user_jobs)

job_info_list: List[Dict[str, Any]] = []
    start = (search.page - 1) * search.page_size
    end = search.page * search.page_size  # slice upper bound is exclusive
    for job in user_jobs[start:end]:
logger.debug(job.trigger)
trigger_args: Dict[str, str] = {}
info = {}
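        # reconstruct a crontab-style string from the CronTrigger fields so the
        # frontend can display and edit the schedule in the form it submitted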
if isinstance(job.trigger, CronTrigger):
logger.debug('cron')
for field in job.trigger.fields:
trigger_args[field.name] = str(field)
info.update({
'id': job.id,
'name': job.name,
'trigger': 'cron',
'trigger_args': {
'cron': f"{trigger_args['minute']} {trigger_args['hour']} {trigger_args['day']} {trigger_args['month']} {trigger_args['day_of_week']}",
'start_date': None if job.trigger.start_date is None else job.trigger.start_date.strftime(
"%Y-%m-%d %H:%M:%S"),
'end_date': None if job.trigger.end_date is None else job.trigger.end_date.strftime(
"%Y-%m-%d %H:%M:%S"),
},
'command': job.kwargs['command'],
'status': 'running' if job.next_run_time is not None else 'stop'
})
elif isinstance(job.trigger, DateTrigger):
info.update({
'id': job.id,
'name': job.name,
'trigger': 'date',
'trigger_args': job.trigger.run_date.strftime(
"%Y-%m-%d %H:%M:%S"),
'command': job.kwargs['command'],
'status': 'running' if job.next_run_time is not None else 'stop'
})
logger.debug(info)
job_info_list.append(info)
logger.debug(job_info_list)
return ApiResponse(
data={
'total': len(user_jobs),
'data': job_info_list
}
)


@router.post('/logs', summary='任务日志查询', response_model=ApiResponse[SearchResponse[JobLogs]])
async def job_logs(page_search: Pagination[JobLogSearch], session: Session = Depends(get_session)):
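    # map each searchable field to its match mode ('like') for the generic crud search helpers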
filter_type = JobLogSearch(job_id='like', cmd='like')
total = crud.internal.job_log.search_total(session, page_search.search, filter_type.dict())
jobs = crud.internal.job_log.search(session, page_search, filter_type.dict())
logger.debug(jobs)
return ApiResponse(
data={
'total': total,
'data': jobs
}
)


@router.websocket('/logs/ws/')
async def websocket_endpoint(id: int, job_id: str, trigger: str, websocket: WebSocket,
session: Session = Depends(get_session),
redis=Depends(get_redis)):
await websocket.accept()
async with anyio.create_task_group() as tg:
        # assumes the task id expected by get_task_logs is the scheduler job_id
        tg.start_soon(get_task_logs, websocket, redis, session, job_id, trigger)
logger.debug('close websocket')
await websocket.close()
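
The conn.root.* calls above are served by the rpyc + apscheduler service added earlier in this PR (commits 079438c and 6057efe), which is not part of this diff. Purely as an illustrative sketch, under the assumption that the service exposes APScheduler through rpyc's exposed_* convention (all names and the port below are assumptions, not the PR's actual code), the server side could look roughly like this:

import rpyc
from apscheduler.schedulers.background import BackgroundScheduler
from rpyc.utils.server import ThreadedServer


class SchedulerService(rpyc.Service):
    """Expose APScheduler job management over rpyc; each exposed_* method
    becomes reachable on the client as conn.root.<name>."""

    def __init__(self, scheduler: BackgroundScheduler):
        self._scheduler = scheduler

    def exposed_add_job(self, func, trigger=None, kwargs=None, id=None, name=None, **trigger_args):
        # func may be a textual 'module:callable' reference, so the task code
        # only needs to be importable on the scheduler side
        return self._scheduler.add_job(func, trigger=trigger, kwargs=dict(kwargs or {}),
                                       id=id, name=name, **trigger_args)

    def exposed_modify_job(self, job_id, **changes):
        return self._scheduler.modify_job(job_id, **changes)

    def exposed_pause_job(self, job_id):
        self._scheduler.pause_job(job_id)

    def exposed_resume_job(self, job_id):
        self._scheduler.resume_job(job_id)

    def exposed_remove_job(self, job_id):
        self._scheduler.remove_job(job_id)

    def exposed_get_job(self, job_id):
        return self._scheduler.get_job(job_id)

    def exposed_get_jobs(self):
        return self._scheduler.get_jobs()

    def exposed_switch_job(self, job_id):
        # a paused job has next_run_time None; toggle between paused and running
        job = self._scheduler.get_job(job_id)
        if job is None:
            raise ValueError(f'job {job_id} not found')
        if job.next_run_time is None:
            job.resume()
        else:
            job.pause()


if __name__ == '__main__':
    scheduler = BackgroundScheduler()
    scheduler.start()
    # the port must match settings.rpyc_config used by the API endpoints
    ThreadedServer(SchedulerService(scheduler), port=18861,
                   protocol_config={'allow_public_attrs': True}).start()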