From 079438ca0bfbfe31fb61cc0bee1f4dc6dd4b672c Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Fri, 24 Feb 2023 15:50:53 +0800
Subject: [PATCH 01/27] backend: add the rpyc+apscheduler service
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 rpyc_scheduler/config.py           |   0
 rpyc_scheduler/models.py           |   0
 rpyc_scheduler/scheduler-server.py | 128 +++++++++++++++++++++++++++++
 rpyc_scheduler/tasks.py            |   0
 rpyc_scheduler/utils.py            |  37 +++++++++
 5 files changed, 165 insertions(+)
 create mode 100644 rpyc_scheduler/config.py
 create mode 100644 rpyc_scheduler/models.py
 create mode 100644 rpyc_scheduler/scheduler-server.py
 create mode 100644 rpyc_scheduler/tasks.py
 create mode 100644 rpyc_scheduler/utils.py

diff --git a/rpyc_scheduler/config.py b/rpyc_scheduler/config.py
new file mode 100644
index 0000000..e69de29
diff --git a/rpyc_scheduler/models.py b/rpyc_scheduler/models.py
new file mode 100644
index 0000000..e69de29
diff --git a/rpyc_scheduler/scheduler-server.py b/rpyc_scheduler/scheduler-server.py
new file mode 100644
index 0000000..516cb10
--- /dev/null
+++ b/rpyc_scheduler/scheduler-server.py
@@ -0,0 +1,128 @@
+"""
+Drive apscheduler remotely over rpyc. References:
+https://github.com/agronholm/apscheduler/tree/3.x/examples/rpc
+https://gist.github.com/gsw945/15cbb71eaca5be66787a2c187414e36f
+"""
+
+import redis
+import rpyc
+from loguru import logger
+from typing import Dict
+from rpyc.utils.server import ThreadedServer
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
+from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
+from apscheduler.events import (
+    EVENT_JOB_EXECUTED,
+    EVENT_JOB_ERROR,
+    EVENT_JOB_ADDED,
+    EVENT_JOB_SUBMITTED,
+    EVENT_JOB_REMOVED
+)
+
+
+class Channel:
+    def __init__(self, redis_config, task_id):
+        logger.debug(f'__init__,redis:{redis_config},task id:{task_id}')
+        self.conn = redis.Redis(**redis_config, decode_responses=True)
+        self.task_id = task_id
+        self.task_key = f"tasks:{self.task_id}"
+        self._expire = 60
+
+    @property
+    def msg(self, ):
+        return self.conn.xrange(self.task_key, '-', '+')
+
+    @property
+    def expire(self, ):
+        return self._expire
+
+    @expire.setter
+    def expire(self, value):
+        self._expire = value
+
+    def send(self, msg: Dict[Any, Any]):
+        self.conn.xadd(self.task_key, msg)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.conn.expire(self.task_key, self._expire)
+        self.close()
+
+    def close(self, ):
+        self.conn.close()
+
+
+def subprocess_with_channel(task_id, command):
+    with Channel(redis_config, task_id=task_id) as channel:
+        channel.send({'msg': '开始执行任务:'})
+        channel.send({'msg': f"执行命令:{command}"})
+        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
+                                   shell=True)
+        while process.poll() is None:
+            message = process.stdout.readline()
+            channel.send({'msg': message})
+        channel.send({'msg': '结束执行任务'})
+        results = channel.msg
+        return results
+
+
+def print_text(text):
+    logger.debug(text)
+
+
+class SchedulerService(rpyc.Service):
+    def exposed_add_job(self, func, *args, **kwargs):
+        return scheduler.add_job(func, *args, **kwargs)
+
+    def exposed_modify_job(self, job_id, jobstore=None, **changes):
+        return scheduler.modify_job(job_id, jobstore, **changes)
+
+    def exposed_pause_job(self, job_id, jobstore=None):
+        return scheduler.pause_job(job_id, jobstore)
+
+    def exposed_resume_job(self, job_id, jobstore=None):
+        return scheduler.resume_job(job_id, jobstore)
+
+    def exposed_remove_job(self, job_id, jobstore=None):
+        return scheduler.remove_job(job_id, jobstore)
+
+    def exposed_get_jobs(self, jobstore=None):
+        return scheduler.get_jobs(jobstore)
+
+
+def event_listener(event):
+    if event.code == EVENT_JOB_ADDED:
+        logger.debug('event add')
+    elif event.code == EVENT_JOB_EXECUTED:
+        logger.debug('event executed')
+    elif event.code == EVENT_JOB_REMOVED:
+        logger.debug('event removed')
+    elif event.code == EVENT_JOB_ERROR:
+        logger.debug('event error')
+
+
+if __name__ == '__main__':
+    # start apscheduler
+    jobstores = {
+        'default': SQLAlchemyJobStore(url='mysql+pymysql://root:123456@192.168.137.129/devops')
+    }
+    excutors = {
+        'default': ThreadPoolExecutor(20),
+        'processpool': ProcessPoolExecutor(20)
+    }
+    scheduler = BackgroundScheduler(jobstores=jobstores, excutors=excutors)
+    scheduler.add_listener(event_listener,
+                           EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_SUBMITTED)
+    scheduler.start()
+
+    # start the rpyc service
+    server = ThreadedServer(SchedulerService, port=18861, protocol_config={'allow_public_attrs': True})
+    try:
+        server.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+    finally:
+        scheduler.shutdown()
diff --git a/rpyc_scheduler/tasks.py b/rpyc_scheduler/tasks.py
new file mode 100644
index 0000000..e69de29
diff --git a/rpyc_scheduler/utils.py b/rpyc_scheduler/utils.py
new file mode 100644
index 0000000..9c70f31
--- /dev/null
+++ b/rpyc_scheduler/utils.py
@@ -0,0 +1,37 @@
+import redis
+from loguru import logger
+from typing import Dict, Any
+
+
+class Channel:
+    def __init__(self, redis_config, task_id):
+        logger.debug(f'__init__,redis:{redis_config},task id:{task_id}')
+        self.conn = redis.Redis(**redis_config, decode_responses=True)
+        self.task_id = task_id
+        self.task_key = f"tasks:{self.task_id}"
+        self._expire = 60
+
+    @property
+    def msg(self, ):
+        return self.conn.xrange(self.task_key, '-', '+')
+
+    @property
+    def expire(self, ):
+        return self._expire
+
+    @expire.setter
+    def expire(self, value):
+        self._expire = value
+
+    def send(self, msg: Dict[Any, Any]):
+        self.conn.xadd(self.task_key, msg)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.conn.expire(self.task_key, self._expire)
+        self.close()
+
+    def close(self, ):
+        self.conn.close()
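A minimal client-side sketch of how this service is driven (assumptions: the server above is running on localhost:18861, and print_text is the demo function defined in scheduler-server.py; the 'module:function' textual reference is the standard apscheduler way of naming a job target):

    import rpyc

    # connect to the SchedulerService exposed by scheduler-server.py
    conn = rpyc.connect('localhost', 18861, config={'allow_public_attrs': True})

    # rpyc forwards this call to BackgroundScheduler.add_job() on the server
    job = conn.root.add_job('scheduler-server:print_text', 'interval',
                            args=['hello from the client'], seconds=10)

    print(conn.root.get_jobs())   # the new job is visible in the job store
    conn.root.remove_job(job.id)  # and can be removed again by id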
From 30c6b8ea8b1aae4516588679686285bd6942dc23 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Fri, 24 Feb 2023 15:56:24 +0800
Subject: [PATCH 02/27] backend: rpyc+apscheduler service, fix #100
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 rpyc_scheduler/config.py           | 11 +++++
 rpyc_scheduler/models.py           | 21 +++++++
 rpyc_scheduler/scheduler-server.py | 73 ++++++------------------
 rpyc_scheduler/tasks.py            | 26 +++++++++
 rpyc_scheduler/utils.py            |  5 +-
 5 files changed, 75 insertions(+), 61 deletions(-)

diff --git a/rpyc_scheduler/config.py b/rpyc_scheduler/config.py
index e69de29..610e2eb 100644
--- a/rpyc_scheduler/config.py
+++ b/rpyc_scheduler/config.py
@@ -0,0 +1,11 @@
+from pydantic import BaseSettings
+
+
+class RpcConfig(BaseSettings):
+    # job store and executors for apscheduler
+    apscheduler_job_store = 'mysql+pymysql://root:123456@192.168.137.129/devops'
+    redis = {'host': '192.168.137.129', 'password': 'seraphim', 'port': 6379, 'health_check_interval': 30}
+    rpc_port = 18861
+
+
+rpc_config = RpcConfig()
diff --git a/rpyc_scheduler/models.py b/rpyc_scheduler/models.py
index e69de29..ceb47b5 100644
--- a/rpyc_scheduler/models.py
+++ b/rpyc_scheduler/models.py
@@ -0,0 +1,21 @@
+from typing import Optional
+from datetime import datetime
+from sqlmodel import SQLModel, Field, Column, Integer, create_engine, Session
+from sqlalchemy.dialects import mysql
+from config import rpc_config
+
+
+class TaskLog(SQLModel, table=True):
+    id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
+    task_id: str = Field(max_length=20, sa_column_kwargs={'comment': '任务名'})
+    status: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='执行命令返回状态'))
+    exe_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务执行时间'})
+    cmd: str = Field(sa_column_kwargs={'comment': '执行命令'})
+    type: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='类型:0单行命令,1:脚本文件'))
+    stdout: str = Field(sa_column=Column(mysql.MEDIUMTEXT, comment='执行日志'))
+
+
+engine = create_engine(rpc_config.apscheduler_job_store, pool_size=5, max_overflow=10, pool_timeout=30,
+                       pool_pre_ping=True)
+# SQLModel.metadata.create_all(engine)
+session = Session(engine)
diff --git a/rpyc_scheduler/scheduler-server.py b/rpyc_scheduler/scheduler-server.py
index 516cb10..4fab8ae 100644
--- a/rpyc_scheduler/scheduler-server.py
+++ b/rpyc_scheduler/scheduler-server.py
@@ -4,10 +4,10 @@
 https://gist.github.com/gsw945/15cbb71eaca5be66787a2c187414e36f
 """
 
-import redis
 import rpyc
+from tasks import subprocess_with_channel
 from loguru import logger
-from typing import Dict
+from config import rpc_config
 from rpyc.utils.server import ThreadedServer
 from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
@@ -21,56 +21,9 @@
 )
 
 
-class Channel:
-    def __init__(self, redis_config, task_id):
-        logger.debug(f'__init__,redis:{redis_config},task id:{task_id}')
-        self.conn = redis.Redis(**redis_config, decode_responses=True)
-        self.task_id = task_id
-        self.task_key = f"tasks:{self.task_id}"
-        self._expire = 60
-
-    @property
-    def msg(self, ):
-        return self.conn.xrange(self.task_key, '-', '+')
-
-    @property
-    def expire(self, ):
-        return self._expire
-
-    @expire.setter
-    def expire(self, value):
-        self._expire = value
-
-    def send(self, msg: Dict[Any, Any]):
-        self.conn.xadd(self.task_key, msg)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.conn.expire(self.task_key, self._expire)
-        self.close()
-
-    def close(self, ):
-        self.conn.close()
-
-
-def subprocess_with_channel(task_id, command):
-    with Channel(redis_config, task_id=task_id) as channel:
-        channel.send({'msg': '开始执行任务:'})
-        channel.send({'msg': f"执行命令:{command}"})
-        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
-                                   shell=True)
-        while process.poll() is None:
-            message = process.stdout.readline()
-            channel.send({'msg': message})
-        channel.send({'msg': '结束执行任务'})
-        results = channel.msg
-        return results
-
-
-def print_text(text):
-    logger.debug(text)
+def print_text(*args, **kwargs):
+    logger.debug(args)
+    logger.debug(kwargs)
 
 
 class SchedulerService(rpyc.Service):
@@ -92,6 +45,9 @@
     def exposed_get_jobs(self, jobstore=None):
         return scheduler.get_jobs(jobstore)
 
+    def exposed_get_job(self, job_id, jobstore=None):
+        return scheduler.get_job(job_id, jobstore)
+
 
 def event_listener(event):
     if event.code == EVENT_JOB_ADDED:
@@ -105,21 +61,22 @@
 
 
 if __name__ == '__main__':
-    # start apscheduler
-    jobstores = {
-        'default': SQLAlchemyJobStore(url='mysql+pymysql://root:123456@192.168.137.129/devops')
+    job_store = {
+        'default': SQLAlchemyJobStore(url=rpc_config.apscheduler_job_store)
     }
-    excutors = {
+    apscheduler_excutors = {
         'default': ThreadPoolExecutor(20),
         'processpool': ProcessPoolExecutor(20)
     }
-    scheduler = BackgroundScheduler(jobstores=jobstores, excutors=excutors)
+    scheduler = BackgroundScheduler(jobstores=job_store,
+                                    excutors=apscheduler_excutors)
     scheduler.add_listener(event_listener,
                            EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_SUBMITTED)
     scheduler.start()
 
     # start the rpyc service
-    server = ThreadedServer(SchedulerService, port=18861, protocol_config={'allow_public_attrs': True})
+    server = ThreadedServer(SchedulerService, port=rpc_config.rpc_port,
+                            protocol_config={'allow_public_attrs': True, 'allow_pickle': True})
     try:
         server.start()
     except (KeyboardInterrupt, SystemExit):
diff --git a/rpyc_scheduler/tasks.py b/rpyc_scheduler/tasks.py
index e69de29..c991408 100644
--- a/rpyc_scheduler/tasks.py
+++ b/rpyc_scheduler/tasks.py
@@ -0,0 +1,26 @@
+import subprocess
+import json
+from loguru import logger
+from utils import Channel
+from config import rpc_config
+from models import session, TaskLog
+
+
+def subprocess_with_channel(task_id=None, command=None):
+    """
+    Run the command locally, streaming output to redis in real time; when the task ends, write the log to the database
+    """
+    with Channel(rpc_config.redis, task_id=task_id) as channel:
+        logger.debug(f"task is:{task_id}")
+        channel.send({'msg': '开始执行任务:'})
+        channel.send({'msg': f"执行命令:{command}"})
+        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
+                                   shell=True)
+        while process.poll() is None:
+            message = process.stdout.readline()
+            channel.send({'msg': message})
+        channel.send({'msg': '结束执行任务'})
+        task_log = TaskLog(task_id=task_id, status=process.returncode, cmd=command, type=0,
+                           stdout=json.dumps(channel.msg))
+        session.add(task_log)
+        session.commit()
diff --git a/rpyc_scheduler/utils.py b/rpyc_scheduler/utils.py
index 9c70f31..98bca43 100644
--- a/rpyc_scheduler/utils.py
+++ b/rpyc_scheduler/utils.py
@@ -5,7 +5,6 @@
 
 class Channel:
     def __init__(self, redis_config, task_id):
-        logger.debug(f'__init__,redis:{redis_config},task id:{task_id}')
         self.conn = redis.Redis(**redis_config, decode_responses=True)
         self.task_id = task_id
         self.task_key = f"tasks:{self.task_id}"
@@ -16,11 +15,11 @@ def msg(self, ):
         return self.conn.xrange(self.task_key, '-', '+')
 
     @property
-    def expire(self, ):
+    def expire(self, ) -> int:
        return self._expire
 
     @expire.setter
-    def expire(self, value):
+    def expire(self, value: int):
         self._expire = value
 
     def send(self, msg: Dict[Any, Any]):
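For reference, the stream that Channel XADDs into redis can be read back with a plain redis client; a hedged sketch (key layout tasks:<task_id> as defined by Channel, connection settings from RpcConfig.redis, and a hypothetical task id):

    import redis

    r = redis.Redis(host='192.168.137.129', port=6379, password='seraphim',
                    decode_responses=True)

    # Channel.msg does exactly this internally: fetch the whole stream for one task
    for entry_id, fields in r.xrange('tasks:demo-task', '-', '+'):
        print(entry_id, fields['msg'])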
From 46d69e66770974a91399363328fffcbf9cbbff64 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Fri, 24 Feb 2023 15:58:01 +0800
Subject: [PATCH 03/27] backend: add the task-management endpoints, fix #100
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/main.py        |   2 +
 server/models/job.py  |  23 ++++++
 server/routers/job.py | 169 ++++++++++++++++++++++++++++++++++++++++++
 server/settings.py    |   3 +
 4 files changed, 197 insertions(+)
 create mode 100644 server/models/job.py
 create mode 100644 server/routers/job.py

diff --git a/server/main.py b/server/main.py
index 6c46ca1..9f987c1 100644
--- a/server/main.py
+++ b/server/main.py
@@ -2,6 +2,7 @@
 from loguru import logger
 from .common.log import init_logging
 from .routers.internal import login, user, menu, roles, dictonary
+from .routers import job
 from .common.security import auth_check
 
 init_logging()
@@ -13,6 +14,7 @@
 app.include_router(menu.router, tags=['菜单管理'])
 app.include_router(roles.router, tags=['角色管理'])
 app.include_router(dictonary.router, tags=['数据字典'])
+app.include_router(job.router,tags=['任务管理'])
 
 
 @app.on_event("startup")
diff --git a/server/models/job.py b/server/models/job.py
new file mode 100644
index 0000000..92dab11
--- /dev/null
+++ b/server/models/job.py
@@ -0,0 +1,23 @@
+from typing import Union, List, Tuple, Dict, Any, Optional
+from sqlmodel import SQLModel
+from datetime import datetime
+from pydantic import BaseModel
+
+
+class DataJobArgs(BaseModel):
+    run_date: Optional[datetime]
+
+
+class CronJobArgs(BaseModel):
+    cron: str
+    start_date: Optional[str]
+    end_date: Optional[str]
+
+
+class CronJobAdd(BaseModel):
+    job_id: str
+    trigger: str
+    date_args: Optional[DataJobArgs]
+    cron_args: Optional[CronJobArgs]
+    command: str
+    type: str
diff --git a/server/routers/job.py b/server/routers/job.py
new file mode 100644
index 0000000..35b34fa
--- /dev/null
+++ b/server/routers/job.py
@@ -0,0 +1,169 @@
+import rpyc
+import copy
+import re
+from typing import Union, List, Any, Dict
+from sqlmodel import SQLModel
+from apscheduler.job import Job
+from apscheduler.triggers.cron import CronTrigger
+from fastapi import APIRouter
+from loguru import logger
+from ..settings import settings
+from ..models.job import CronJobAdd
+from ..common.response_code import ApiResponse, SearchResponse
+from ..schemas.internal.pagination import Pagination
+
+router = APIRouter(prefix='/api/jobs')
+
+
+class JobSearch(SQLModel):
+    job_id: str
+
+
+def cron_to_dict(cron):
+    cron_list = re.split(r'\s+', cron)
+    return {'minute': cron_list[0], 'hour': cron_list[1], 'day': cron_list[2], 'month': cron_list[3],
+            'day_of_week': cron_list[4]}
+
+
+@router.get('/pause/{job_id}', summary='暂停任务')
+async def pause_job(job_id: str, ):
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.pause_job(job_id)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message='暂停任务出错,请联系管理员!'
+        )
+    return ApiResponse()
+
+
+@router.get('/resume/{job_id}', summary='恢复任务')
+async def resume_job(job_id: str):
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.resume_job(job_id)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message='恢复任务出错,请联系管理员!'
+        )
+    return ApiResponse()
+
+
+@router.delete('/{job_id}', summary='删除任务', response_model=ApiResponse[str])
+async def delete_job(job_id: str):
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.remove_job(job_id)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message='删除任务出错,请联系管理员!'
+        )
+    return ApiResponse()
+
+
+@router.put('/', summary='修改任务')
+async def modify_job(job: CronJobAdd):
+    cron_args = job.cron_args.dict()
+    cron_args.update(cron_to_dict(job.cron_args.cron))
+    del cron_args['cron']
+    logger.debug(cron_args)
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        trigger = CronTrigger(**cron_args)
+        conn.root.modify_job(job.job_id, trigger=trigger, kwargs={'task_id': job.job_id, 'command': job.command})
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+
+
+@router.post('/', summary='添加任务', response_model=ApiResponse[str])
+async def add_job(job: CronJobAdd):
+    logger.debug(job)
+    if job.trigger == 'cron':
+        trigger_args: Dict[str, Any] = job.cron_args.dict()
+        trigger_args.update(cron_to_dict(job.cron_args.cron))
+        del trigger_args['cron']
+    else:
+        trigger_args = job.date_args.dict()
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        job = conn.root.add_job('scheduler-server:subprocess_with_channel', trigger=job.trigger,
+                                kwargs={'task_id': job.job_id, 'command': job.command}, id=job.job_id, **trigger_args)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    return ApiResponse(
+        data=job.id
+    )
+
+
+@router.post('/search', summary='获取所有任务', response_model=ApiResponse[SearchResponse[Any]])
+async def show_jobs(search: Pagination[JobSearch]):
+    job_id = search.search['job_id']
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        if job_id is None:
+            all_jobs: List[Job] = conn.root.get_jobs()
+        else:
+            search_job = conn.root.get_job(job_id)
+            if search_job is None:
+                all_jobs: List[Job] = []
+            else:
+                all_jobs: List[Job] = [search_job]
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    if len(all_jobs) == 0:
+        return ApiResponse(
+            data={
+                'total': len(all_jobs),
+                'data': []
+            }
+        )
+    job_info_list: List[Dict[str, Any]] = []
+    start = (search.page - 1) * search.page_size
+    end = search.page * search.page_size - 1
+    for job in all_jobs[start:end]:
+        cron: Dict[str, str] = {}
+        for field in job.trigger.fields:
+            cron[field.name] = str(field)
+        info = {
+            'job_id': job.id,
+            'next_run_time': job.next_run_time,
+            'cron_args': {
+                'cron': f"{cron['minute']} {cron['hour']} {cron['day']} {cron['month']} {cron['day_of_week']}",
+                'start_date': None if job.trigger.start_date is None else job.trigger.start_date.strftime(
+                    "%Y-%m-%d %H:%M:%S"),
+                'end_date': None if job.trigger.end_date is None else job.trigger.end_date.strftime(
+                    "%Y-%m-%d %H:%M:%S"),
+            },
+            'command': job.kwargs['command'],
+            'status': 'running' if job.next_run_time is not None else 'stop'
+        }
+        job_info_list.append(info)
+    return ApiResponse(
+        data={
+            'total': len(all_jobs),
+            'data': job_info_list
+        }
+    )
+
+
+@router.get('/logs')
+async def job_logs():
+    pass
diff --git a/server/settings.py b/server/settings.py
index c1b649d..23601f3 100644
--- a/server/settings.py
+++ b/server/settings.py
@@ -1,5 +1,6 @@
 import casbin_sqlalchemy_adapter
 import casbin
+import rpyc
 from typing import List
 from pathlib import Path
 from pydantic import BaseSettings
@@ -20,6 +21,8 @@ class APISettings(BaseSettings):
         '/',
         '/api/login',
     ]
+    rpyc_config = {'host': 'localhost', 'port': 18861, 'config': {"allow_public_attrs": True, 'allow_pickle': True},
+                   'keepalive': True}
 
 
 settings = APISettings()
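The request body the new router expects follows the CronJobAdd model; a hypothetical call against a locally running API (the requests usage is illustrative and not part of the patch):

    import requests

    payload = {
        'job_id': 'nightly-backup',          # also used as the apscheduler job id
        'trigger': 'cron',
        'cron_args': {
            'cron': '30 2 * * *',            # minute hour day month day_of_week
            'start_date': None,
            'end_date': None,
        },
        'date_args': None,
        'command': 'tar -czf /tmp/etc.tgz /etc',
        'type': 'shell',
    }

    resp = requests.post('http://localhost:8000/api/jobs/', json=payload)
    print(resp.json())  # ApiResponse wrapper; data carries the job id on success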
From 9532018d2a145e795368dec099e45154b989a04d Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Fri, 24 Feb 2023 15:59:23 +0800
Subject: [PATCH 04/27] frontend: task center - job management, fix #100
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 www/src/api/jobs.js                     |   8 ++
 www/src/views/jobs/JobManage/AddJob.vue | 110 ++++++++++++++++++++++++
 www/src/views/jobs/JobManage/index.vue  | 120 ++++++++++++++++++++++++
 3 files changed, 238 insertions(+)
 create mode 100644 www/src/api/jobs.js
 create mode 100644 www/src/views/jobs/JobManage/AddJob.vue
 create mode 100644 www/src/views/jobs/JobManage/index.vue

diff --git a/www/src/api/jobs.js b/www/src/api/jobs.js
new file mode 100644
index 0000000..50cc85e
--- /dev/null
+++ b/www/src/api/jobs.js
@@ -0,0 +1,8 @@
+import {GET, POST, PUT, DELETE} from '@/utils/request'
+
+export const PostNewCronJob = (dict) => POST('/api/jobs/', dict)
+export const GetJobList = () => GET('/api/jobs/')
+export const DelJob = (jobId) => DELETE('/api/jobs/' + jobId)
+export const PutCronJob = (job) => PUT('/api/jobs/', job)
+export const PauseJob = (jobId) => GET('/api/jobs/pause/' + jobId)
+export const ResumeJob = (jobId) => GET('/api/jobs/resume/' + jobId)
\ No newline at end of file
diff --git a/www/src/views/jobs/JobManage/AddJob.vue b/www/src/views/jobs/JobManage/AddJob.vue
new file mode 100644
index 0000000..765cd66
--- /dev/null
+++ b/www/src/views/jobs/JobManage/AddJob.vue
@@ -0,0 +1,110 @@
[110-line Vue single-file component; template/script markup stripped during extraction]
\ No newline at end of file
diff --git a/www/src/views/jobs/JobManage/index.vue b/www/src/views/jobs/JobManage/index.vue
new file mode 100644
index 0000000..caef5e1
--- /dev/null
+++ b/www/src/views/jobs/JobManage/index.vue
@@ -0,0 +1,120 @@
[120-line Vue single-file component; template/script markup stripped during extraction]
\ No newline at end of file

From d4e6530d839812f82a74c1135f2cc343306af653 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Fri, 24 Feb 2023 15:59:40 +0800
Subject: [PATCH 05/27] frontend: component encapsulation
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 www/src/utils/request.js | 21 ++++++++++++++++++++-
 1 file changed, 20 insertions(+), 1 deletion(-)

diff --git a/www/src/utils/request.js b/www/src/utils/request.js
index bbc5ac6..c8f8081 100644
--- a/www/src/utils/request.js
+++ b/www/src/utils/request.js
@@ -2,7 +2,8 @@ import axios from 'axios'
 // import { Notification, detailBox } from 'element-ui'
 import {useStore} from '../stores'
 import {getToken} from '@/utils/auth'
-import {ElNotification} from 'element-plus'
+import {ElMessage, ElMessageBox, ElNotification} from 'element-plus'
+import {DelJob} from '@/api/jobs'
 
 // create the axios instance
 const service = axios.create({
@@ -148,4 +149,22 @@ export function DELETE(url, params) {
     })
 }
 
+export function ConfirmDel(txt, func, id) {
+    ElMessageBox.confirm(txt, '警告', {type: 'warning'}).then(() => {
+        func(id).then(() => {
+            ElMessage({
+                title: 'success',
+                message: '删除成功',
+                type: 'success'
+            })
+        })
+    }).catch(() => {
+        ElMessage({
+            title: 'success',
+            message: '取消删除操作',
+            type: 'warning'
+        })
+    })
+}
+
 export default service
\ No newline at end of file
From 6057efe9877f1fb3914a98c695b10613eeb947a7 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Mon, 19 Jun 2023 16:48:37 +0800
Subject: [PATCH 06/27] backend: manage jobs by driving apscheduler remotely
 over rpyc
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 rpyc_scheduler/job.py              |  48 ++++++++++++
 rpyc_scheduler/models.py           |  11 ---
 rpyc_scheduler/scheduler-server.py |  25 ++++++-
 rpyc_scheduler/scheduler.py        | 114 +++++++++++++++++++++++++++++
 rpyc_scheduler/tasks.py            |  27 ++++---
 rpyc_scheduler/utils.py            |   9 ++-
 6 files changed, 207 insertions(+), 27 deletions(-)
 create mode 100644 rpyc_scheduler/job.py
 create mode 100644 rpyc_scheduler/scheduler.py

diff --git a/rpyc_scheduler/job.py b/rpyc_scheduler/job.py
new file mode 100644
index 0000000..ddb3f82
--- /dev/null
+++ b/rpyc_scheduler/job.py
@@ -0,0 +1,48 @@
+from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
+from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
+from apscheduler.job import Job
+
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: nocover
+    import pickle
+
+try:
+    from sqlalchemy import (
+        create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_)
+    from sqlalchemy.exc import IntegrityError
+    from sqlalchemy.sql.expression import null
+except ImportError:  # pragma: nocover
+    raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
+
+
+class MyJobStore(BaseJobStore):
+    """
+    A job store modeled on apscheduler's built-in SQLAlchemyJobStore:
+    1. add extra columns
+    2. override some methods to persist jobs
+    """
+
+    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
+                 pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):
+        super().__init__()
+        self.pickle_protocol = pickle_protocol
+        metadata = maybe_ref(metadata) or MetaData()
+
+        if engine:
+            self.engine = maybe_ref(engine)
+        elif url:
+            self.engine = create_engine(url, **(engine_options or {}))
+        else:
+            raise ValueError('Need either "engine" or "url" defined')
+
+        self.jobs_t = Table(
+            tablename, metadata,
+            Column('id', Unicode(191), primary_key=True),
+            Column('next_run_time', Float(25), index=True),
+            Column('job_state', LargeBinary, nullable=False),
+            schema=tableschema
+        )
+
+    def start(self, scheduler, alias):
+        super().start(scheduler, alias)
diff --git a/rpyc_scheduler/models.py b/rpyc_scheduler/models.py
index ceb47b5..d1e0a11 100644
--- a/rpyc_scheduler/models.py
+++ b/rpyc_scheduler/models.py
@@ -5,17 +5,6 @@
 from config import rpc_config
 
 
-class TaskLog(SQLModel, table=True):
-    id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
-    task_id: str = Field(max_length=20, sa_column_kwargs={'comment': '任务名'})
-    status: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='执行命令返回状态'))
-    exe_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务执行时间'})
-    cmd: str = Field(sa_column_kwargs={'comment': '执行命令'})
-    type: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='类型:0单行命令,1:脚本文件'))
-    stdout: str = Field(sa_column=Column(mysql.MEDIUMTEXT, comment='执行日志'))
-
-
 engine = create_engine(rpc_config.apscheduler_job_store, pool_size=5, max_overflow=10, pool_timeout=30,
                        pool_pre_ping=True)
 # SQLModel.metadata.create_all(engine)
-session = Session(engine)
diff --git a/rpyc_scheduler/scheduler-server.py b/rpyc_scheduler/scheduler-server.py
index 4fab8ae..5ba9876 100644
--- a/rpyc_scheduler/scheduler-server.py
+++ b/rpyc_scheduler/scheduler-server.py
@@ -5,11 +5,11 @@
 """
 
 import rpyc
-from tasks import subprocess_with_channel
+from tasks import run_command_with_channel
+from datetime import datetime
 from loguru import logger
 from config import rpc_config
 from rpyc.utils.server import ThreadedServer
-from apscheduler.schedulers.background import BackgroundScheduler
 from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
 from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
 from apscheduler.events import (
@@ -20,6 +20,8 @@
     EVENT_JOB_REMOVED
 )
 
+from scheduler import CustomScheduler
+
 
 def print_text(*args, **kwargs):
     logger.debug(args)
@@ -48,6 +50,21 @@ class SchedulerService(rpyc.Service):
     def exposed_get_job(self, job_id, jobstore=None):
         return scheduler.get_job(job_id, jobstore)
 
+    def exposed_switch_job(self, job_id, jobstore=None):
+        """
+        Toggle a job between the paused and resumed states.
+        """
+        job = scheduler.get_job(job_id, jobstore)
+        if job.next_run_time is None:
+            now = datetime.now(job.trigger.timezone)
+            next_fire_time = job.trigger.get_next_fire_time(None, now)
+            if next_fire_time:
+                scheduler.resume_job(job_id, jobstore)
+            else:
+                raise ValueError('无法指定下次运行时间,请确认任务时间配置')
+        else:
+            scheduler.pause_job(job_id, jobstore)
+
 
 def event_listener(event):
     if event.code == EVENT_JOB_ADDED:
@@ -68,8 +85,8 @@ if __name__ == '__main__':
         'default': ThreadPoolExecutor(20),
         'processpool': ProcessPoolExecutor(20)
     }
-    scheduler = BackgroundScheduler(jobstores=job_store,
-                                    excutors=apscheduler_excutors)
+    scheduler = CustomScheduler(jobstores=job_store,
+                                executors=apscheduler_excutors)
     scheduler.add_listener(event_listener,
                            EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_SUBMITTED)
     scheduler.start()
diff --git a/rpyc_scheduler/scheduler.py b/rpyc_scheduler/scheduler.py
new file mode 100644
index 0000000..8b69e6b
--- /dev/null
+++ b/rpyc_scheduler/scheduler.py
@@ -0,0 +1,114 @@
+from __future__ import print_function
+
+from datetime import datetime, timedelta
+import six
+
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
+from apscheduler.util import (timedelta_seconds, TIMEOUT_MAX)
+from apscheduler.events import (JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES)
+
+try:
+    from collections.abc import MutableMapping
+except ImportError:
+    from collections import MutableMapping
+
+STATE_STOPPED = 0
+#: constant indicating a scheduler's running state (started and processing jobs)
+STATE_RUNNING = 1
+#: constant indicating a scheduler's paused state (started but not processing jobs)
+STATE_PAUSED = 2
+
+
+class CustomScheduler(BackgroundScheduler):
+    def _process_jobs(self):
+        """
+        Override _process_jobs, replacing remove_job with pause_job so that jobs which finish are paused instead of removed
+        """
+        if self.state == STATE_PAUSED:
+            self._logger.debug('Scheduler is paused -- not processing jobs')
+            return None
+
+        self._logger.debug('Looking for jobs to run')
+        now = datetime.now(self.timezone)
+        next_wakeup_time = None
+        events = []
+
+        with self._jobstores_lock:
+            for jobstore_alias, jobstore in six.iteritems(self._jobstores):
+                try:
+                    due_jobs = jobstore.get_due_jobs(now)
+                except Exception as e:
+                    # Schedule a wakeup at least in jobstore_retry_interval seconds
+                    self._logger.warning('Error getting due jobs from job store %r: %s',
+                                         jobstore_alias, e)
+                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
+                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
+                        next_wakeup_time = retry_wakeup_time
+
+                    continue
+
+                for job in due_jobs:
+                    # Look up the job's executor
+                    try:
+                        executor = self._lookup_executor(job.executor)
+                    except BaseException:
+                        self._logger.error(
+                            'Executor lookup ("%s") failed for job "%s" -- pause it from the '
+                            'job store', job.executor, job)
+                        self.pause_job(job.id, jobstore_alias)
+                        continue
+
+                    run_times = job._get_run_times(now)
+                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
+                    if run_times:
+                        try:
+                            executor.submit_job(job, run_times)
+                        except MaxInstancesReachedError:
+                            self._logger.warning(
+                                'Execution of job "%s" skipped: maximum number of running '
+                                'instances reached (%d)', job, job.max_instances)
+                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
+                                                       jobstore_alias, run_times)
+                            events.append(event)
+                        except BaseException:
+                            self._logger.exception('Error submitting job "%s" to executor "%s"',
+                                                   job, job.executor)
+                        else:
+                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
+                                                       run_times)
+                            events.append(event)
+
+                        # update next_run_time if the trigger can fire again;
+                        # otherwise pause this job
+                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
+                        if job_next_run:
+                            job._modify(next_run_time=job_next_run)
+                            jobstore.update_job(job)
+                        else:
+                            self.pause_job(job.id, jobstore_alias)
+
+                # Set a new next wakeup time if there isn't one yet or
+                # the jobstore has an even earlier one
+                jobstore_next_run_time = jobstore.get_next_run_time()
+                if jobstore_next_run_time and (next_wakeup_time is None or
+                                               jobstore_next_run_time < next_wakeup_time):
+                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)
+
+        # Dispatch collected events
+        for event in events:
+            self._dispatch_event(event)
+
+        # Determine the delay until this method should be called again
+        if self.state == STATE_PAUSED:
+            wait_seconds = None
+            self._logger.debug('Scheduler is paused; waiting until resume() is called')
+        elif next_wakeup_time is None:
+            wait_seconds = None
+            self._logger.debug('No jobs; waiting until a job is added')
+        else:
+            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
+            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
+                               wait_seconds)
+
+        return wait_seconds
diff --git a/rpyc_scheduler/tasks.py b/rpyc_scheduler/tasks.py
index c991408..32f4e13 100644
--- a/rpyc_scheduler/tasks.py
+++ b/rpyc_scheduler/tasks.py
@@ -1,26 +1,35 @@
 import subprocess
 import json
+from datetime import datetime
 from loguru import logger
+from sqlmodel import text
 from utils import Channel
 from config import rpc_config
-from models import session, TaskLog
+from models import engine
 
 
-def subprocess_with_channel(task_id=None, command=None):
+def run_command_with_channel(job_id=None, command=None):
     """
     Run the command locally, streaming output to redis in real time; when the task ends, write the log to the database
+    :param job_id: the job name
+    :param command: the command to execute
     """
-    with Channel(rpc_config.redis, task_id=task_id) as channel:
-        logger.debug(f"task is:{task_id}")
+    with Channel(rpc_config.redis, job_id=job_id) as channel:
+        logger.debug(f"job id:{job_id}")
         channel.send({'msg': '开始执行任务:'})
         channel.send({'msg': f"执行命令:{command}"})
+        start_time = datetime.now()
         process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
                                    shell=True)
-        while process.poll() is None:
+        while (status := process.poll()) is None:
            message = process.stdout.readline()
            channel.send({'msg': message})
         channel.send({'msg': '结束执行任务'})
-        task_log = TaskLog(task_id=task_id, status=process.returncode, cmd=command, type=0,
-                           stdout=json.dumps(channel.msg))
-        session.add(task_log)
-        session.commit()
+        end_time = datetime.now()
+        with engine.connect() as conn:
+            sql = text(
+                "INSERT INTO job_log (status,job_id,start_time,end_time,log) values (:status,:job_id,:start_time,:end_time,:log)")
+            conn.execute(sql,
+                         {'status': status, 'job_id': job_id, 'start_time': start_time, 'end_time': end_time,
+                          'log': json.dumps(channel.msg)})
+            conn.commit()
diff --git a/rpyc_scheduler/utils.py b/rpyc_scheduler/utils.py
index 98bca43..c197311 100644
--- a/rpyc_scheduler/utils.py
+++ b/rpyc_scheduler/utils.py
@@ -4,10 +4,10 @@
 
 
 class Channel:
-    def __init__(self, redis_config, task_id):
+    def __init__(self, redis_config, job_id):
         self.conn = redis.Redis(**redis_config, decode_responses=True)
-        self.task_id = task_id
-        self.task_key = f"tasks:{self.task_id}"
+        self.job_id = job_id
+        self.task_key = f"tasks:{self.job_id}"
         self._expire = 60
 
     @property
@@ -25,6 +25,9 @@ def expire(self, value: int):
     def send(self, msg: Dict[Any, Any]):
         self.conn.xadd(self.task_key, msg)
 
+    def delete(self, ):
+        self.conn.delete(self.task_key)
+
     def __enter__(self):
         return self
 
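The pause/resume logic in exposed_switch_job and the pause-instead-of-remove behaviour of CustomScheduler both hinge on one apscheduler detail: a trigger whose get_next_fire_time() returns None is exhausted. A small standalone illustration with a one-shot date trigger (dates are arbitrary):

    from datetime import datetime, timedelta
    from tzlocal import get_localzone
    from apscheduler.triggers.date import DateTrigger

    now = datetime.now(get_localzone())
    trigger = DateTrigger(run_date=now + timedelta(minutes=5))

    # before the job has fired, the trigger still reports a pending fire time
    print(trigger.get_next_fire_time(None, now))               # the run_date

    # once it has fired (a previous fire time exists) it is exhausted -> None,
    # which makes switch_job refuse to resume it and _process_jobs pause it
    print(trigger.get_next_fire_time(trigger.run_date, now))   # None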
From 065e8f07800267a9876bdf075e2fb1acf26e5871 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Mon, 19 Jun 2023 16:50:02 +0800
Subject: [PATCH 07/27] backend: add the Job-related models and CRUD
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/crud/internal/__init__.py        |  1 +
 server/crud/internal/job.py             | 13 ++++++
 server/models/internal/__init__.py      |  1 +
 server/models/internal/job.py           | 53 +++++++++++++++++++++++++
 server/models/internal/relationships.py | 10 ++++-
 server/models/internal/user.py          |  4 +-
 server/schemas/job.py                   | 23 +++++++++++
 7 files changed, 103 insertions(+), 2 deletions(-)
 create mode 100644 server/crud/internal/job.py
 create mode 100644 server/models/internal/job.py
 create mode 100644 server/schemas/job.py

diff --git a/server/crud/internal/__init__.py b/server/crud/internal/__init__.py
index 51bdf03..56f1406 100644
--- a/server/crud/internal/__init__.py
+++ b/server/crud/internal/__init__.py
@@ -2,3 +2,4 @@
 from .roles import role
 from .menu import menu
 from .dictonary import data_dict, dict_item
+from .job import job_log
diff --git a/server/crud/internal/job.py b/server/crud/internal/job.py
new file mode 100644
index 0000000..67c8254
--- /dev/null
+++ b/server/crud/internal/job.py
@@ -0,0 +1,13 @@
+from typing import Union
+from loguru import logger
+from sqlmodel import select, Session
+from ...models.internal.job import JobLog
+from ..base import CRUDBase
+from .roles import role
+
+
+class CRUDJobLog(CRUDBase[JobLog]):
+    pass
+
+
+job_log = CRUDJobLog(JobLog)
diff --git a/server/models/internal/__init__.py b/server/models/internal/__init__.py
index 95cd37a..b138003 100644
--- a/server/models/internal/__init__.py
+++ b/server/models/internal/__init__.py
@@ -2,3 +2,4 @@
 from .menu import Menu
 from .role import Role, RoleMenu
 from .dictonary import DataDict, DictItem
+from .job import Job, JobLog
diff --git a/server/models/internal/job.py b/server/models/internal/job.py
new file mode 100644
index 0000000..15a417d
--- /dev/null
+++ b/server/models/internal/job.py
@@ -0,0 +1,53 @@
+from enum import Enum
+from typing import Union, List, Tuple, Dict, Any, Optional
+from sqlmodel import SQLModel, Field, Column, Relationship, Integer, Unicode, LargeBinary, JSON
+from sqlalchemy.dialects import mysql
+from datetime import datetime
+from pydantic import BaseModel
+from .relationships import UserJob
+
+
+class CronJobArgs(BaseModel):
+    cron: str
+    start_date: Optional[str]
+    end_date: Optional[str]
+
+
+class TriggerEnum(str, Enum):
+    date = 'date'
+    cron = 'cron'
+
+
+class JobAdd(BaseModel):
+    id: Optional[str] = None
+    name: str
+    trigger: TriggerEnum
+    trigger_args: Union[CronJobArgs, str] = None
+    command: str
+    type: str
+
+
+class Job(SQLModel, table=True):
+    """
+    Mirrors the table apscheduler creates itself; if it is missing, apscheduler will create it automatically
+    """
+    __tablename__ = 'apscheduler_jobs'
+    id: str = Field(sa_column=Column('id', Unicode(191), primary_key=True))
+    next_run_time: float = Field(sa_column=Column('next_run_time', mysql.DOUBLE, index=True))
+    job_state: bytes = Field(sa_column=Column('job_state', LargeBinary, nullable=False))
+    logs: List["JobLog"] = Relationship(back_populates="job")
+    user: "User" = Relationship(back_populates="jobs", link_model=UserJob)
+
+
+class JobLog(SQLModel, table=True):
+    __tablename__ = 'job_log'
+    id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
+    status: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='执行命令返回状态'))
+    start_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务开始时间'})
+    end_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务结束时间'})
+    log: JSON = Field(sa_column=Column(JSON, comment='执行日志'))
+    job_id: Optional[str] = Field(default=None, foreign_key="apscheduler_jobs.id")
+    job: Optional[Job] = Relationship(back_populates="logs")
+
+    class Config:
+        arbitrary_types_allowed = True
diff --git a/server/models/internal/relationships.py b/server/models/internal/relationships.py
index 1a4188f..37fcf55 100644
--- a/server/models/internal/relationships.py
+++ b/server/models/internal/relationships.py
@@ -1,4 +1,4 @@
-from sqlmodel import SQLModel, Field
+from sqlmodel import SQLModel, Field, Unicode
 
 # base tables for permission checks, kept separate
 
@@ -14,3 +14,11 @@ class UserRole(SQLModel, table=True):
 
     user_id: int = Field(foreign_key="user.id", primary_key=True)
     role_id: int = Field(foreign_key="roles.id", primary_key=True)
+
+class UserJob(SQLModel, table=True):
+    """
+    Association table mapping users to their jobs
+    """
+    __tablename__ = 'user_job'
+    user_id: int = Field(foreign_key="user.id", primary_key=True, nullable=False)
+    job_id: str = Field(foreign_key="apscheduler_jobs.id", primary_key=True, nullable=False, max_length=191)
diff --git a/server/models/internal/user.py b/server/models/internal/user.py
index 0292a31..f192ccc 100644
--- a/server/models/internal/user.py
+++ b/server/models/internal/user.py
@@ -1,11 +1,12 @@
 from typing import Optional, List, Literal, TYPE_CHECKING
 from pydantic import BaseModel
 from sqlmodel import SQLModel, Field, Relationship, Column, Integer, Boolean
-from .relationships import UserRole
+from .relationships import UserRole, UserJob
 from .role import Role
 
 if TYPE_CHECKING:
     from .role import Role
+    from .job import Job
 
 
 class UserWithOutPasswd(SQLModel):
@@ -38,6 +39,7 @@ class UserRoles(SQLModel):
 class User(UserBase, table=True):
     id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
     roles: List['Role'] = Relationship(back_populates="users", link_model=UserRole)
+    jobs: List['Job'] = Relationship(back_populates="user", link_model=UserJob)
 
 
 class UserCreateWithRoles(SQLModel):
diff --git a/server/schemas/job.py b/server/schemas/job.py
new file mode 100644
index 0000000..ba25b20
--- /dev/null
+++ b/server/schemas/job.py
@@ -0,0 +1,23 @@
+from typing import Optional
+from sqlmodel import SQLModel
+from datetime import datetime
+
+
+class JobSearch(SQLModel):
+    job_id: str
+
+
+class JobLogs(SQLModel):
+    id: int
+    job_id: str
+    cmd: str
+    trigger: int
+    exe_time: datetime
+    stdout: Optional[str]
+    status: int
+    type: int
+
+
+class JobLogSearch(SQLModel):
+    job_id: Optional[str]
+    cmd: Optional[str]

From d86d09899d5370ea66807b074754dd21719d32dd Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Tue, 20 Jun 2023 08:46:19 +0800
Subject: [PATCH 08/27] backend: add the get_uid dependency
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/common/dep.py | 8 ++++++++
 1 file changed, 8 insertions(+)
 create mode 100644 server/common/dep.py

diff --git a/server/common/dep.py b/server/common/dep.py
new file mode 100644
index 0000000..dd3ba05
--- /dev/null
+++ b/server/common/dep.py
@@ -0,0 +1,8 @@
+from fastapi import Request
+
+
+async def get_uid(request: Request) -> int:
+    """
+    Read the uid carried on the request
+    """
+    return request.state.uid

From b9c9cafadeed828a1e2bf2e3dc53c4825ce318db Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Tue, 20 Jun 2023 08:47:13 +0800
Subject: [PATCH 09/27] backend: use get_uid to obtain the uid
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/routers/internal/login.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/server/routers/internal/login.py b/server/routers/internal/login.py
index 84dccd0..3e3e434 100644
--- a/server/routers/internal/login.py
+++ b/server/routers/internal/login.py
@@ -7,6 +7,7 @@
 from ...common.response_code import ApiResponse
 from ...common.security import create_access_token
 from ...models.internal import User, Menu
+from ...common.dep import get_uid
 from ...models.internal.user import UserLogin, LoginResponse
 from ... import crud
 from ...common.utils import menu_convert
@@ -44,7 +45,7 @@ async def login(login_form: UserLogin, session: Session = Depends(get_session)):
 
 
 @router.get('/permission', summary='获取权限')
-async def get_permission(request: Request, session: Session = Depends(get_session)):
+async def get_permission(uid: int = Depends(get_uid), session: Session = Depends(get_session)):
     """
     Permission request: return the list of menus the user is allowed to see; the front end builds its menu from it
     :param request:
@@ -52,7 +53,6 @@ async def get_permission(request: Request, session: Session = Depends(get_session)):
     :param token:
     :return:
     """
-    uid: int = request.state.uid
     logger.debug(f"uid is:{uid}")
     user: User = crud.internal.user.get(session, uid)
     logger.debug(user.roles)

From ce6ee7c7eb96d82ad4b01bfbe1de63560ef90e62 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Tue, 20 Jun 2023 08:48:13 +0800
Subject: [PATCH 10/27] backend: add a get_redis helper for obtaining redis
 connections
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/common/database.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/server/common/database.py b/server/common/database.py
index 0aefca4..90b4865 100644
--- a/server/common/database.py
+++ b/server/common/database.py
@@ -1,3 +1,4 @@
+import redis.asyncio as redis
 from sqlmodel import create_engine, SQLModel, Session, select
-from ..settings import engine
+from ..settings import engine, settings
 
@@ -32,3 +33,11 @@ def get_or_create(session: Session, model, **kwargs):
         session.add(instance)
         session.commit()
     return instance
+
+
+async def get_redis():
+    redis_conn = redis.Redis(**settings.redis_config)
+    try:
+        yield redis_conn
+    finally:
+        await redis_conn.close()
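A hedged sketch of how a generator dependency like get_redis is consumed in a route (the endpoint itself is hypothetical, not part of the patch):

    from fastapi import APIRouter, Depends

    from ..common.database import get_redis

    router = APIRouter()


    @router.get('/api/tasks/{task_id}/log')
    async def read_task_log(task_id: str, redis=Depends(get_redis)):
        # FastAPI runs get_redis up to its yield, injects the connection here,
        # and runs the finally block (closing the connection) after the response
        entries = await redis.xrange(f'tasks:{task_id}', '-', '+')
        return {'task_id': task_id, 'log': entries}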
From 39ca9f2447b24393007380208a6264a230d2f98b Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Tue, 20 Jun 2023 08:48:44 +0800
Subject: [PATCH 11/27] backend: stream task logs from redis over websocket
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/common/utils.py | 38 ++++++++++++++++++++++++++++++++++++++
 1 file changed, 38 insertions(+)

diff --git a/server/common/utils.py b/server/common/utils.py
index 5c2e890..9150880 100644
--- a/server/common/utils.py
+++ b/server/common/utils.py
@@ -1,8 +1,11 @@
 import os
+import websockets
+from fastapi import WebSocket
 from typing import List, Generic, Type, TypeVar
 from loguru import logger
 from sqlmodel import SQLModel
 from ..models.internal.menu import MenusWithChild
+from ..crud.internal import job_log
 
 T = TypeVar('T', bound=SQLModel)
 
@@ -64,3 +67,38 @@ def update_model(old_model, new_model):
 def remove_tmp_file(file):
     logger.debug(f'删除临时文件{file}')
     os.remove(file)
+
+
+async def get_task_logs(ws: WebSocket, redis, session, task_id: str, trigger: str):
+    """
+    Stream the redis stream tasks:<task_id> to the websocket; when the stream
+    has already expired, fall back to the log persisted in the database.
+    """
+    key_name = f'tasks:{task_id}'
+    last_id = 0
+    sleep_ms = 5000
+    while True:
+        if await redis.exists(key_name):
+            resp = await redis.xread({f'{key_name}': last_id}, count=1, block=sleep_ms)
+            if resp:
+                key, message = resp[0]
+                last_id = message[0][0]
+                try:
+                    await ws.send_json({'task_id': task_id, 'data': message})
+                except websockets.exceptions.ConnectionClosed as e:
+                    logger.warning(f"websocket closed unexpectedly: {e}")
+                    break
+        elif last_id == 0:
+            # last_id == 0 and no key in redis: the task finished earlier and its
+            # cache has expired, so send the log stored in the database instead
+            # (assumes the CRUD helper can resolve the stored record for this id)
+            record = job_log.get(session, task_id)
+            try:
+                await ws.send_json({'task_id': task_id, 'data': record.log if record else None})
+            except websockets.exceptions.ConnectionClosed as e:
+                logger.warning(f"websocket closed unexpectedly: {e}")
+            break
+        else:
+            logger.debug(f'{task_id} has finished')
+            break
+    return True
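A client-side sketch for exercising this log stream once the websocket route from the later patches is wired up (the endpoint path and query parameters are assumptions based on that route; the websockets package is used for the client):

    import asyncio
    import json
    import websockets


    async def follow(job_id):
        uri = f'ws://localhost:8000/api/jobs/logs/ws/?id=1&job_id={job_id}&trigger=cron'
        async with websockets.connect(uri) as ws:
            # each message is the JSON document produced by ws.send_json() above
            while True:
                print(json.loads(await ws.recv()))


    asyncio.run(follow('0123abcd'))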
From b0b99bf24107b94656ef3617f67f829375b24609 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Tue, 20 Jun 2023 08:51:25 +0800
Subject: [PATCH 12/27] backend: reorganize the job module layout
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/models/job.py   |  23 ------
 server/routers/form.py |  21 -----
 server/routers/job.py  | 169 -----------------------------------------
 3 files changed, 213 deletions(-)
 delete mode 100644 server/models/job.py
 delete mode 100644 server/routers/form.py
 delete mode 100644 server/routers/job.py

diff --git a/server/models/job.py b/server/models/job.py
deleted file mode 100644
index 92dab11..0000000
--- a/server/models/job.py
+++ /dev/null
@@ -1,23 +0,0 @@
-from typing import Union, List, Tuple, Dict, Any, Optional
-from sqlmodel import SQLModel
-from datetime import datetime
-from pydantic import BaseModel
-
-
-class DataJobArgs(BaseModel):
-    run_date: Optional[datetime]
-
-
-class CronJobArgs(BaseModel):
-    cron: str
-    start_date: Optional[str]
-    end_date: Optional[str]
-
-
-class CronJobAdd(BaseModel):
-    job_id: str
-    trigger: str
-    date_args: Optional[DataJobArgs]
-    cron_args: Optional[CronJobArgs]
-    command: str
-    type: str
diff --git a/server/routers/form.py b/server/routers/form.py
deleted file mode 100644
index 60d8df9..0000000
--- a/server/routers/form.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from fastapi import APIRouter, Request
-from ..schemas import ApiResponse
-from ..models import User
-
-router = APIRouter(prefix='/api')
-
-
-# class UserForm(User):
-#     roles: List[Role]
-
-
-@router.get('/form/{name}')
-async def get_form_model(name: str, request: Request):
-    print(name)
-    print(request.app.openapi())
-    print(request.app.openapi()['components']['schemas'][name]['properties'])
-    return ApiResponse(
-        code=0,
-        message='success',
-        data=User.schema_json()
-    )
diff --git a/server/routers/job.py b/server/routers/job.py
deleted file mode 100644
index 35b34fa..0000000
--- a/server/routers/job.py
+++ /dev/null
@@ -1,169 +0,0 @@
-import rpyc
-import copy
-import re
-from typing import Union, List, Any, Dict
-from sqlmodel import SQLModel
-from apscheduler.job import Job
-from apscheduler.triggers.cron import CronTrigger
-from fastapi import APIRouter
-from loguru import logger
-from ..settings import settings
-from ..models.job import CronJobAdd
-from ..common.response_code import ApiResponse, SearchResponse
-from ..schemas.internal.pagination import Pagination
-
-router = APIRouter(prefix='/api/jobs')
-
-
-class JobSearch(SQLModel):
-    job_id: str
-
-
-def cron_to_dict(cron):
-    cron_list = re.split(r'\s+', cron)
-    return {'minute': cron_list[0], 'hour': cron_list[1], 'day': cron_list[2], 'month': cron_list[3],
-            'day_of_week': cron_list[4]}
-
-
-@router.get('/pause/{job_id}', summary='暂停任务')
-async def pause_job(job_id: str, ):
-    try:
-        conn = rpyc.connect(**settings.rpyc_config)
-        conn.root.pause_job(job_id)
-    except Exception as e:
-        logger.warning(e)
-        return ApiResponse(
-            code=500,
-            message='暂停任务出错,请联系管理员!'
-        )
-    return ApiResponse()
-
-
-@router.get('/resume/{job_id}', summary='恢复任务')
-async def resume_job(job_id: str):
-    try:
-        conn = rpyc.connect(**settings.rpyc_config)
-        conn.root.resume_job(job_id)
-    except Exception as e:
-        logger.warning(e)
-        return ApiResponse(
-            code=500,
-            message='恢复任务出错,请联系管理员!'
-        )
-    return ApiResponse()
-
-
-@router.delete('/{job_id}', summary='删除任务', response_model=ApiResponse[str])
-async def delete_job(job_id: str):
-    try:
-        conn = rpyc.connect(**settings.rpyc_config)
-        conn.root.remove_job(job_id)
-    except Exception as e:
-        logger.warning(e)
-        return ApiResponse(
-            code=500,
-            message='删除任务出错,请联系管理员!'
-        )
-    return ApiResponse()
-
-
-@router.put('/', summary='修改任务')
-async def modify_job(job: CronJobAdd):
-    cron_args = job.cron_args.dict()
-    cron_args.update(cron_to_dict(job.cron_args.cron))
-    del cron_args['cron']
-    logger.debug(cron_args)
-    try:
-        conn = rpyc.connect(**settings.rpyc_config)
-        trigger = CronTrigger(**cron_args)
-        conn.root.modify_job(job.job_id, trigger=trigger, kwargs={'task_id': job.job_id, 'command': job.command})
-    except Exception as e:
-        logger.warning(e)
-        return ApiResponse(
-            code=500,
-            message=str(e)
-        )
-
-
-@router.post('/', summary='添加任务', response_model=ApiResponse[str])
-async def add_job(job: CronJobAdd):
-    logger.debug(job)
-    if job.trigger == 'cron':
-        trigger_args: Dict[str, Any] = job.cron_args.dict()
-        trigger_args.update(cron_to_dict(job.cron_args.cron))
-        del trigger_args['cron']
-    else:
-        trigger_args = job.date_args.dict()
-    try:
-        conn = rpyc.connect(**settings.rpyc_config)
-        job = conn.root.add_job('scheduler-server:subprocess_with_channel', trigger=job.trigger,
-                                kwargs={'task_id': job.job_id, 'command': job.command}, id=job.job_id, **trigger_args)
-    except Exception as e:
-        logger.warning(e)
-        return ApiResponse(
-            code=500,
-            message=str(e)
-        )
-    return ApiResponse(
-        data=job.id
-    )
-
-
-@router.post('/search', summary='获取所有任务', response_model=ApiResponse[SearchResponse[Any]])
-async def show_jobs(search: Pagination[JobSearch]):
-    job_id = search.search['job_id']
-    try:
-        conn = rpyc.connect(**settings.rpyc_config)
-        if job_id is None:
-            all_jobs: List[Job] = conn.root.get_jobs()
-        else:
-            search_job = conn.root.get_job(job_id)
-            if search_job is None:
-                all_jobs: List[Job] = []
-            else:
-                all_jobs: List[Job] = [search_job]
-    except Exception as e:
-        logger.warning(e)
-        return ApiResponse(
-            code=500,
-            message=str(e)
-        )
-    if len(all_jobs) == 0:
-        return ApiResponse(
-            data={
-                'total': len(all_jobs),
-                'data': []
-            }
-        )
-    job_info_list: List[Dict[str, Any]] = []
-    start = (search.page - 1) * search.page_size
-    end = search.page * search.page_size - 1
-    for job in all_jobs[start:end]:
-        cron: Dict[str, str] = {}
-        for field in job.trigger.fields:
-            cron[field.name] = str(field)
-        info = {
-            'job_id': job.id,
-            'next_run_time': job.next_run_time,
-            'cron_args': {
-                'cron': f"{cron['minute']} {cron['hour']} {cron['day']} {cron['month']} {cron['day_of_week']}",
-                'start_date': None if job.trigger.start_date is None else job.trigger.start_date.strftime(
-                    "%Y-%m-%d %H:%M:%S"),
-                'end_date': None if job.trigger.end_date is None else job.trigger.end_date.strftime(
-                    "%Y-%m-%d %H:%M:%S"),
-            },
-            'command': job.kwargs['command'],
-            'status': 'running' if job.next_run_time is not None else 'stop'
-        }
-        job_info_list.append(info)
-    return ApiResponse(
-        data={
-            'total': len(all_jobs),
-            'data': job_info_list
-        }
-    )
-
-
-@router.get('/logs')
-async def job_logs():
-    pass
+++ b/server/main.py @@ -1,8 +1,7 @@ from fastapi import FastAPI, Depends from loguru import logger from .common.log import init_logging -from .routers.internal import login, user, menu, roles, dictonary -from .routers import job +from .routers.internal import login, user, menu, roles, dictonary, job from .common.security import auth_check init_logging() @@ -14,7 +13,7 @@ app.include_router(menu.router, tags=['菜单管理']) app.include_router(roles.router, tags=['角色管理']) app.include_router(dictonary.router, tags=['数据字典']) -app.include_router(job.router,tags=['任务管理']) +app.include_router(job.router, tags=['任务管理']) @app.on_event("startup") diff --git a/server/routers/internal/job.py b/server/routers/internal/job.py new file mode 100644 index 0000000..5441da7 --- /dev/null +++ b/server/routers/internal/job.py @@ -0,0 +1,248 @@ +import rpyc +import re +import anyio +from datetime import datetime +from uuid import uuid4 +from typing import List, Any, Dict +from sqlmodel import Session, text +from apscheduler.job import Job +from apscheduler.triggers.interval import IntervalTrigger +from apscheduler.triggers.date import DateTrigger +from apscheduler.triggers.cron import CronTrigger +from fastapi import APIRouter, Depends, WebSocket, Request +from loguru import logger +from server.settings import settings +from server.models.internal.job import JobAdd, JobLog +from server.common.database import get_session, get_redis +from server.common.response_code import ApiResponse, SearchResponse +from server.common.utils import get_task_logs +from server.schemas.internal.pagination import Pagination +from server.schemas.job import JobSearch, JobLogs, JobLogSearch +from server import crud +from server.common.dep import get_uid + +router = APIRouter(prefix='/api/jobs') + + +def cron_to_dict(cron): + cron_list = re.split(r'\s+', cron) + return {'minute': cron_list[0], 'hour': cron_list[1], 'day': cron_list[2], 'month': cron_list[3], + 'day_of_week': cron_list[4]} + + +@router.get('/switch/{job_id}', summary='切换任务状态') +async def switch_job(job_id: str, ): + try: + conn = rpyc.connect(**settings.rpyc_config) + conn.root.switch_job(job_id) + except ValueError as e: + logger.warning(e) + return ApiResponse( + code=500, + message=str(e) + ) + return ApiResponse() + + +@router.get('/resume/{job_id}', summary='恢复任务') +async def resume_job(job_id: str): + try: + conn = rpyc.connect(**settings.rpyc_config) + conn.root.resume_job(job_id) + except Exception as e: + logger.warning(e) + return ApiResponse( + code=500, + message='恢复任务出错,请联系管理员!' + ) + return ApiResponse() + + +@router.delete('/{job_id}', summary='删除任务', response_model=ApiResponse[str]) +async def delete_job(job_id: str, uid: int = Depends(get_uid), session: Session = Depends(get_session)): + sql = text("select job_id from user_job where user_id=:uid") + user_jobs = tuple([job[0] for job in session.execute(sql, {'uid': uid})]) + logger.debug(user_jobs) + if job_id not in user_jobs: + return ApiResponse( + code=500, + message='用户无权限操作此任务!' + ) + try: + conn = rpyc.connect(**settings.rpyc_config) + conn.root.pause_job(job_id) + delete_user_job = text("delete from user_job where job_id=:job_id") + session.execute(delete_user_job, {'job_id': job_id}) + delete_job_logs = text("delete from job_log where job_id=:job_id") + session.execute(delete_job_logs, {'job_id': job_id}) + session.commit() + conn.root.remove_job(job_id) + except Exception as e: + logger.warning(e) + return ApiResponse( + code=500, + message='删除任务出错,请联系管理员!' 
+
+
+@router.put('/', summary='修改任务', response_model=ApiResponse[str])
+async def modify_job(job: JobAdd, session: Session = Depends(get_session)):
+    logger.debug(job)
+    if job.trigger == 'cron':
+        trigger_args = job.trigger_args.dict()
+        trigger_args.update(cron_to_dict(job.trigger_args.cron))
+        del trigger_args['cron']
+        trigger = CronTrigger(**trigger_args)
+    elif job.trigger == 'date':
+        # job.trigger is the trigger type, not an object with a timezone attribute
+        # (the original `timezone=job.trigger.timezone` raised AttributeError);
+        # let APScheduler apply its configured default timezone instead.
+        trigger = DateTrigger(run_date=job.trigger_args)
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.modify_job(job.id, kwargs={'job_id': job.id, 'command': job.command},
+                             name=job.name, trigger=trigger)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    return ApiResponse()
+
+
+@router.post('/', summary='添加任务', response_model=ApiResponse[str])
+async def add_job(job: JobAdd, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
+    logger.debug(job)
+    # Generate the job id here so it can be passed through to the scheduler
+    job_id = uuid4().hex
+    # Only cron and date triggers are supported: an interval job can always be
+    # expressed as a cron job, so there is no need for a separate implementation
+    if job.trigger == 'cron':
+        trigger_args: Dict[str, Any] = job.trigger_args.dict()
+        trigger_args.update(cron_to_dict(job.trigger_args.cron))
+        del trigger_args['cron']
+    elif job.trigger == 'date':
+        trigger_args = {'run_date': job.trigger_args}
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        # renamed from a second `job` binding that shadowed the request model
+        new_job = conn.root.add_job('scheduler-server:run_command_with_channel', trigger=job.trigger.value,
+                                    kwargs={'job_id': job_id,
+                                            'command': job.command}, id=job_id, name=job.name, **trigger_args)
+        sql = text("INSERT INTO user_job values (:uid,:job_id)")
+        session.execute(sql, {'uid': uid, "job_id": job_id})
+        session.commit()
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    return ApiResponse(
+        data=new_job.id
+    )
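
cron_to_dict above splits a five-field crontab string into the keyword arguments CronTrigger expects. A quick worked example, assuming apscheduler 3.x and the cron_to_dict helper from this file; be aware that APScheduler numbers day_of_week from 0 = Monday while classic cron treats 0 as Sunday, so numeric week fields copied from a crontab can shift by one day (named ranges avoid the pitfall):

from apscheduler.triggers.cron import CronTrigger

# '30 2 * * mon-fri' -> run at 02:30, Monday through Friday
args = cron_to_dict('30 2 * * mon-fri')
# args == {'minute': '30', 'hour': '2', 'day': '*', 'month': '*', 'day_of_week': 'mon-fri'}
trigger = CronTrigger(**args)
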
+
+
+@router.post('/search', summary='获取所有任务', response_model=ApiResponse[SearchResponse[Any]])
+async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid),
+                    session: Session = Depends(get_session)):
+    job_name = search.search['job_name']
+    sql = text("select job_id from user_job where user_id=:uid ")
+    results = session.execute(sql, {'uid': uid})
+    user_job_ids: List[str] = []
+    for job_id in results.fetchall():
+        user_job_ids.append(job_id[0])
+    logger.debug(f'uid:{uid},jobs:{user_job_ids}')
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        if job_name is None:
+            all_jobs: List[Job] = conn.root.get_jobs()
+        else:
+            search_job = conn.root.get_job(job_name)
+            if search_job is None:
+                all_jobs: List[Job] = []
+            else:
+                all_jobs: List[Job] = [search_job]
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    if len(all_jobs) == 0:
+        return ApiResponse(
+            data={
+                'total': len(all_jobs),
+                'data': []
+            }
+        )
+
+    user_jobs = [job for job in all_jobs if job.id in user_job_ids]
+    logger.debug(user_jobs)
+
+    job_info_list: List[Dict[str, Any]] = []
+    start = (search.page - 1) * search.page_size
+    # Slice bounds are exclusive on the right; the original `- 1` here silently
+    # dropped the last job of every page.
+    end = search.page * search.page_size
+    for job in user_jobs[start:end]:
+        logger.debug(job.trigger)
+        trigger_args: Dict[str, str] = {}
+        info = {}
+        if isinstance(job.trigger, CronTrigger):
+            logger.debug('cron')
+            for field in job.trigger.fields:
+                trigger_args[field.name] = str(field)
+            info.update({
+                'id': job.id,
+                'name': job.name,
+                'trigger': 'cron',
+                'trigger_args': {
+                    'cron': f"{trigger_args['minute']} {trigger_args['hour']} {trigger_args['day']} {trigger_args['month']} {trigger_args['day_of_week']}",
+                    'start_date': None if job.trigger.start_date is None else job.trigger.start_date.strftime(
+                        "%Y-%m-%d %H:%M:%S"),
+                    'end_date': None if job.trigger.end_date is None else job.trigger.end_date.strftime(
+                        "%Y-%m-%d %H:%M:%S"),
+                },
+                'command': job.kwargs['command'],
+                'status': 'running' if job.next_run_time is not None else 'stop'
+            })
+        elif isinstance(job.trigger, DateTrigger):
+            info.update({
+                'id': job.id,
+                'name': job.name,
+                'trigger': 'date',
+                'trigger_args': job.trigger.run_date.strftime(
+                    "%Y-%m-%d %H:%M:%S"),
+                'command': job.kwargs['command'],
+                'status': 'running' if job.next_run_time is not None else 'stop'
+            })
+        logger.debug(info)
+        job_info_list.append(info)
+    logger.debug(job_info_list)
+    return ApiResponse(
+        data={
+            'total': len(user_jobs),
+            'data': job_info_list
+        }
+    )
+
+
+@router.post('/logs', summary='任务日志查询', response_model=ApiResponse[SearchResponse[JobLogs]])
+async def job_logs(page_search: Pagination[JobLogSearch], session: Session = Depends(get_session)):
+    filter_type = JobLogSearch(job_id='like', cmd='like')
+    total = crud.internal.job_log.search_total(session, page_search.search, filter_type.dict())
+    jobs = crud.internal.job_log.search(session, page_search, filter_type.dict())
+    logger.debug(jobs)
+    return ApiResponse(
+        data={
+            'total': total,
+            'data': jobs
+        }
+    )
+
+
+@router.websocket('/logs/ws/')
+async def websocket_endpoint(id: int, job_id: str, trigger: str, websocket: WebSocket,
+                             session: Session = Depends(get_session),
+                             redis=Depends(get_redis)):
+    await websocket.accept()
+    async with anyio.create_task_group() as tg:
+        # was `task_id`, an undefined name; job_id is the query parameter received above
+        tg.start_soon(get_task_logs, websocket, redis, session, job_id, trigger)
+    logger.debug('close websocket')
+    await websocket.close()
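
get_task_logs is imported from server.common.utils, which this patch does not include. A rough sketch of the shape such a coroutine could take, assuming an async Redis client (redis.asyncio) and that the scheduler side appends {'msg': ...} entries to a per-job stream; this is illustrative only, not the project's actual helper, and the real version presumably also consults `session` and `trigger` (for example to serve finished date jobs from the job_log table):

async def get_task_logs(websocket, redis, session, job_id, trigger):
    # Relay log entries from a Redis stream to the websocket as they arrive.
    last_id = '0'
    while True:
        # Block for up to 5 seconds waiting for entries newer than last_id.
        entries = await redis.xread({f'tasks:{job_id}': last_id}, block=5000)
        if not entries:
            continue
        for _stream, messages in entries:
            for message_id, fields in messages:
                last_id = message_id
                await websocket.send_text(fields.get('msg', ''))
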
From 9980ee6686f81feed5717fea4321a8842174bb10 Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Tue, 20 Jun 2023 08:53:43 +0800
Subject: [PATCH 14/27] =?UTF-8?q?=E5=89=8D=E7=AB=AF=EF=BC=9A=E5=AE=9E?=
 =?UTF-8?q?=E7=8E=B0=E4=BB=BB=E5=8A=A1=E7=AE=A1=E7=90=86=E7=9B=B8=E5=85=B3?=
 =?UTF-8?q?=E5=8A=9F=E8=83=BD=E9=A1=B5=E9=9D=A2?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 www/src/api/jobs.js                     |   4 +-
 www/src/views/jobs/JobManage/AddJob.vue | 152 ++++++++++++++++--------
 www/src/views/jobs/JobManage/index.vue  |  75 ++++++------
 3 files changed, 143 insertions(+), 88 deletions(-)

diff --git a/www/src/api/jobs.js b/www/src/api/jobs.js
index 50cc85e..0d8fc00 100644
--- a/www/src/api/jobs.js
+++ b/www/src/api/jobs.js
@@ -4,5 +4,5 @@ export const PostNewCronJob = (dict) => POST('/api/jobs/', dict)
 export const GetJobList = () => GET('/api/jobs/')
 export const DelJob = (jobId) => DELETE('/api/jobs/' + jobId)
 export const PutCronJob = (job) => PUT('/api/jobs/', job)
-export const PauseJob = (jobId) => GET('/api/jobs/pause/' + jobId)
-export const ResumeJob = (jobId) => GET('/api/jobs/resume/' + jobId)
\ No newline at end of file
+export const SwitchJob = (jobId) => GET('/api/jobs/switch/' + jobId)
+export const GetLogs = (jobId) => GET('/api/jobs/logs')
\ No newline at end of file
diff --git a/www/src/views/jobs/JobManage/AddJob.vue b/www/src/views/jobs/JobManage/AddJob.vue
index 765cd66..d625f8f 100644
--- a/www/src/views/jobs/JobManage/AddJob.vue
+++ b/www/src/views/jobs/JobManage/AddJob.vue
@@ -1,42 +1,76 @@
 [The Vue template/script content of this hunk was lost during text extraction and could not be recovered.]
diff --git a/www/src/views/jobs/JobManage/index.vue b/www/src/views/jobs/JobManage/index.vue
index caef5e1..314a99d 100644
--- a/www/src/views/jobs/JobManage/index.vue
+++ b/www/src/views/jobs/JobManage/index.vue
@@ -2,7 +2,7 @@
 [Hunk body lost in extraction; only the 查询 (search) button label survived the stripped markup.]
@@ -12,45 +12,43 @@
 [Hunk body lost in extraction: the Vue template markup was stripped to bare +/- markers.]
\ No newline at end of file
diff --git a/www/src/views/jobs/JobManage/LogDetail.vue b/www/src/views/jobs/JobManage/LogDetail.vue
new file mode 100644
index 0000000..a94e1f3
--- /dev/null
+++ b/www/src/views/jobs/JobManage/LogDetail.vue
@@ -0,0 +1,102 @@
 [The 102 added lines of this new component were lost in extraction.]
\ No newline at end of file

From 7fd5c91fcdd23a8cb62bd525d5d1c2a88af263fd Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Thu, 21 Dec 2023 16:50:16 +0800
Subject: [PATCH 25/27] =?UTF-8?q?=E6=9B=B4=E6=96=B0=E5=89=8D=E5=90=8E?=
 =?UTF-8?q?=E7=AB=AF=E9=A1=B9=E7=9B=AE=E4=BE=9D=E8=B5=96=E7=89=88=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 requirements.txt | Bin 0 -> 1984 bytes
 www/package.json |  44 ++++++++++++++++++++++++--------------------
 2 files changed, 24 insertions(+), 20 deletions(-)
 create mode 100644 requirements.txt

diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000000000000000000000000000000000000..23b6030def4c3b2132ca38bf25d9ca2fe8418b9c
GIT binary patch
literal 1984
zcmZvd%}yIZ5QOI(DUV{r+CY*+4%~x8#3?6a*1rL-9q$@r9zIEZT{|9w&}zZZH9b|;
zHRFGO$LT3e(<(irWvbJ6JykkOU-h1+&*@Vdr$)~@b^6o^IY|TkKBSF4w}C$w_UaA(
zpkC{!jr9L7daLMloo>^ec3DK9UF^I);wtS}c5TyKUA98bj)==Q^@DiT!DcD`Q|-7D
zGZb6|?ko*s*9UbO3pq_MQ5~wvYfiOlZK!Xeiu#eB%;dB_5b}>$5RUM&{nI)EIOg)f!oO4
zLm)@tJa~7TYL|%=ls;9DV9vOMNAzNPP4vUlO>{G>?8;1CS|L$wUF^dI%Z%VN)ze7B
zRM?_EGg@dx7613@;`&1PuGfVJC25{5jF(c=^lDf+3PP{Vi&G5ZCxqr#lvcGRW%DJa@t-j?3
z%QDCYn%}7ChIC`!8uaG9YH$G4fpNpPL6Y`$iBY~%o#>k;rm{z
zx^QkD`q%*sC)_>us&ZEwMV*@OroA-a9J@d2bJr{IH(y?lUecFOZY`X@;ppahXSM$U
Du+Sbc

literal 0
HcmV?d00001

diff --git a/www/package.json b/www/package.json
index ade30cf..9291eaf 100644
--- a/www/package.json
+++ b/www/package.json
@@ -8,30 +8,34 @@
     "preview": "vite preview"
   },
   "dependencies": {
-    "@element-plus/icons-vue": "^2.0.10",
-    "@vueuse/core": "^9.4.0",
-    "axios": "^1.1.3",
-    "core-js": "^3.22.8",
-    "echarts": "^5.3.1",
-    "element-plus": "^2.2.28",
-    "js-base64": "^3.7.2",
-    "js-cookie": "^3.0.1",
-    "jsonref": "^7.0.0",
-    "pinia": "2.0.29",
-    "vue": "^3.2.45",
-    "vue-echarts": "^6.0.3",
-    "vue-router": "^4.1.6"
+    "@element-plus/icons-vue": "^2.3.1",
+    "@vueuse/core": "^10.7.0",
+    "axios": "^1.6.2",
+    "core-js": "^3.34.0",
+    "echarts": "^5.4.2",
+    "element-plus": "^2.4.4",
+    "js-base64": "^3.7.5",
+    "js-cookie": "^3.0.5",
+    "jsonref": "^8.0.8",
+    "pinia": "2.1.7",
+    "vue": "^3.3.13",
+    "vue-echarts": "^6.6.5",
+    "vue-router": "^4.2.5",
+    "xterm": "^5.3.0",
+    "xterm-addon-attach": "^0.9.0",
+    "xterm-addon-fit": "^0.8.0"
+  },
 
   "devDependencies": {
-    "@rollup/plugin-dynamic-import-vars": "^2.0.2",
-    "@vitejs/plugin-vue": "^4.0.0",
-    "js-md5": "^0.7.3",
+    "@rollup/plugin-dynamic-import-vars": "^2.1.2",
+    "@vitejs/plugin-vue": "^4.5.2",
+    "js-md5": "^0.8.3",
     "sass": "^1.57.1",
     "unocss": "^0.48.4",
-    "unplugin-auto-import": "^0.12.1",
-    "unplugin-vue-components": "^0.22.12",
-    "vite": "^4.0.4",
-    "vite-plugin-dynamic-import": "^1.2.6"
+    "unplugin-auto-import": "^0.17.2",
+    "unplugin-vue-components": "^0.26.0",
+    "vite": "^5.0.10",
+    "vite-plugin-dynamic-import": "^1.5.0"
   },
   "eslintConfig": {
     "root": true,

From 3dc9ea1ad87ffef76b32d9cb6994865e5ae3028a Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Wed, 3 Jan 2024 16:08:27 +0800
Subject: [PATCH 26/27] =?UTF-8?q?update=EF=BC=9A=E6=9B=B4=E6=96=B0?=
 =?UTF-8?q?=E4=BE=9D=E8=B5=96=E5=BA=93=E7=89=88=E6=9C=AC?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/requirements.txt | Bin 1254 -> 1384 bytes
 www/package.json        |   8 ++++----
 2 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/server/requirements.txt b/server/requirements.txt
index cbbe88e8bbbab531def8c7dac8624126a796612f..2ad99e9edc1e6e9a27fefb0db5d06c776df50f14 100644
GIT binary patch
delta 156
zcmaFH`GRXh5hHgUgDryrgC2tggX!j4#!TjVUIs3P9EN;`bcRxfA|Qq-Fas(wU;xP`
z0_9T~av72sG8vM=@`hjoVDd!_1q_u?8B?H)F`b#8BqWL

delta 24
fcmaFC^^9{v5hJ%PgDryrgC2u9gT>}r#!O}aR>uYO

diff --git a/www/package.json b/www/package.json
index 9291eaf..5d392ea 100644
--- a/www/package.json
+++ b/www/package.json
@@ -18,7 +18,7 @@
     "js-cookie": "^3.0.5",
     "jsonref": "^8.0.8",
     "pinia": "2.1.7",
-    "vue": "^3.3.13",
+    "vue": "^3.4.3",
     "vue-echarts": "^6.6.5",
     "vue-router": "^4.2.5",
     "xterm": "^5.3.0",
@@ -28,10 +28,10 @@
   },
   "devDependencies": {
     "@rollup/plugin-dynamic-import-vars": "^2.1.2",
-    "@vitejs/plugin-vue": "^4.5.2",
+    "@vitejs/plugin-vue": "^5.0.2",
     "js-md5": "^0.8.3",
-    "sass": "^1.57.1",
-    "unocss": "^0.48.4",
+    "sass": "^1.69.7",
+    "unocss": "^0.58.3",
     "unplugin-auto-import": "^0.17.2",
     "unplugin-vue-components": "^0.26.0",
     "vite": "^5.0.10",

From 58d095b897b41d77c910d1f0300206cdf77073aa Mon Sep 17 00:00:00 2001
From: 4linuxfun
Date: Wed, 3 Jan 2024 16:09:24 +0800
Subject: [PATCH 27/27] =?UTF-8?q?=E5=90=8E=E7=AB=AF=EF=BC=9A=E4=BF=AE?=
 =?UTF-8?q?=E5=A4=8D=E6=9B=B4=E6=96=B0pydantic=E5=90=8E=E5=B1=9E=E6=80=A7?=
 =?UTF-8?q?=E5=8F=98=E6=9B=B4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 server/settings.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/server/settings.py b/server/settings.py
index 23601f3..651e438 100644
--- a/server/settings.py
+++ b/server/settings.py
@@ -3,7 +3,8 @@
 import rpyc
 from typing import List
 from pathlib import Path
-from pydantic import BaseSettings
+from pydantic import MySQLDsn
+from pydantic_settings import BaseSettings
 from sqlmodel import create_engine
 
 
@@ -15,18 +16,19 @@ class APISettings(BaseSettings):
     CASBIN_MODEL_PATH: str = "server/model.conf"
 
     # sql数据库信息
-    DATABASE_URI = "mysql+pymysql://root:123456@192.168.137.129/devops"
+    DATABASE_URI: MySQLDsn = "mysql+pymysql://root:123456@192.168.137.129/devops"
     # 白名单,不需要进行任何验证即可访问
     NO_VERIFY_URL: List = [
         '/',
         '/api/login',
     ]
-    rpyc_config = {'host': 'localhost', 'port': 18861, 'config': {"allow_public_attrs": True, 'allow_pickle': True},
-                   'keepalive': True}
+    rpyc_config: dict = {'host': 'localhost', 'port': 18861,
+                         'config': {"allow_public_attrs": True, 'allow_pickle': True},
+                         'keepalive': True}
 
 
 settings = APISettings()
-engine = create_engine(settings.DATABASE_URI, pool_size=5, max_overflow=10, pool_timeout=30, pool_pre_ping=True)
+engine = create_engine(str(settings.DATABASE_URI), pool_size=5, max_overflow=10, pool_timeout=30, pool_pre_ping=True)
 adapter = casbin_sqlalchemy_adapter.Adapter(engine)
 casbin_enforcer = casbin.Enforcer(settings.CASBIN_MODEL_PATH, adapter)
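
The pydantic v2 migration in PATCH 27 changes two things worth noting: BaseSettings now lives in the separate pydantic-settings package, and a MySQLDsn field parses into a Url object rather than a plain str, which is why create_engine() above receives str(settings.DATABASE_URI). A small sanity check, assuming the APISettings class from server/settings.py; the DSN credentials below are placeholders:

import os

# BaseSettings matches environment variables to field names (case-insensitive
# by default), so the hard-coded DSN can be overridden without code changes.
os.environ['DATABASE_URI'] = 'mysql+pymysql://devops:secret@127.0.0.1/devops'

from server.settings import APISettings

settings = APISettings()
# Pydantic v2 validates the value into a Url-based MySQLDsn object, not a str.
print(type(settings.DATABASE_URI))
print(str(settings.DATABASE_URI))  # e.g. 'mysql+pymysql://devops:secret@127.0.0.1/devops'
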