Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
079438c
后端:添加rpyc+apscheduler服务
4linuxfun Feb 24, 2023
30c6b8e
后端:rpyc+apscheduler服务 fix #100
4linuxfun Feb 24, 2023
46d69e6
后端: 添加任务管理对应接口 fix #100
4linuxfun Feb 24, 2023
9532018
前端:任务中心-任务管理:功能实现 fix #100
4linuxfun Feb 24, 2023
d4e6530
前端:组件封装
4linuxfun Feb 24, 2023
6057efe
后端:通过rpyc实现apscheduler的远程调用,进行任务的管理
4linuxfun Jun 19, 2023
065e8f0
后端:添加Job相关model和crud
4linuxfun Jun 19, 2023
d86d098
后端:添加get_uid依赖
4linuxfun Jun 20, 2023
b9c9caf
后端:修改使用get_uid获取uid
4linuxfun Jun 20, 2023
ce6ee7c
后端:增加get_redis函数,用户获取redis连接
4linuxfun Jun 20, 2023
39ca9f2
后端:实现ws获取redis中的日志信息
4linuxfun Jun 20, 2023
b0b99bf
后端:修改job项目目录
4linuxfun Jun 20, 2023
c297407
后端:添加任务管理相关借口
4linuxfun Jun 20, 2023
9980ee6
前端:实现任务管理相关功能页面
4linuxfun Jun 20, 2023
95d60e5
后端:scheduler后端task进行执行主机任务分离,方便后期扩展执行远程ssh
4linuxfun Jun 28, 2023
80a277f
后端:job增加targets参数,用于指定任务执行主机
4linuxfun Jun 28, 2023
0a438e2
前端:修复升级element plus后中文乱码问题
4linuxfun Jun 28, 2023
cb45acf
前端:任务管理增加targets参数
4linuxfun Jun 28, 2023
f0e7cc5
后端:scheduer增加get_user_jobs,通过uid和job_name快速获取用户的job列表
4linuxfun Jun 30, 2023
2486db8
后端:scheduler task增加时间值
4linuxfun Jun 30, 2023
7e9e4bf
后端:优化任务获取逻辑
4linuxfun Jun 30, 2023
6cee979
后端:重新定义Job相关模型
4linuxfun Jun 30, 2023
0bb4ba9
前端:优化任务管理页面相关代码
4linuxfun Jun 30, 2023
1ce9cc8
前端:任务日志页面展示
4linuxfun Jun 30, 2023
7fd5c91
更新前后端项目依赖版本
4linuxfun Dec 21, 2023
3dc9ea1
update:更新依赖库版本
4linuxfun Jan 3, 2024
58d095b
后端:修复更新pydantic后属性变更
4linuxfun Jan 3, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
后端:添加rpyc+apscheduler服务
  • Loading branch information
4linuxfun committed Feb 24, 2023
commit 079438ca0bfbfe31fb61cc0bee1f4dc6dd4b672c
Empty file added rpyc_scheduler/config.py
Empty file.
Empty file added rpyc_scheduler/models.py
Empty file.
128 changes: 128 additions & 0 deletions rpyc_scheduler/scheduler-server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
远程调用apscheduler,参考:
https://github.com/agronholm/apscheduler/tree/3.x/examples/rpc
https://gist.github.com/gsw945/15cbb71eaca5be66787a2c187414e36f
"""

import redis
import rpyc
from loguru import logger
from typing import Dict
from rpyc.utils.server import ThreadedServer
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.events import (
EVENT_JOB_EXECUTED,
EVENT_JOB_ERROR,
EVENT_JOB_ADDED,
EVENT_JOB_SUBMITTED,
EVENT_JOB_REMOVED
)


class Channel:
def __init__(self, redis_config, task_id):
logger.debug(f'__init__,redis:{redis_config},task id:{task_id}')
self.conn = redis.Redis(**redis_config, decode_responses=True)
self.task_id = task_id
self.task_key = f"tasks:{self.task_id}"
self._expire = 60

@property
def msg(self, ):
return self.conn.xrange(self.task_key, '-', '+')

@property
def expire(self, ):
return self._expire

@expire.setter
def expire(self, value):
self._expire = value

def send(self, msg: Dict[Any, Any]):
self.conn.xadd(self.task_key, msg)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.expire(self.task_key, self._expire)
self.close()

def close(self, ):
self.conn.close()


def subprocess_with_channel(task_id, command):
with Channel(redis_config, task_id=task_id) as channel:
channel.send({'msg': '开始执行任务:'})
channel.send({'msg': f"执行命令:{command}"})
process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True,
shell=True)
while process.poll() is None:
message = process.stdout.readline()
channel.send({'msg': message})
channel.send({'msg': '结束执行任务'})
results = channel.msg
return results


def print_text(text):
logger.debug(text)


class SchedulerService(rpyc.Service):
def exposed_add_job(self, func, *args, **kwargs):
return scheduler.add_job(func, *args, **kwargs)

def exposed_modify_job(self, job_id, jobstore=None, **changes):
return scheduler.modify_job(job_id, jobstore, **changes)

def exposed_pause_job(self, job_id, jobstore=None):
return scheduler.pause_job(job_id, jobstore)

def exposed_resume_job(self, job_id, jobstore=None):
return scheduler.resume_job(job_id, jobstore)

def exposed_remove_job(self, job_id, jobstore=None):
return scheduler.remove_job(job_id, jobstore)

def exposed_get_jobs(self, jobstore=None):
return scheduler.get_jobs(jobstore)


def event_listener(event):
if event.code == EVENT_JOB_ADDED:
logger.debug('event add')
elif event.code == EVENT_JOB_EXECUTED:
logger.debug('event executed')
elif event.code == EVENT_JOB_REMOVED:
logger.debug('event removed')
elif event.code == EVENT_JOB_ERROR:
logger.debug('event error')


if __name__ == '__main__':
# apscheduler相关服务启动
jobstores = {
'default': SQLAlchemyJobStore(url='mysql+pymysql://root:[email protected]/devops')
}
excutors = {
'default': ThreadPoolExecutor(20),
'processpool': ProcessPoolExecutor(20)
}
scheduler = BackgroundScheduler(jobstores=jobstores, excutors=excutors)
scheduler.add_listener(event_listener,
EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_SUBMITTED)
scheduler.start()

# 启动rpyc服务
server = ThreadedServer(SchedulerService, port=18861, protocol_config={'allow_public_attrs': True})
try:
server.start()
except (KeyboardInterrupt, SystemExit):
pass
finally:
scheduler.shutdown()
Empty file added rpyc_scheduler/tasks.py
Empty file.
37 changes: 37 additions & 0 deletions rpyc_scheduler/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import redis
from loguru import logger
from typing import Dict, Any


class Channel:
def __init__(self, redis_config, task_id):
logger.debug(f'__init__,redis:{redis_config},task id:{task_id}')
self.conn = redis.Redis(**redis_config, decode_responses=True)
self.task_id = task_id
self.task_key = f"tasks:{self.task_id}"
self._expire = 60

@property
def msg(self, ):
return self.conn.xrange(self.task_key, '-', '+')

@property
def expire(self, ):
return self._expire

@expire.setter
def expire(self, value):
self._expire = value

def send(self, msg: Dict[Any, Any]):
self.conn.xadd(self.task_key, msg)

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
self.conn.expire(self.task_key, self._expire)
self.close()

def close(self, ):
self.conn.close()