Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
53ba21d
update:前端-优化useTerm实现代码
4linuxfun Jan 17, 2025
2143ded
update:后端-简化scheduler任务,删除不必要代码
4linuxfun Jan 21, 2025
8daef2b
update:后端-scheduler完善
4linuxfun Jan 21, 2025
ae16ea1
update:后端-排除websocket权限验证
4linuxfun Jan 21, 2025
19e8bad
update:后端-实现“任务管理”模块功能
4linuxfun Jan 21, 2025
f2bf6d5
update:前端-重构usePagination
4linuxfun Jan 21, 2025
bf8f03c
update:前端-实现“执行任务”功能
4linuxfun Jan 21, 2025
c3c18b5
update:前端usePagination分页增加initialPageSize参数
4linuxfun Feb 10, 2025
7647004
update:前端-任务管理分页日志默认改为5
4linuxfun Feb 10, 2025
3544264
update:service.sh脚本增加部分逻辑判断
4linuxfun Feb 10, 2025
c75e188
update:前端usePagination补全async写法
4linuxfun Feb 11, 2025
9091c98
update:前端-补全缺失组件引用
4linuxfun Feb 11, 2025
ac23b67
fix:前端-修复主机选择关联带出bug
4linuxfun Feb 11, 2025
f9f57b1
fix:前端-修复翻页多选状态保持问题
4linuxfun Feb 12, 2025
e6a1b58
update:后端-更新config配置模式
4linuxfun Feb 12, 2025
23865ae
update:后端-修改yaml配置后的参数
4linuxfun Feb 12, 2025
e9c7183
update:后端-更新服务控制脚本
4linuxfun Feb 12, 2025
e493d2c
update:后端-修改redis过期时间
4linuxfun Feb 12, 2025
28ba966
update:后端-任务管理功能模块实现
4linuxfun Feb 12, 2025
0413a51
update:更新README.md
4linuxfun Feb 13, 2025
fd9180a
update:后端-增加生产环境配置模板
4linuxfun Feb 13, 2025
9cb6b59
update:其他
4linuxfun Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,9 +92,15 @@ server {

* dev:开发环境,使用uvicorn启动
* pro:生产环境,使用gunicorn启动

```
./service.sh {dev|pro} {start|stop|restart}
```
后端需要修改对应配置文件信息
* dev: config/development.yaml
* pro: config/production.yaml

开发模式下,uvicorn服务能自动刷新,rpyc服务无法自动刷新,需要手动重启

## 约束

Expand Down
44 changes: 44 additions & 0 deletions config/development.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Common settings
common:
  env: development
  log_level: DEBUG

# FastAPI server settings
server:
  host: 0.0.0.0
  port: 8000
  debug: true
  # Token / JWT settings
  # NOTE(review): secret key and passwords are committed in plain text -- fine for
  # a local dev file, but confirm these are rotated for any shared environment.
  secret_key: "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
  algorithm: "HS256"
  access_token_expire_minutes: 30
  # Database connection
  database_uri: "mysql+pymysql://root:123456@localhost/devops"
  casbin_model_path: "server/model.conf"
  # URL whitelist (requests to these paths skip token verification)
  no_verify_url:
    - "/"
    - "/api/login"
  redis:
    host: "192.168.137.129"
    password: "seraphim"
    port: 6379
    health_check_interval: 30
  # Connection settings for the RPyC scheduler service
  rpyc_config:
    host: localhost
    port: 18861
    config:
      allow_public_attrs: true
      allow_pickle: true
      keepalive: true

# RPyC scheduler settings (read by rpyc_scheduler.config as config['scheduler'])
# NOTE(review): redis settings are duplicated between server and scheduler --
# confirm whether a shared anchor/alias would be preferable.
scheduler:
  rpc_port: 18861
  apscheduler_job_store: 'mysql+pymysql://root:[email protected]/devops'
  redis:
    host: "192.168.137.129"
    password: "seraphim"
    port: 6379
    health_check_interval: 30
44 changes: 44 additions & 0 deletions config/production.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Common settings
common:
  env: production
  # Fixed: production should not log at DEBUG level (was DEBUG, copied from dev)
  log_level: INFO

# FastAPI server settings
server:
  host: 0.0.0.0
  port: 8000
  # Fixed: debug mode must be disabled in production (was true, copied from dev)
  debug: false
  # Token / JWT settings
  # NOTE(review): secret_key, DB and redis passwords are committed in plain text --
  # move them to environment variables or a secrets store before deploying.
  secret_key: "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
  algorithm: "HS256"
  access_token_expire_minutes: 30
  # Database connection
  database_uri: "mysql+pymysql://root:123456@localhost/devops"
  casbin_model_path: "server/model.conf"
  # URL whitelist (requests to these paths skip token verification)
  no_verify_url:
    - "/"
    - "/api/login"
  redis:
    host: "192.168.137.129"
    password: "seraphim"
    port: 6379
    health_check_interval: 30
  # Connection settings for the RPyC scheduler service
  rpyc_config:
    host: localhost
    port: 18861
    config:
      allow_public_attrs: true
      allow_pickle: true
      keepalive: true

# RPyC scheduler settings (read by rpyc_scheduler.config as config['scheduler'])
scheduler:
  rpc_port: 18861
  apscheduler_job_store: 'mysql+pymysql://root:[email protected]/devops'
  redis:
    host: "192.168.137.129"
    password: "seraphim"
    port: 6379
    health_check_interval: 30
1 change: 1 addition & 0 deletions rpyc_scheduler/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from .server import main
25 changes: 16 additions & 9 deletions rpyc_scheduler/config.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,20 @@
from pydantic_settings import BaseSettings
from pydantic import MySQLDsn, RedisDsn
import yaml
import os
from pathlib import Path

# 获取环境变量,默认为 development
env = os.getenv('ENV', 'development')
config_path = Path(__file__).parent.parent / 'config' / f'{env}.yaml'

class RpcConfig(BaseSettings):
# apscheduler指定job store和excutors
apscheduler_job_store: MySQLDsn = 'mysql+pymysql://root:[email protected]/devops'
redis: dict = {'host': '192.168.137.129', 'password': 'seraphim', 'port': 6379, 'health_check_interval': 30}
rpc_port: int = 18861
# 初始化配置变量
rpc_config = {}

# 读取配置文件
def load_config():
    """Load the environment-specific YAML config and merge its ``scheduler``
    section into the module-level ``rpc_config`` dict.

    Mutates ``rpc_config`` in place so modules that imported it keep a live
    reference to the loaded settings.
    """
    global rpc_config
    with open(config_path, 'r', encoding='utf-8') as fh:
        parsed = yaml.safe_load(fh)
    # Only the scheduler section is relevant to this process.
    rpc_config.update(parsed['scheduler'])


rpc_config = RpcConfig()
# 加载配置
load_config()
4 changes: 2 additions & 2 deletions rpyc_scheduler/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@
from datetime import datetime
from sqlmodel import SQLModel, Field, Column, Integer, create_engine, Session
from sqlalchemy.dialects import mysql
from config import rpc_config
from .config import rpc_config
from sqlmodel import SQLModel
from typing import Union

engine = create_engine(str(rpc_config.apscheduler_job_store), pool_size=5, max_overflow=10, pool_timeout=30,
engine = create_engine(str(rpc_config['apscheduler_job_store']), pool_size=5, max_overflow=10, pool_timeout=30,
pool_pre_ping=True)


Expand Down
95 changes: 0 additions & 95 deletions rpyc_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import six
import warnings
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
from apscheduler.util import (timedelta_seconds, TIMEOUT_MAX)
from apscheduler.events import (JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES)

try:
from collections.abc import MutableMapping
Expand Down Expand Up @@ -74,95 +71,3 @@ def get_jobs(self, jobstore=None, pending=None):
if jobstore is None or alias == jobstore:
jobs.extend(store.get_all_jobs())
return jobs

def _process_jobs(self):
"""
修改_process_jobs方法,把remove_job全部替换为pause_job,当任务执行完成后改成暂停状态
"""
if self.state == STATE_PAUSED:
self._logger.debug('Scheduler is paused -- not processing jobs')
return None

self._logger.debug('Looking for jobs to run')
now = datetime.now(self.timezone)
next_wakeup_time = None
events = []

with self._jobstores_lock:
for jobstore_alias, jobstore in six.iteritems(self._jobstores):
try:
due_jobs = jobstore.get_due_jobs(now)
except Exception as e:
# Schedule a wakeup at least in jobstore_retry_interval seconds
self._logger.warning('Error getting due jobs from job store %r: %s',
jobstore_alias, e)
retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
next_wakeup_time = retry_wakeup_time

continue

for job in due_jobs:
# Look up the job's executor
try:
executor = self._lookup_executor(job.executor)
except BaseException:
self._logger.error(
'Executor lookup ("%s") failed for job "%s" -- pause it from the '
'job store', job.executor, job)
self.pause_job(job.id, jobstore_alias)
continue

run_times = job._get_run_times(now)
run_times = run_times[-1:] if run_times and job.coalesce else run_times
if run_times:
try:
executor.submit_job(job, run_times)
except MaxInstancesReachedError:
self._logger.warning(
'Execution of job "%s" skipped: maximum number of running '
'instances reached (%d)', job, job.max_instances)
event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
jobstore_alias, run_times)
events.append(event)
except BaseException:
self._logger.exception('Error submitting job "%s" to executor "%s"',
job, job.executor)
else:
event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
run_times)
events.append(event)

