diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..23b6030
Binary files /dev/null and b/requirements.txt differ
diff --git a/rpyc_scheduler/config.py b/rpyc_scheduler/config.py
new file mode 100644
index 0000000..610e2eb
--- /dev/null
+++ b/rpyc_scheduler/config.py
@@ -0,0 +1,11 @@
+from pydantic_settings import BaseSettings
+
+
+class RpcConfig(BaseSettings):
+    # job store and executors for apscheduler
+    apscheduler_job_store: str = 'mysql+pymysql://root:123456@192.168.137.129/devops'
+    redis: dict = {'host': '192.168.137.129', 'password': 'seraphim', 'port': 6379, 'health_check_interval': 30}
+    rpc_port: int = 18861
+
+
+rpc_config = RpcConfig()
diff --git a/rpyc_scheduler/job.py b/rpyc_scheduler/job.py
new file mode 100644
index 0000000..ddb3f82
--- /dev/null
+++ b/rpyc_scheduler/job.py
@@ -0,0 +1,48 @@
+from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
+from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
+from apscheduler.job import Job
+try:
+    import cPickle as pickle
+except ImportError:  # pragma: nocover
+    import pickle
+
+try:
+    from sqlalchemy import (
+        create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_)
+    from sqlalchemy.exc import IntegrityError
+    from sqlalchemy.sql.expression import null
+except ImportError:  # pragma: nocover
+    raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
+
+
+class MyJobStore(BaseJobStore):
+    """
+    A job store modeled on apscheduler's built-in SQLAlchemyJobStore:
+    1. add extra columns
+    2. override selected methods so finished jobs are kept instead of removed
+    """
+
+    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
+                 pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):
+        super().__init__()
+        self.pickle_protocol = pickle_protocol
+        metadata = maybe_ref(metadata) or MetaData()
+
+        if engine:
+            self.engine = maybe_ref(engine)
+        elif url:
+            self.engine = create_engine(url, **(engine_options or {}))
+        else:
+            raise ValueError('Need either "engine" or "url" defined')
+
+        self.jobs_t = Table(
+            tablename, metadata,
+            Column('id', Unicode(191), primary_key=True),
+            Column('next_run_time', Float(25), index=True),
+            Column('job_state', LargeBinary, nullable=False),
+            schema=tableschema
+        )
+
+    def start(self, scheduler, alias):
+        super().start(scheduler, alias)
+        # create the table if it does not exist yet
+        self.jobs_t.create(self.engine, True)
diff --git a/rpyc_scheduler/models.py b/rpyc_scheduler/models.py
new file mode 100644
index 0000000..d1e0a11
--- /dev/null
+++ b/rpyc_scheduler/models.py
@@ -0,0 +1,10 @@
+from typing import Optional
+from datetime import datetime
+from sqlmodel import SQLModel, Field, Column, Integer, create_engine, Session
+from sqlalchemy.dialects import mysql
+from config import rpc_config
+
+
+engine = create_engine(rpc_config.apscheduler_job_store, pool_size=5, max_overflow=10, pool_timeout=30,
+                       pool_pre_ping=True)
+# SQLModel.metadata.create_all(engine)
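Since `RpcConfig` subclasses pydantic's `BaseSettings`, the hard-coded MySQL and Redis credentials above are only defaults: each field can be overridden from the environment. A minimal sketch of that behavior (the variable names follow pydantic-settings' default case-insensitive field-name mapping; adjust if an `env_prefix` is ever configured):

```python
import os

# pydantic-settings matches environment variables to field names case-insensitively,
# so APSCHEDULER_JOB_STORE replaces the default DSN baked into config.py.
os.environ['APSCHEDULER_JOB_STORE'] = 'mysql+pymysql://user:pass@db.example.com/devops'
os.environ['RPC_PORT'] = '18900'

from config import rpc_config  # must be imported after the environment is set

print(rpc_config.apscheduler_job_store)  # -> the overridden DSN
print(rpc_config.rpc_port)               # -> 18900
```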
diff --git a/rpyc_scheduler/scheduler-server.py b/rpyc_scheduler/scheduler-server.py
new file mode 100644
index 0000000..f4f408a
--- /dev/null
+++ b/rpyc_scheduler/scheduler-server.py
@@ -0,0 +1,125 @@
+"""
+Drive apscheduler remotely over rpyc. References:
+https://github.com/agronholm/apscheduler/tree/3.x/examples/rpc
+https://gist.github.com/gsw945/15cbb71eaca5be66787a2c187414e36f
+"""
+
+import rpyc
+from sqlmodel import text
+from tasks import run_command_with_channel  # resolved via the textual reference 'scheduler-server:run_command_with_channel'
+from datetime import datetime
+from loguru import logger
+from config import rpc_config
+from rpyc.utils.server import ThreadedServer
+from apscheduler.job import Job
+from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
+from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
+from apscheduler.events import (
+    EVENT_JOB_EXECUTED,
+    EVENT_JOB_ERROR,
+    EVENT_JOB_ADDED,
+    EVENT_JOB_SUBMITTED,
+    EVENT_JOB_REMOVED
+)
+
+from scheduler import CustomScheduler
+from models import engine
+
+
+def print_text(*args, **kwargs):
+    logger.debug(args)
+    logger.debug(kwargs)
+
+
+class SchedulerService(rpyc.Service):
+    def exposed_add_job(self, func, *args, **kwargs):
+        return scheduler.add_job(func, *args, **kwargs)
+
+    def exposed_modify_job(self, job_id, jobstore=None, **changes):
+        return scheduler.modify_job(job_id, jobstore, **changes)
+
+    def exposed_pause_job(self, job_id, jobstore=None):
+        return scheduler.pause_job(job_id, jobstore)
+
+    def exposed_resume_job(self, job_id, jobstore=None):
+        return scheduler.resume_job(job_id, jobstore)
+
+    def exposed_remove_job(self, job_id, jobstore=None):
+        return scheduler.remove_job(job_id, jobstore)
+
+    def exposed_get_jobs(self, jobstore=None):
+        return scheduler.get_jobs(jobstore)
+
+    def exposed_get_job(self, job_id, jobstore=None):
+        return scheduler.get_job(job_id, jobstore)
+
+    def exposed_get_user_jobs(self, uid, job_name, jobstore=None) -> list[Job]:
+        """
+        Fetch the jobs belonging to a user, optionally filtered by job name.
+        :param uid: user id
+        :param job_name: substring to match against the job name
+        :param jobstore: None
+        """
+        logger.debug(f'get_user_jobs:{job_name}')
+        sql = text("select job_id from user_job where user_id=:uid ")
+        user_jobs = []
+        with engine.connect() as conn:
+            results = conn.execute(sql, {'uid': uid})
+            for job_id in results.fetchall():
+                job = self.exposed_get_job(job_id[0], jobstore)
+                if job_name is None:
+                    user_jobs.append(job)
+                elif (job.name.find(job_name)) >= 0:
+                    user_jobs.append(job)
+        return user_jobs
+
+    def exposed_switch_job(self, job_id, jobstore=None):
+        """
+        Toggle a job between paused and running.
+        """
+        job = scheduler.get_job(job_id, jobstore)
+        if job.next_run_time is None:
+            now = datetime.now(job.trigger.timezone)
+            next_fire_time = job.trigger.get_next_fire_time(None, now)
+            if next_fire_time:
+                scheduler.resume_job(job_id, jobstore)
+            else:
+                raise ValueError('无法指定下次运行时间,请确认任务时间配置')
+        else:
+            scheduler.pause_job(job_id, jobstore)
+
+
+def event_listener(event):
+    if event.code == EVENT_JOB_ADDED:
+        logger.debug('event add')
+    elif event.code == EVENT_JOB_EXECUTED:
+        logger.debug('event executed')
+    elif event.code == EVENT_JOB_REMOVED:
+        logger.debug('event removed')
+    elif event.code == EVENT_JOB_ERROR:
+        logger.debug('event error')
+
+
+if __name__ == '__main__':
+    job_store = {
+        'default': SQLAlchemyJobStore(url=rpc_config.apscheduler_job_store)
+    }
+    apscheduler_executors = {
+        'default': ThreadPoolExecutor(20),
+        'processpool': ProcessPoolExecutor(20)
+    }
+    scheduler = CustomScheduler(jobstores=job_store,
+                                executors=apscheduler_executors)
+    scheduler.add_listener(event_listener,
+                           EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_SUBMITTED)
+    scheduler.start()
+
+    # start the rpyc service
+    server = ThreadedServer(SchedulerService, port=rpc_config.rpc_port,
+                            protocol_config={'allow_public_attrs': True, 'allow_pickle': True})
+    try:
+        server.start()
+    except (KeyboardInterrupt, SystemExit):
+        pass
+    finally:
+        scheduler.shutdown()
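For reference, a client talks to this service through `conn.root`, which maps onto the `exposed_` methods above. A minimal sketch, assuming the scheduler server is reachable on its default localhost:18861 and using the same protocol options the `ThreadedServer` is started with:

```python
import rpyc

# allow_public_attrs / allow_pickle mirror the server's protocol_config
conn = rpyc.connect('localhost', 18861,
                    config={'allow_public_attrs': True, 'allow_pickle': True})

# the callable is passed as a textual reference so it is resolved
# inside the scheduler server's own process
job = conn.root.add_job('scheduler-server:run_command_with_channel',
                        trigger='date',
                        kwargs={'job_id': 'demo', 'targets': ['localhost'], 'command': 'uptime'})
print(job.id, job.next_run_time)
```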
diff --git a/rpyc_scheduler/scheduler.py b/rpyc_scheduler/scheduler.py
new file mode 100644
index 0000000..8b69e6b
--- /dev/null
+++ b/rpyc_scheduler/scheduler.py
@@ -0,0 +1,114 @@
+from __future__ import print_function
+
+from datetime import datetime, timedelta
+import six
+
+from apscheduler.schedulers.background import BackgroundScheduler
+from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
+from apscheduler.util import (timedelta_seconds, TIMEOUT_MAX)
+from apscheduler.events import (JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES)
+
+try:
+    from collections.abc import MutableMapping
+except ImportError:
+    from collections import MutableMapping
+
+STATE_STOPPED = 0
+#: constant indicating a scheduler's running state (started and processing jobs)
+STATE_RUNNING = 1
+#: constant indicating a scheduler's paused state (started but not processing jobs)
+STATE_PAUSED = 2
+
+
+class CustomScheduler(BackgroundScheduler):
+    def _process_jobs(self):
+        """
+        Override _process_jobs, replacing every remove_job with pause_job, so a job
+        that has run out of fire times is paused instead of deleted.
+        """
+        if self.state == STATE_PAUSED:
+            self._logger.debug('Scheduler is paused -- not processing jobs')
+            return None
+
+        self._logger.debug('Looking for jobs to run')
+        now = datetime.now(self.timezone)
+        next_wakeup_time = None
+        events = []
+
+        with self._jobstores_lock:
+            for jobstore_alias, jobstore in six.iteritems(self._jobstores):
+                try:
+                    due_jobs = jobstore.get_due_jobs(now)
+                except Exception as e:
+                    # Schedule a wakeup at least in jobstore_retry_interval seconds
+                    self._logger.warning('Error getting due jobs from job store %r: %s',
+                                         jobstore_alias, e)
+                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
+                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
+                        next_wakeup_time = retry_wakeup_time
+
+                    continue
+
+                for job in due_jobs:
+                    # Look up the job's executor
+                    try:
+                        executor = self._lookup_executor(job.executor)
+                    except BaseException:
+                        self._logger.error(
+                            'Executor lookup ("%s") failed for job "%s" -- pausing it in the '
+                            'job store', job.executor, job)
+                        self.pause_job(job.id, jobstore_alias)
+                        continue
+
+                    run_times = job._get_run_times(now)
+                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
+                    if run_times:
+                        try:
+                            executor.submit_job(job, run_times)
+                        except MaxInstancesReachedError:
+                            self._logger.warning(
+                                'Execution of job "%s" skipped: maximum number of running '
+                                'instances reached (%d)', job, job.max_instances)
+                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
+                                                       jobstore_alias, run_times)
+                            events.append(event)
+                        except BaseException:
+                            self._logger.exception('Error submitting job "%s" to executor "%s"',
+                                                   job, job.executor)
+                        else:
+                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
+                                                       run_times)
+                            events.append(event)
+
+                        # if the trigger yields another fire time, update the job;
+                        # otherwise pause it instead of removing it
+                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
+                        if job_next_run:
+                            job._modify(next_run_time=job_next_run)
+                            jobstore.update_job(job)
+                        else:
+                            self.pause_job(job.id, jobstore_alias)
+
+                # Set a new next wakeup time if there isn't one yet or
+                # the jobstore has an even earlier one
+                jobstore_next_run_time = jobstore.get_next_run_time()
+                if jobstore_next_run_time and (next_wakeup_time is None or
+                                               jobstore_next_run_time < next_wakeup_time):
+                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)
+
+        # Dispatch collected events
+        for event in events:
+            self._dispatch_event(event)
+
+        # Determine the delay until this method should be called again
+        if self.state == STATE_PAUSED:
+            wait_seconds = None
+            self._logger.debug('Scheduler is paused; waiting until resume() is called')
+        elif next_wakeup_time is None:
+            wait_seconds = None
+            self._logger.debug('No jobs; waiting until a job is added')
+        else:
+            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
+            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
+                               wait_seconds)
+
+        return wait_seconds
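The practical effect of swapping `remove_job` for `pause_job` is that a finished job keeps its row in `apscheduler_jobs` with a NULL `next_run_time` instead of being deleted. A sketch of checking that directly against the job store (table layout as defined in job.py, DSN from config.py):

```python
from sqlalchemy import create_engine, text

engine = create_engine('mysql+pymysql://root:123456@192.168.137.129/devops')
with engine.connect() as conn:
    # paused / finished jobs keep their row but carry no next run time
    rows = conn.execute(text(
        "SELECT id FROM apscheduler_jobs WHERE next_run_time IS NULL"))
    print([row[0] for row in rows.fetchall()])
```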
diff --git a/rpyc_scheduler/tasks.py b/rpyc_scheduler/tasks.py
new file mode 100644
index 0000000..a978e9b
--- /dev/null
+++ b/rpyc_scheduler/tasks.py
@@ -0,0 +1,61 @@
+import subprocess
+import json
+from typing import List, Dict, Any
+from datetime import datetime
+from loguru import logger
+from sqlmodel import text
+from utils import Channel
+from config import rpc_config
+from models import engine
+
+
+def local_executor(job_id, host, command):
+    with Channel(rpc_config.redis, job_id=f"{job_id}:{host}") as channel:
+        start_time = datetime.now()
+        channel.send({'msg': '开始执行任务:'})
+        channel.send({'msg': f"执行命令:{command}"})
+        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
+                                   shell=True)
+        # read until EOF so the tail of the output is not lost, then collect the exit code
+        for line in iter(process.stdout.readline, ''):
+            channel.send({'msg': line})
+        status = process.wait()
+        channel.send({'msg': '结束执行任务'})
+        end_time = datetime.now()
+        return status, (end_time - start_time).total_seconds(), channel.msg
+
+
+def host_executor(job_id, host, command):
+    pass
+
+
+def run_command_with_channel(job_id=None, targets: List[str] = None, command=None):
+    """
+    Run a command locally, streaming its output to redis in real time;
+    when the task finishes, persist the log to the database.
+    :param job_id: job id
+    :param targets: list of target hosts
+    :param command: the command to execute
+    """
+    run_logs: Dict[str, Dict[str, Any]] = {}
+    remote_host: List[str] = []
+    status = 0
+    start_time = datetime.now()
+    for host in targets:
+        if host == 'localhost':
+            status, duration, log = local_executor(job_id, host, command)
+            run_logs[host] = {'status': status,
+                              'duration': duration,
+                              'log': log}
+        else:
+            remote_host.append(host)
+    end_time = datetime.now()
+    with engine.connect() as conn:
+        sql = text(
+            "INSERT INTO job_log (status,job_id,start_time,end_time,log) values (:status,:job_id,:start_time,:end_time,:log)")
+        conn.execute(sql,
+                     {'status': status, 'job_id': job_id,
+                      'start_time': start_time,
+                      'end_time': end_time,
+                      'log': json.dumps(run_logs)})
+        conn.commit()
diff --git a/rpyc_scheduler/utils.py b/rpyc_scheduler/utils.py
new file mode 100644
index 0000000..c197311
--- /dev/null
+++ b/rpyc_scheduler/utils.py
@@ -0,0 +1,43 @@
+import redis
+from loguru import logger
+from typing import Dict, Any
+
+
+class Channel:
+    """A redis stream used as the per-job output channel."""
+
+    def __init__(self, redis_config, job_id):
+        self.conn = redis.Redis(**redis_config, decode_responses=True)
+        self.job_id = job_id
+        self.task_key = f"tasks:{self.job_id}"
+        self._expire = 60
+
+    @property
+    def msg(self):
+        return self.conn.xrange(self.task_key, '-', '+')
+
+    @property
+    def expire(self) -> int:
+        return self._expire
+
+    @expire.setter
+    def expire(self, value: int):
+        self._expire = value
+
+    def send(self, msg: Dict[Any, Any]):
+        self.conn.xadd(self.task_key, msg)
+
+    def delete(self):
+        self.conn.delete(self.task_key)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        # keep the stream readable for a grace period after the task ends,
+        # then let redis expire it
+        self.conn.expire(self.task_key, self._expire)
+        self.close()
+
+    def close(self):
+        self.conn.close()
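On the consumer side, anything that can read a redis stream can follow a task's output. A minimal sketch with redis-py, using the `tasks:<job_id>:<host>` key layout that `local_executor` produces (the key below is a hypothetical example):

```python
import redis

conn = redis.Redis(host='192.168.137.129', password='seraphim', port=6379,
                   decode_responses=True)

# block up to 5s at a time waiting for entries newer than the last seen id
last_id = 0
while True:
    resp = conn.xread({'tasks:demo:localhost': last_id}, count=10, block=5000)
    if not resp:
        break
    key, messages = resp[0]
    for entry_id, data in messages:
        last_id = entry_id
        print(data['msg'], end='')
```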
+        await redis_conn.close()
diff --git a/server/common/dep.py b/server/common/dep.py
new file mode 100644
index 0000000..dd3ba05
--- /dev/null
+++ b/server/common/dep.py
@@ -0,0 +1,8 @@
+from fastapi import Request
+
+
+async def get_uid(request: Request) -> int:
+    """
+    Read the uid that the auth middleware stored on the request.
+    """
+    return request.state.uid
diff --git a/server/common/utils.py b/server/common/utils.py
index 5c2e890..9150880 100644
--- a/server/common/utils.py
+++ b/server/common/utils.py
@@ -1,8 +1,12 @@
 import os
+import asyncio
+import websockets
+from fastapi import WebSocket
 from typing import List, Generic, Type, TypeVar
 from loguru import logger
-from sqlmodel import SQLModel
+from sqlmodel import SQLModel, select
 from ..models.internal.menu import MenusWithChild
+from ..models.internal.job import JobLog
 
 T = TypeVar('T', bound=SQLModel)
 
@@ -64,3 +67,42 @@ def update_model(old_model, new_model):
 def remove_tmp_file(file):
     logger.debug(f'删除临时文件{file}')
     os.remove(file)
+
+
+async def get_task_logs(ws: WebSocket, redis, session, task_id: str, trigger: str):
+    """
+    Stream a task's output from its redis stream to the websocket client.
+    For one-shot ('date') jobs whose stream has already expired, fall back
+    to the log persisted in the job_log table.
+    """
+    key_name = f'tasks:{task_id}'
+    last_id = 0
+    sleep_ms = 5000
+    while True:
+        if await redis.exists(key_name):
+            resp = await redis.xread({key_name: last_id}, count=1, block=sleep_ms)
+            if resp:
+                key, message = resp[0]
+                last_id = message[0][0]
+                try:
+                    await ws.send_json({'task_id': task_id, 'data': message})
+                except websockets.exceptions.ConnectionClosed as e:
+                    logger.warning(f"websocket closed unexpectedly: {e}")
+                    break
+        elif last_id == 0 and trigger == 'date':
+            # last_id == 0 means nothing was ever streamed to us: the job already
+            # finished and its redis key has expired, so read the log that
+            # run_command_with_channel persisted to the database
+            logs = session.exec(select(JobLog).where(JobLog.job_id == task_id)).all()
+            try:
+                await ws.send_json({'task_id': task_id, 'data': [log.log for log in logs]})
+            except websockets.exceptions.ConnectionClosed as e:
+                logger.warning(f"websocket closed unexpectedly: {e}")
+            break
+        elif last_id == 0:
+            # recurring job that has not produced output yet -- wait for the stream
+            await asyncio.sleep(1)
+        else:
+            logger.debug(f'{task_id} finished')
+            break
+    return True
diff --git a/server/crud/internal/__init__.py b/server/crud/internal/__init__.py
index 51bdf03..56f1406 100644
--- a/server/crud/internal/__init__.py
+++ b/server/crud/internal/__init__.py
@@ -2,3 +2,4 @@
 from .roles import role
 from .menu import menu
 from .dictonary import data_dict, dict_item
+from .job import job_log
diff --git a/server/crud/internal/job.py b/server/crud/internal/job.py
new file mode 100644
index 0000000..67c8254
--- /dev/null
+++ b/server/crud/internal/job.py
@@ -0,0 +1,13 @@
+from typing import Union
+from loguru import logger
+from sqlmodel import select, Session
+from ...models.internal.job import JobLog
+from ..base import CRUDBase
+from .roles import role
+
+
+class CRUDJobLog(CRUDBase[JobLog]):
+    pass
+
+
+job_log = CRUDJobLog(JobLog)
diff --git a/server/main.py b/server/main.py
index 6c46ca1..420e821 100644
--- a/server/main.py
+++ b/server/main.py
@@ -1,7 +1,7 @@
 from fastapi import FastAPI, Depends
 from loguru import logger
 from .common.log import init_logging
-from .routers.internal import login, user, menu, roles, dictonary
+from .routers.internal import login, user, menu, roles, dictonary, job
 from .common.security import auth_check
 
 init_logging()
@@ -13,6 +13,7 @@
 app.include_router(menu.router, tags=['菜单管理'])
 app.include_router(roles.router, tags=['角色管理'])
 app.include_router(dictonary.router, tags=['数据字典'])
+app.include_router(job.router, tags=['任务管理'])
 
 
 @app.on_event("startup")
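A client consuming `get_task_logs` just opens the websocket route and reads JSON frames. A sketch with the `websockets` library, assuming the app runs on localhost:8000 and using the query parameter names from the websocket endpoint in routers/internal/job.py (`<job-uuid>` is a placeholder):

```python
import asyncio
import json
import websockets


async def follow_logs():
    uri = 'ws://localhost:8000/api/jobs/logs/ws/?id=1&job_id=<job-uuid>&trigger=date'
    async with websockets.connect(uri) as ws:
        while True:
            frame = json.loads(await ws.recv())
            print(frame['task_id'], frame['data'])

asyncio.run(follow_logs())
```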
@app.on_event("startup") diff --git a/server/models/internal/__init__.py b/server/models/internal/__init__.py index 95cd37a..b138003 100644 --- a/server/models/internal/__init__.py +++ b/server/models/internal/__init__.py @@ -2,3 +2,4 @@ from .menu import Menu from .role import Role, RoleMenu from .dictonary import DataDict, DictItem +from .job import Job, JobLog diff --git a/server/models/internal/job.py b/server/models/internal/job.py new file mode 100644 index 0000000..a15c622 --- /dev/null +++ b/server/models/internal/job.py @@ -0,0 +1,54 @@ +from enum import Enum +from typing import Union, List, Tuple, Dict, Any, Optional +from sqlmodel import SQLModel, Field, Column, Relationship, Integer, Unicode, LargeBinary, JSON +from sqlalchemy.dialects import mysql +from datetime import datetime +from pydantic import BaseModel +from .relationships import UserJob + + +class CronJobArgs(BaseModel): + cron: str + start_date: Optional[str] + end_date: Optional[str] + + +class TriggerEnum(str, Enum): + date = 'date' + cron = 'cron' + + +class JobAdd(BaseModel): + id: Optional[str] = None + name: str + targets: List[str] + trigger: TriggerEnum + trigger_args: Union[CronJobArgs, str] = None + command: str + type: str + + +class Job(SQLModel, table=True): + """ + 此表同步apschedule中的建表语句,如果没有,则apscheduler会自动创建对应表 + """ + __tablename__ = 'apscheduler_jobs' + id: str = Field(sa_column=Column('id', Unicode(191), primary_key=True)) + next_run_time: float = Field(sa_column=Column('next_run_time', mysql.DOUBLE, index=True)) + job_state: bytes = Field(sa_column=Column('job_state', LargeBinary, nullable=False)) + logs: List["JobLog"] = Relationship(back_populates="job") + user: "User" = Relationship(back_populates="jobs", link_model=UserJob) + + +class JobLog(SQLModel, table=True): + __tablename__ = 'job_log' + id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True)) + status: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='执行命令返回状态')) + start_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': '任务开始时间'}) + end_time: datetime = Field(default=datetime.now, sa_column_kwargs={'comment': '任务结束时间'}) + log: JSON = Field(sa_column=Column(JSON, comment='执行日志')) + job_id: Optional[str] = Field(default=None, foreign_key="apscheduler_jobs.id") + job: Optional[Job] = Relationship(back_populates="logs") + + class Config: + arbitrary_types_allowed = True diff --git a/server/models/internal/relationships.py b/server/models/internal/relationships.py index 1a4188f..37fcf55 100644 --- a/server/models/internal/relationships.py +++ b/server/models/internal/relationships.py @@ -1,4 +1,4 @@ -from sqlmodel import SQLModel, Field +from sqlmodel import SQLModel, Field, Unicode # 这些是权限验证的基础表,单独放置 @@ -14,3 +14,11 @@ class UserRole(SQLModel, table=True): user_id: int = Field(foreign_key="user.id", primary_key=True) role_id: int = Field(foreign_key="roles.id", primary_key=True) + +class UserJob(SQLModel, table=True): + """ + 通过中间表实现:用户-任务的对应关系 + """ + __tablename__ = 'user_job' + user_id: int = Field(foreign_key="user.id", primary_key=True, nullable=False) + job_id: str = Field(Unicode(191), foreign_key="apscheduler_jobs.id", primary_key=True, nullable=False) diff --git a/server/models/internal/user.py b/server/models/internal/user.py index 0292a31..f192ccc 100644 --- a/server/models/internal/user.py +++ b/server/models/internal/user.py @@ -1,11 +1,12 @@ from typing import Optional, List, Literal, TYPE_CHECKING from pydantic import BaseModel from sqlmodel 
diff --git a/server/models/internal/relationships.py b/server/models/internal/relationships.py
index 1a4188f..37fcf55 100644
--- a/server/models/internal/relationships.py
+++ b/server/models/internal/relationships.py
@@ -14,3 +14,12 @@ class UserRole(SQLModel, table=True):
 
     user_id: int = Field(foreign_key="user.id", primary_key=True)
     role_id: int = Field(foreign_key="roles.id", primary_key=True)
+
+
+class UserJob(SQLModel, table=True):
+    """
+    Map users to jobs through an association table.
+    """
+    __tablename__ = 'user_job'
+    user_id: int = Field(foreign_key="user.id", primary_key=True, nullable=False)
+    job_id: str = Field(max_length=191, foreign_key="apscheduler_jobs.id", primary_key=True, nullable=False)
diff --git a/server/models/internal/user.py b/server/models/internal/user.py
index 0292a31..f192ccc 100644
--- a/server/models/internal/user.py
+++ b/server/models/internal/user.py
@@ -1,11 +1,12 @@
 from typing import Optional, List, Literal, TYPE_CHECKING
 from pydantic import BaseModel
 from sqlmodel import SQLModel, Field, Relationship, Column, Integer, Boolean
-from .relationships import UserRole
+from .relationships import UserRole, UserJob
 from .role import Role
 
 if TYPE_CHECKING:
     from .role import Role
+    from .job import Job
 
 
 class UserWithOutPasswd(SQLModel):
@@ -38,6 +39,7 @@ class UserRoles(SQLModel):
 class User(UserBase, table=True):
     id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
     roles: List['Role'] = Relationship(back_populates="users", link_model=UserRole)
+    jobs: List['Job'] = Relationship(back_populates="user", link_model=UserJob)
 
 
 class UserCreateWithRoles(SQLModel):
diff --git a/server/requirements.txt b/server/requirements.txt
index cbbe88e..2ad99e9 100644
Binary files a/server/requirements.txt and b/server/requirements.txt differ
diff --git a/server/routers/form.py b/server/routers/form.py
deleted file mode 100644
index 60d8df9..0000000
--- a/server/routers/form.py
+++ /dev/null
@@ -1,21 +0,0 @@
-from fastapi import APIRouter, Request
-from ..schemas import ApiResponse
-from ..models import User
-
-router = APIRouter(prefix='/api')
-
-
-# class UserForm(User):
-#     roles: List[Role]
-
-
-@router.get('/form/{name}')
-async def get_form_model(name: str, request: Request):
-    print(name)
-    print(request.app.openapi())
-    print(request.app.openapi()['components']['schemas'][name]['properties'])
-    return ApiResponse(
-        code=0,
-        message='success',
-        data=User.schema_json()
-    )
diff --git a/server/routers/internal/job.py b/server/routers/internal/job.py
new file mode 100644
index 0000000..9e81c7b
--- /dev/null
+++ b/server/routers/internal/job.py
@@ -0,0 +1,238 @@
+import rpyc
+import re
+import anyio
+from datetime import datetime
+from uuid import uuid4
+from typing import List, Any, Dict
+from sqlmodel import Session, text
+from apscheduler.job import Job
+from apscheduler.triggers.interval import IntervalTrigger
+from apscheduler.triggers.date import DateTrigger
+from apscheduler.triggers.cron import CronTrigger
+from fastapi import APIRouter, Depends, WebSocket, Request
+from loguru import logger
+from server.settings import settings
+from server.models.internal.job import JobAdd, JobLog
+from server.common.database import get_session, get_redis
+from server.common.response_code import ApiResponse, SearchResponse
+from server.common.utils import get_task_logs
+from server.schemas.internal.pagination import Pagination
+from server.schemas.job import JobSearch, JobLogs, JobLogSearch
+from server import crud
+from server.common.dep import get_uid
+
+router = APIRouter(prefix='/api/jobs')
+
+
+def cron_to_dict(cron):
+    # split a five-field crontab expression into the keyword arguments CronTrigger expects
+    cron_list = re.split(r'\s+', cron)
+    return {'minute': cron_list[0], 'hour': cron_list[1], 'day': cron_list[2], 'month': cron_list[3],
+            'day_of_week': cron_list[4]}
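As a quick illustration of `cron_to_dict` feeding apscheduler (the expression below is an arbitrary example):

```python
from apscheduler.triggers.cron import CronTrigger

args = cron_to_dict('*/5 8-18 * * 1-5')
# -> {'minute': '*/5', 'hour': '8-18', 'day': '*', 'month': '*', 'day_of_week': '1-5'}
trigger = CronTrigger(**args)
print(trigger)  # e.g. cron[month='*', day='*', day_of_week='1-5', hour='8-18', minute='*/5']
```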
+
+
+@router.get('/switch/{job_id}', summary='切换任务状态')
+async def switch_job(job_id: str):
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.switch_job(job_id)
+    except ValueError as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    return ApiResponse()
+
+
+@router.get('/resume/{job_id}', summary='恢复任务')
+async def resume_job(job_id: str):
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.resume_job(job_id)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message='恢复任务出错,请联系管理员!'
+        )
+    return ApiResponse()
+
+
+@router.delete('/{job_id}', summary='删除任务', response_model=ApiResponse[str])
+async def delete_job(job_id: str, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
+    sql = text("select job_id from user_job where user_id=:uid")
+    user_jobs = tuple([job[0] for job in session.execute(sql, {'uid': uid})])
+    logger.debug(user_jobs)
+    if job_id not in user_jobs:
+        return ApiResponse(
+            code=500,
+            message='用户无权限操作此任务!'
+        )
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.pause_job(job_id)
+        delete_user_job = text("delete from user_job where job_id=:job_id")
+        session.execute(delete_user_job, {'job_id': job_id})
+        delete_job_logs = text("delete from job_log where job_id=:job_id")
+        session.execute(delete_job_logs, {'job_id': job_id})
+        session.commit()
+        conn.root.remove_job(job_id)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message='删除任务出错,请联系管理员!'
+        )
+    return ApiResponse()
+
+
+@router.put('/', summary='修改任务', response_model=ApiResponse[str])
+async def modify_job(job: JobAdd, session: Session = Depends(get_session)):
+    logger.debug(job)
+    if job.trigger == 'cron':
+        trigger_args = job.trigger_args.dict()
+        trigger_args.update(cron_to_dict(job.trigger_args.cron))
+        del trigger_args['cron']
+        trigger = CronTrigger(**trigger_args)
+    elif job.trigger == 'date':
+        trigger = DateTrigger(run_date=job.trigger_args)
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        conn.root.modify_job(job.id, kwargs={'job_id': job.id, 'targets': job.targets, 'command': job.command},
+                             name=job.name, trigger=trigger)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    return ApiResponse()
+
+
+@router.post('/', summary='添加任务', response_model=ApiResponse[str])
+async def add_job(job: JobAdd, uid: int = Depends(get_uid), session: Session = Depends(get_session)):
+    logger.debug(job)
+    # generate the job id by hand so it can be passed through to the task kwargs
+    job_id = uuid4().hex
+    # only cron and date triggers are supported; interval jobs can be expressed
+    # as cron, so there is no point implementing them separately
+    if job.trigger == 'cron':
+        trigger_args: Dict[str, Any] = job.trigger_args.dict()
+        trigger_args.update(cron_to_dict(job.trigger_args.cron))
+        del trigger_args['cron']
+    elif job.trigger == 'date':
+        trigger_args = {'run_date': job.trigger_args}
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        job = conn.root.add_job('scheduler-server:run_command_with_channel', trigger=job.trigger.value,
+                                kwargs={'job_id': job_id,
+                                        'targets': job.targets,
+                                        'command': job.command}, id=job_id, name=job.name, **trigger_args)
+        sql = text("INSERT INTO user_job values (:uid,:job_id)")
+        session.execute(sql, {'uid': uid, "job_id": job_id})
+        session.commit()
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    return ApiResponse(
+        data=job.id
+    )
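A request body for POST /api/jobs/ follows the `JobAdd` schema. A sketch with the requests library (base URL, token handling, and the `type` value are assumptions for illustration):

```python
import requests

payload = {
    'name': 'disk usage check',
    'targets': ['localhost'],
    'trigger': 'cron',
    'trigger_args': {'cron': '0 2 * * *', 'start_date': None, 'end_date': None},
    'command': 'df -h',
    'type': 'shell',  # hypothetical value; JobAdd only requires a string here
}
resp = requests.post('http://localhost:8000/api/jobs/', json=payload,
                     headers={'Authorization': 'Bearer <token>'})
print(resp.json())  # {'code': 0, ..., 'data': '<job id>'}
```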
+@router.post('/search', summary='获取所有任务', response_model=ApiResponse[SearchResponse[Any]])
+async def show_jobs(search: Pagination[JobSearch], uid: int = Depends(get_uid)):
+    job_name = search.search['job_name']
+    try:
+        conn = rpyc.connect(**settings.rpyc_config)
+        user_jobs: List[Job] = conn.root.get_user_jobs(uid, job_name)
+    except Exception as e:
+        logger.warning(e)
+        return ApiResponse(
+            code=500,
+            message=str(e)
+        )
+    if len(user_jobs) == 0:
+        return ApiResponse(
+            data={
+                'total': len(user_jobs),
+                'data': []
+            }
+        )
+
+    logger.debug(user_jobs)
+
+    job_info_list: List[Dict[str, Any]] = []
+    # slice bounds for the requested page; the end index is exclusive
+    start = (search.page - 1) * search.page_size
+    end = search.page * search.page_size
+    for job in user_jobs[start:end]:
+        logger.debug(job.trigger)
+        logger.debug(job.kwargs)
+        trigger_args: Dict[str, str] = {}
+        info = {}
+        if isinstance(job.trigger, CronTrigger):
+            logger.debug('cron')
+            for field in job.trigger.fields:
+                trigger_args[field.name] = str(field)
+            info.update({
+                'id': job.id,
+                'name': job.name,
+                'targets': job.kwargs['targets'],
+                'trigger': 'cron',
+                'trigger_args': {
+                    'cron': f"{trigger_args['minute']} {trigger_args['hour']} {trigger_args['day']} {trigger_args['month']} {trigger_args['day_of_week']}",
+                    'start_date': None if job.trigger.start_date is None else job.trigger.start_date.strftime(
+                        "%Y-%m-%d %H:%M:%S"),
+                    'end_date': None if job.trigger.end_date is None else job.trigger.end_date.strftime(
+                        "%Y-%m-%d %H:%M:%S"),
+                },
+                'command': job.kwargs['command'],
+                'status': 'running' if job.next_run_time is not None else 'stop'
+            })
+        elif isinstance(job.trigger, DateTrigger):
+            info.update({
+                'id': job.id,
+                'name': job.name,
+                'targets': job.kwargs['targets'],
+                'trigger': 'date',
+                'trigger_args': job.trigger.run_date.strftime(
+                    "%Y-%m-%d %H:%M:%S"),
+                'command': job.kwargs['command'],
+                'status': 'running' if job.next_run_time is not None else 'stop'
+            })
+        logger.debug(info)
+        job_info_list.append(info)
+    logger.debug(job_info_list)
+    return ApiResponse(
+        data={
+            'total': len(user_jobs),
+            'data': job_info_list
+        }
+    )
+
+
+@router.post('/logs', summary='任务日志查询', response_model=ApiResponse[SearchResponse[JobLogs]])
+async def job_logs(page_search: Pagination[JobLogSearch], session: Session = Depends(get_session)):
+    filter_type = JobLogSearch(job_id='eq')
+    logger.debug(page_search)
+    total = crud.internal.job_log.search_total(session, page_search.search, filter_type.dict())
+    jobs = crud.internal.job_log.search(session, page_search, filter_type.dict())
+    logger.debug(jobs)
+    return ApiResponse(
+        data={
+            'total': total,
+            'data': jobs
+        }
+    )
+
+
+@router.websocket('/logs/ws/')
+async def websocket_endpoint(id: int, job_id: str, trigger: str, websocket: WebSocket,
+                             session: Session = Depends(get_session),
+                             redis=Depends(get_redis)):
+    await websocket.accept()
+    async with anyio.create_task_group() as tg:
+        tg.start_soon(get_task_logs, websocket, redis, session, job_id, trigger)
+    logger.debug('close websocket')
+    await websocket.close()
diff --git a/server/routers/internal/login.py b/server/routers/internal/login.py
index 84dccd0..3e3e434 100644
--- a/server/routers/internal/login.py
+++ b/server/routers/internal/login.py
@@ -7,6 +7,7 @@
 from ...common.response_code import ApiResponse
 from ...common.security import create_access_token
 from ...models.internal import User, Menu
+from ...common.dep import get_uid
 from ...models.internal.user import UserLogin, LoginResponse
 from ... import crud
 from ...common.utils import menu_convert
@@ -44,7 +45,7 @@ async def login(login_form: UserLogin, session: Session = Depends(get_session)):
 
 
 @router.get('/permission', summary='获取权限')
-async def get_permission(request: Request, session: Session = Depends(get_session)):
+async def get_permission(uid: int = Depends(get_uid), session: Session = Depends(get_session)):
     """
     用户权限请求,返回拥有权限的菜单列表,前端根据返回的菜单列表信息,合成菜单项
     :param request:
@@ -52,7 +53,6 @@ async def get_permission(request: Request, session: Session = Depends(get_session)):
     :param token:
     :return:
     """
-    uid: int = request.state.uid
     logger.debug(f"uid is:{uid}")
     user: User = crud.internal.user.get(session, uid)
     logger.debug(user.roles)
diff --git a/server/schemas/job.py b/server/schemas/job.py
new file mode 100644
index 0000000..df302ea
--- /dev/null
+++ b/server/schemas/job.py
@@ -0,0 +1,21 @@
+from typing import Optional, Any
+from pydantic import BaseModel
+from sqlmodel import SQLModel
+from datetime import datetime
+
+
+class JobSearch(SQLModel):
+    job_name: Optional[str] = None
+
+
+class JobLogs(SQLModel):
+    id: int
+    status: int
+    start_time: datetime
+    end_time: datetime
+    log: Any
+    job_id: str
+
+
+class JobLogSearch(BaseModel):
+    job_id: str
diff --git a/server/settings.py b/server/settings.py
index c1b649d..651e438 100644
--- a/server/settings.py
+++ b/server/settings.py
@@ -1,8 +1,9 @@
 import casbin_sqlalchemy_adapter
 import casbin
 from typing import List
 from pathlib import Path
-from pydantic import BaseSettings
+from pydantic import MySQLDsn
+from pydantic_settings import BaseSettings
 from sqlmodel import create_engine
 
 
@@ -14,16 +16,21 @@ class APISettings(BaseSettings):
     CASBIN_MODEL_PATH: str = "server/model.conf"
 
     # sql数据库信息
-    DATABASE_URI = "mysql+pymysql://root:123456@192.168.137.129/devops"
+    DATABASE_URI: MySQLDsn = "mysql+pymysql://root:123456@192.168.137.129/devops"
     # 白名单,不需要进行任何验证即可访问
     NO_VERIFY_URL: List = [
         '/',
         '/api/login',
     ]
+    rpyc_config: dict = {'host': 'localhost', 'port': 18861,
+                         'config': {"allow_public_attrs": True, 'allow_pickle': True},
+                         'keepalive': True}
+    # redis connection used by get_redis(); decode_responses so stream entries come back as str
+    redis_config: dict = {'host': '192.168.137.129', 'port': 6379, 'password': 'seraphim',
+                          'decode_responses': True}
 
 
 settings = APISettings()
-engine = create_engine(settings.DATABASE_URI, pool_size=5, max_overflow=10, pool_timeout=30, pool_pre_ping=True)
+engine = create_engine(str(settings.DATABASE_URI), pool_size=5, max_overflow=10, pool_timeout=30, pool_pre_ping=True)
 adapter = casbin_sqlalchemy_adapter.Adapter(engine)
 casbin_enforcer = casbin.Enforcer(settings.CASBIN_MODEL_PATH, adapter)
"^1.57.1", - "unocss": "^0.48.4", - "unplugin-auto-import": "^0.12.1", - "unplugin-vue-components": "^0.22.12", - "vite": "^4.0.4", - "vite-plugin-dynamic-import": "^1.2.6" + "@rollup/plugin-dynamic-import-vars": "^2.1.2", + "@vitejs/plugin-vue": "^5.0.2", + "js-md5": "^0.8.3", + "sass": "^1.69.7", + "unocss": "^0.58.3", + "unplugin-auto-import": "^0.17.2", + "unplugin-vue-components": "^0.26.0", + "vite": "^5.0.10", + "vite-plugin-dynamic-import": "^1.5.0" }, "eslintConfig": { "root": true, diff --git a/www/src/api/jobs.js b/www/src/api/jobs.js new file mode 100644 index 0000000..0d8fc00 --- /dev/null +++ b/www/src/api/jobs.js @@ -0,0 +1,8 @@ +import {GET, POST, PUT, DELETE} from '@/utils/request' + +export const PostNewCronJob = (dict) => POST('/api/jobs/', dict) +export const GetJobList = () => GET('/api/jobs/') +export const DelJob = (jobId) => DELETE('/api/jobs/' + jobId) +export const PutCronJob = (job) => PUT('/api/jobs/', job) +export const SwitchJob = (jobId) => GET('/api/jobs/switch/' + jobId) +export const GetLogs = (jobId) => GET('/api/jobs/logs') \ No newline at end of file diff --git a/www/src/main.js b/www/src/main.js index 7e7d136..4c350ea 100644 --- a/www/src/main.js +++ b/www/src/main.js @@ -2,7 +2,7 @@ import {createApp} from 'vue' import App from './App.vue' import ElementPlus from 'element-plus' import * as ElementPlusIconsVue from '@element-plus/icons-vue' -import zhCn from 'element-plus/es/locale/lang/zh-cn' +import zhCn from 'element-plus/dist/locale/zh-cn' import 'element-plus/dist/index.css' import router from './router' import {createPinia} from 'pinia' diff --git a/www/src/utils/request.js b/www/src/utils/request.js index bbc5ac6..c8f8081 100644 --- a/www/src/utils/request.js +++ b/www/src/utils/request.js @@ -2,7 +2,8 @@ import axios from 'axios' // import { Notification, detailBox } from 'element-ui' import {useStore} from '../stores' import {getToken} from '@/utils/auth' -import {ElNotification} from 'element-plus' +import {ElMessage, ElMessageBox, ElNotification} from 'element-plus' +import {DelJob} from '@/api/jobs' // 创建axios实例 const service = axios.create({ @@ -148,4 +149,22 @@ export function DELETE(url, params) { }) } +export function ConfirmDel(txt,func,id) { + ElMessageBox.confirm(txt, '警告', {type: 'warning'}).then(() => { + func(id).then(() => { + ElMessage({ + title: 'success', + message: '删除成功', + type: 'success' + }) + }) + }).catch(() => { + ElMessage({ + title: 'success', + message: '取消删除操作', + type: 'warning' + }) + }) + } + export default service \ No newline at end of file diff --git a/www/src/views/jobs/JobManage/AddJob.vue b/www/src/views/jobs/JobManage/AddJob.vue new file mode 100644 index 0000000..10294c9 --- /dev/null +++ b/www/src/views/jobs/JobManage/AddJob.vue @@ -0,0 +1,183 @@ + + + + + + + + + + + + + + + + + + + + + 在脚本页面创建脚本,然后这里选择对应脚本 + + + + + + + + + + + + 添加执行对象 + + + + + + + + + + + + + 执行时间 + + 不指定时间表示立即执行。 + + + + + + 执行规则 + + 开始时间 + + 结束时间 + + + + + + + + + + + + 提交 + 下一步 + 上一步 + + + + + + + + + + + \ No newline at end of file diff --git a/www/src/views/jobs/JobManage/JobLogs.vue b/www/src/views/jobs/JobManage/JobLogs.vue new file mode 100644 index 0000000..d9771f2 --- /dev/null +++ b/www/src/views/jobs/JobManage/JobLogs.vue @@ -0,0 +1,75 @@ + + + {{ props.job.id }} + {{ props.job.name }} + {{ props.job.trigger }} + {{ props.job.command }} + + {{ props.job.trigger_args.cron }} + {{ props.job.trigger_args.start_date }} + {{ props.job.trigger_args.end_date }} + + + {{ props.job.trigger_args }} + + + + + + + + + + {{ 
diff --git a/www/src/views/jobs/JobManage/JobLogs.vue b/www/src/views/jobs/JobManage/JobLogs.vue
new file mode 100644
index 0000000..d9771f2
--- /dev/null
+++ b/www/src/views/jobs/JobManage/JobLogs.vue
@@ -0,0 +1,75 @@
+<!-- surviving template text:
+     {{ props.job.id }} · {{ props.job.name }} · {{ props.job.trigger }} · {{ props.job.command }}
+     {{ props.job.trigger_args.cron }} · {{ props.job.trigger_args.start_date }} · {{ props.job.trigger_args.end_date }}
+     {{ props.job.trigger_args }}
+     {{ scope.row.status === 0 ? '正常' : '失败' }} · 详情 -->
\ No newline at end of file
diff --git a/www/src/views/jobs/JobManage/LogDetail.vue b/www/src/views/jobs/JobManage/LogDetail.vue
new file mode 100644
index 0000000..a94e1f3
--- /dev/null
+++ b/www/src/views/jobs/JobManage/LogDetail.vue
@@ -0,0 +1,102 @@
+<!-- surviving template text:
+     执行成功 {{ status.success }} · 执行失败 {{ status.failure }} · 平均耗时(秒) {{ status.duration }}
+     运行耗时:{{ value['duration'] }} · 返回状态:{{ value['status'] }} · 执行日志:{{ hostLogs[key] }} -->
\ No newline at end of file
diff --git a/www/src/views/jobs/JobManage/index.vue b/www/src/views/jobs/JobManage/index.vue
new file mode 100644
index 0000000..85a951d
--- /dev/null
+++ b/www/src/views/jobs/JobManage/index.vue
@@ -0,0 +1,124 @@
+<!-- surviving template text: 查询 · 新建 · {{ scope.row.trigger === 'cron' ? 'Cron' : 'Date' }} ·
+     编辑 · 日志 · 删除 -->
\ No newline at end of file