Merged
Changes from 1 commit
27 commits
079438c
Backend: add rpyc + apscheduler service
4linuxfun Feb 24, 2023
30c6b8e
Backend: rpyc + apscheduler service fix #100
4linuxfun Feb 24, 2023
46d69e6
Backend: add task management endpoints fix #100
4linuxfun Feb 24, 2023
9532018
Frontend: task center - task management: implement features fix #100
4linuxfun Feb 24, 2023
d4e6530
Frontend: component encapsulation
4linuxfun Feb 24, 2023
6057efe
Backend: manage jobs by calling apscheduler remotely via rpyc
4linuxfun Jun 19, 2023
065e8f0
Backend: add Job-related models and CRUD
4linuxfun Jun 19, 2023
d86d098
Backend: add get_uid dependency
4linuxfun Jun 20, 2023
b9c9caf
Backend: use get_uid to obtain the uid
4linuxfun Jun 20, 2023
ce6ee7c
Backend: add get_redis function for obtaining a Redis connection
4linuxfun Jun 20, 2023
39ca9f2
Backend: fetch log messages from Redis over WebSocket
4linuxfun Jun 20, 2023
b0b99bf
Backend: restructure the job module directory
4linuxfun Jun 20, 2023
c297407
Backend: add task management endpoints
4linuxfun Jun 20, 2023
9980ee6
Frontend: implement task management pages
4linuxfun Jun 20, 2023
95d60e5
Backend: decouple scheduler tasks from the execution host, to ease adding remote SSH execution later
4linuxfun Jun 28, 2023
80a277f
Backend: add targets parameter to jobs for specifying execution hosts
4linuxfun Jun 28, 2023
0a438e2
Frontend: fix garbled Chinese after upgrading Element Plus
4linuxfun Jun 28, 2023
cb45acf
Frontend: add targets parameter to task management
4linuxfun Jun 28, 2023
f0e7cc5
Backend: add get_user_jobs to the scheduler, for quickly fetching a user's job list by uid and job_name
4linuxfun Jun 30, 2023
2486db8
Backend: add timing values to scheduler tasks
4linuxfun Jun 30, 2023
7e9e4bf
Backend: optimize job retrieval logic
4linuxfun Jun 30, 2023
6cee979
Backend: redefine Job-related models
4linuxfun Jun 30, 2023
0bb4ba9
Frontend: clean up task management page code
4linuxfun Jun 30, 2023
1ce9cc8
Frontend: task log page display
4linuxfun Jun 30, 2023
7fd5c91
Update frontend and backend dependency versions
4linuxfun Dec 21, 2023
3dc9ea1
update: bump dependency versions
4linuxfun Jan 3, 2024
58d095b
Backend: fix attribute changes after the pydantic upgrade
4linuxfun Jan 3, 2024
Backend: manage jobs by calling apscheduler remotely via rpyc
4linuxfun committed Jun 19, 2023
commit 6057efe9877f1fb3914a98c695b10613eeb947a7
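
The diffs below expose apscheduler through an rpyc service so the web backend can manage jobs remotely. As a rough sketch of what a client call might look like (the host, port, and job id are placeholder assumptions; the real settings come from rpc_config):

import rpyc

# Every exposed_* method on the rpyc service is reachable on the client
# as conn.root.<name>, without the exposed_ prefix.
conn = rpyc.connect('127.0.0.1', 18861)  # placeholder host/port
print(conn.root.get_job('job-001'))      # calls exposed_get_job
conn.root.switch_job('job-001')          # calls exposed_switch_job (pause/resume)
conn.close()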
38 changes: 38 additions & 0 deletions rpyc_scheduler/job.py
@@ -0,0 +1,38 @@
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    from sqlalchemy import (
        create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_)
    from sqlalchemy.exc import IntegrityError
    from sqlalchemy.sql.expression import null
except ImportError:  # pragma: nocover
    raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')


class MyJobStore(BaseJobStore):
    """
    A JobStore modeled on apscheduler's built-in SQLAlchemyJobStore:
    1. adds extra fields
    2. overrides selected methods so that finished jobs are kept
    """

    def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):
        # BaseJobStore.__init__ takes no arguments; use the real parameters here
        # instead of hard-coded defaults, mirroring SQLAlchemyJobStore.
        super().__init__()
        self.pickle_protocol = pickle_protocol
        metadata = maybe_ref(metadata) or MetaData()

        if engine:
            self.engine = maybe_ref(engine)
        elif url:
            self.engine = create_engine(url, **(engine_options or {}))
        else:
            raise ValueError('Need either "engine" or "url" defined')

        self.jobs_t = Table(
            tablename, metadata,
            Column('id', Unicode(191), primary_key=True),
            Column('next_run_time', Float(25), index=True),
            Column('job_state', LargeBinary, nullable=False),
            schema=tableschema
        )

    def start(self, scheduler, alias):
        super().start(scheduler, alias)
        # Create the jobs table on first start, as SQLAlchemyJobStore does.
        self.jobs_t.create(self.engine, True)
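
A hedged sketch of wiring MyJobStore into a scheduler; the MySQL URL is a placeholder (the project reads its real URL from rpc_config.apscheduler_job_store):

from apscheduler.schedulers.background import BackgroundScheduler

store = MyJobStore(url='mysql+pymysql://user:pass@localhost/scheduler')  # placeholder URL
scheduler = BackgroundScheduler(jobstores={'default': store})
scheduler.start()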
11 changes: 0 additions & 11 deletions rpyc_scheduler/models.py
@@ -5,17 +5,6 @@
from config import rpc_config


class TaskLog(SQLModel, table=True):
    id: Optional[int] = Field(sa_column=Column('id', Integer, primary_key=True, autoincrement=True))
    task_id: str = Field(max_length=20, sa_column_kwargs={'comment': 'task name'})
    status: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='command exit status'))
    exe_time: datetime = Field(default_factory=datetime.now, sa_column_kwargs={'comment': 'task execution time'})
    cmd: str = Field(sa_column_kwargs={'comment': 'command executed'})
    type: int = Field(default=0, sa_column=Column(mysql.TINYINT, comment='type: 0 = single command, 1 = script file'))
    stdout: str = Field(sa_column=Column(mysql.MEDIUMTEXT, comment='execution log'))


engine = create_engine(rpc_config.apscheduler_job_store, pool_size=5, max_overflow=10, pool_timeout=30,
                       pool_pre_ping=True)
# SQLModel.metadata.create_all(engine)
session = Session(engine)
25 changes: 21 additions & 4 deletions rpyc_scheduler/scheduler-server.py
@@ -5,11 +5,11 @@
"""

import rpyc
from tasks import subprocess_with_channel
from tasks import run_command_with_channel
from datetime import datetime
from loguru import logger
from config import rpc_config
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import (
@@ -20,6 +20,8 @@
    EVENT_JOB_REMOVED
)

from scheduler import CustomScheduler


def print_text(*args, **kwargs):
    logger.debug(args)
@@ -48,6 +50,21 @@ def exposed_get_jobs(self, jobstore=None):
    def exposed_get_job(self, job_id, jobstore=None):
        return scheduler.get_job(job_id, jobstore)

    def exposed_switch_job(self, job_id, jobstore=None):
        """
        Toggle a job's state: resume it if paused, pause it if scheduled.
        """
        job = scheduler.get_job(job_id, jobstore)
        if job.next_run_time is None:
            now = datetime.now(job.trigger.timezone)
            next_fire_time = job.trigger.get_next_fire_time(None, now)
            if next_fire_time:
                scheduler.resume_job(job_id, jobstore)
            else:
                raise ValueError('Cannot determine the next run time; check the job schedule configuration')
        else:
            scheduler.pause_job(job_id, jobstore)


def event_listener(event):
    if event.code == EVENT_JOB_ADDED:
@@ -68,8 +85,8 @@ def event_listener(event):
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(20)
}
scheduler = BackgroundScheduler(jobstores=job_store,
                                excutors=apscheduler_excutors)
scheduler = CustomScheduler(jobstores=job_store,
                            executors=apscheduler_excutors)
scheduler.add_listener(event_listener,
                       EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_SUBMITTED)
scheduler.start()
114 changes: 114 additions & 0 deletions rpyc_scheduler/scheduler.py
@@ -0,0 +1,114 @@
from __future__ import print_function

from datetime import datetime, timedelta
import six

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
from apscheduler.util import (timedelta_seconds, TIMEOUT_MAX)
from apscheduler.events import (JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES)

try:
    from collections.abc import MutableMapping
except ImportError:
    from collections import MutableMapping

#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2


class CustomScheduler(BackgroundScheduler):
    def _process_jobs(self):
        """
        Modified _process_jobs: every remove_job call is replaced with pause_job,
        so a job is paused instead of removed once it has finished running.
        """
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone)
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            for jobstore_alias, jobstore in six.iteritems(self._jobstores):
                try:
                    due_jobs = jobstore.get_due_jobs(now)
                except Exception as e:
                    # Schedule a wakeup at least in jobstore_retry_interval seconds
                    self._logger.warning('Error getting due jobs from job store %r: %s',
                                         jobstore_alias, e)
                    retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
                    if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
                        next_wakeup_time = retry_wakeup_time

                    continue

                for job in due_jobs:
                    # Look up the job's executor
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- pausing it in the '
                            'job store', job.executor, job)
                        self.pause_job(job.id, jobstore_alias)
                        continue

                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # If the trigger yields a next run time, update the job;
                        # otherwise pause it (instead of removing it).
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            job._modify(next_run_time=job_next_run)
                            jobstore.update_job(job)
                        else:
                            self.pause_job(job.id, jobstore_alias)

                # Set a new next wakeup time if there isn't one yet or
                # the jobstore has an even earlier one
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)

        return wait_seconds
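
The practical effect of the override: a job whose trigger is exhausted (for example, a one-shot date trigger) is left in the job store in a paused state rather than deleted, so exposed_switch_job can re-enable it later. A hedged sketch, assuming the scheduler built in scheduler-server.py and the run_command_with_channel task from tasks.py below; the job id and command are made up:

from datetime import datetime
from apscheduler.triggers.date import DateTrigger

# With the stock BackgroundScheduler this job would be removed after it fires;
# with CustomScheduler it stays in the job store, paused.
scheduler.add_job(run_command_with_channel, DateTrigger(run_date=datetime.now()),
                  id='job-001', kwargs={'job_id': 'job-001', 'command': 'echo hello'})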
27 changes: 18 additions & 9 deletions rpyc_scheduler/tasks.py
@@ -1,26 +1,35 @@
import subprocess
import json
from datetime import datetime
from loguru import logger
from sqlmodel import text
from utils import Channel
from config import rpc_config
from models import session, TaskLog
from models import engine


def subprocess_with_channel(task_id=None, command=None):
def run_command_with_channel(job_id=None, command=None):
    """
    Run a command locally, streaming its output to Redis in real time.
    When the task finishes, write the full log to the database.
    :param job_id: job ID
    :param command: command to execute
    """
    with Channel(rpc_config.redis, task_id=task_id) as channel:
        logger.debug(f"task is:{task_id}")
    with Channel(rpc_config.redis, job_id=job_id) as channel:
        logger.debug(f"job id:{job_id}")
        channel.send({'msg': 'Task started:'})
        channel.send({'msg': f"Running command: {command}"})
        start_time = datetime.now()
        process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
                                   shell=True)
        while process.poll() is None:
        while (status := process.poll()) is None:
            message = process.stdout.readline()
            channel.send({'msg': message})
        channel.send({'msg': 'Task finished'})
        task_log = TaskLog(task_id=task_id, status=process.returncode, cmd=command, type=0,
                           stdout=json.dumps(channel.msg))
        session.add(task_log)
        session.commit()
        end_time = datetime.now()
        with engine.connect() as conn:
            sql = text(
                "INSERT INTO job_log (status,job_id,start_time,end_time,log) values (:status,:job_id,:start_time,:end_time,:log)")
            conn.execute(sql,
                         {'status': status, 'job_id': job_id, 'start_time': start_time, 'end_time': end_time,
                          'log': json.dumps(channel.msg)})
            conn.commit()
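
Because Channel.send writes every message to a Redis stream named tasks:<job_id> (see utils.py below), any consumer, such as the WebSocket endpoint added in a later commit, can tail the live log with XREAD. A minimal sketch; the connection settings and job id are placeholder assumptions:

import redis

r = redis.Redis(decode_responses=True)  # placeholder connection settings
last_id = '0'
while True:
    # Block up to one second waiting for new log entries on the job's stream
    for _stream, messages in r.xread({'tasks:job-001': last_id}, block=1000):
        for msg_id, fields in messages:
            print(fields['msg'])
            last_id = msg_id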
9 changes: 6 additions & 3 deletions rpyc_scheduler/utils.py
@@ -4,10 +4,10 @@


class Channel:
    def __init__(self, redis_config, task_id):
    def __init__(self, redis_config, job_id):
        self.conn = redis.Redis(**redis_config, decode_responses=True)
        self.task_id = task_id
        self.task_key = f"tasks:{self.task_id}"
        self.job_id = job_id
        self.task_key = f"tasks:{self.job_id}"
        self._expire = 60

    @property
@@ -25,6 +25,9 @@ def expire(self, value: int):
    def send(self, msg: Dict[Any, Any]):
        self.conn.xadd(self.task_key, msg)

    def delete(self):
        self.conn.delete(self.task_key)

    def __enter__(self):
        return self