# 如果存在next_run_time,则更新
# 否则暂停此任务
job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
if job_next_run:
job._modify(next_run_time=job_next_run)
jobstore.update_job(job)
else:
self.pause_job(job.id, jobstore_alias)

# Set a new next wakeup time if there isn't one yet or
# the jobstore has an even earlier one
jobstore_next_run_time = jobstore.get_next_run_time()
if jobstore_next_run_time and (next_wakeup_time is None or
jobstore_next_run_time < next_wakeup_time):
next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

# Dispatch collected events
for event in events:
self._dispatch_event(event)

# Determine the delay until this method should be called again
if self.state == STATE_PAUSED:
wait_seconds = None
self._logger.debug('Scheduler is paused; waiting until resume() is called')
elif next_wakeup_time is None:
wait_seconds = None
self._logger.debug('No jobs; waiting until a job is added')
else:
wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
wait_seconds)

return wait_seconds
50 changes: 35 additions & 15 deletions rpyc_scheduler/scheduler-server.py → rpyc_scheduler/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,13 @@

import rpyc
from typing import List
from rpyc.utils.server import ThreadedServer
from loguru import logger

from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.date import DateTrigger
from sqlmodel import text
from tasks import *
from datetime import datetime
from loguru import logger
from config import rpc_config
from rpyc.utils.server import ThreadedServer
from rpyc_scheduler.tasks import *
from apscheduler.job import Job
# from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import (
EVENT_JOB_EXECUTED,
Expand All @@ -25,16 +21,33 @@
EVENT_JOB_SUBMITTED,
EVENT_JOB_REMOVED
)
from jobstore import CustomJobStore
from scheduler import CustomBackgroundScheduler
from models import engine
from rpyc_scheduler.jobstore import CustomJobStore
from rpyc_scheduler.scheduler import CustomBackgroundScheduler
from rpyc_scheduler.config import rpc_config

# 全局变量定义
scheduler = None


def print_text(*args, **kwargs):
    """Diagnostic helper: log whatever positional and keyword args it receives."""
    for payload in (args, kwargs):
        logger.debug(payload)


import os
import sys
import pdb


def trace_calls(frame, event, arg):
    """Debug trace hook: break into pdb when APScheduler's ``_get_jobs`` runs.

    Returns itself so tracing continues into nested calls (standard
    ``sys.settrace`` protocol). Intended for interactive debugging only.

    :param frame: current stack frame being traced
    :param event: trace event name ("call", "line", "return", ...)
    :param arg: event-specific argument (unused here)
    """
    if event == "call" and "apscheduler" in frame.f_globals.get("__name__", ""):
        if frame.f_code.co_name == "_get_jobs":
            pdb.set_trace()  # drop into the interactive debugger
    return trace_calls


# Fix: the hook was installed unconditionally -- leftover debugging code that
# would freeze the production scheduler inside pdb. Make it opt-in via env var.
if os.getenv("RPYC_SCHEDULER_TRACE"):
    sys.settrace(trace_calls)


class SchedulerService(rpyc.Service):
def exposed_add_job(self, func, **kwargs):
trigger = kwargs.pop('trigger', None)
Expand All @@ -45,8 +58,10 @@ def exposed_add_job(self, func, **kwargs):
trigger = CronTrigger(minute=values[0], hour=values[1], day=values[2], month=values[3],
day_of_week=values[4], start_date=trigger_args['start_date'],
end_date=trigger_args['end_date'])
return scheduler.add_job(func, CronTrigger.from_crontab(trigger), **kwargs)
return scheduler.add_job(func, trigger=trigger, **kwargs)
elif trigger == 'date':
logger.debug(kwargs)
logger.debug(trigger, trigger_args)
return scheduler.add_job(func, DateTrigger(
run_date=trigger_args['run_date'] if trigger_args is not None else None), **kwargs)

Expand Down Expand Up @@ -107,7 +122,7 @@ def exposed_get_user_jobs(self, uid=None, job_name=None, jobstore=None) -> list[

def event_listener(event):
if event.code == EVENT_JOB_ADDED:
logger.debug('event add')
logger.debug('event add', event)
elif event.code == EVENT_JOB_EXECUTED:
logger.debug('event executed')
elif event.code == EVENT_JOB_REMOVED:
Expand All @@ -116,9 +131,10 @@ def event_listener(event):
logger.debug('event error')


if __name__ == '__main__':
def main():
global scheduler
job_store = {
'default': CustomJobStore(url=str(rpc_config.apscheduler_job_store))
'default': CustomJobStore(url=str(rpc_config['apscheduler_job_store']))
}
apscheduler_excutors = {
'default': ThreadPoolExecutor(20),
Expand All @@ -131,11 +147,15 @@ def event_listener(event):
scheduler.start()

# 启动rpyc服务
server = ThreadedServer(SchedulerService, port=rpc_config.rpc_port,
server = ThreadedServer(SchedulerService, port=rpc_config['rpc_port'],
protocol_config={'allow_public_attrs': True, 'allow_pickle': True})
try:
server.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
scheduler.shutdown()


if __name__ == "__main__":
main()
Loading