Skip to content
Merged
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
53ba21d
update:前端-优化useTerm实现代码
4linuxfun Jan 17, 2025
2143ded
update:后端-简化scheduler任务,删除不必要代码
4linuxfun Jan 21, 2025
8daef2b
update:后端-scheduler完善
4linuxfun Jan 21, 2025
ae16ea1
update:后端-排除websocket权限验证
4linuxfun Jan 21, 2025
19e8bad
update:后端-实现“任务管理”模块功能
4linuxfun Jan 21, 2025
f2bf6d5
update:前端-重构usePagination
4linuxfun Jan 21, 2025
bf8f03c
update:前端-实现“执行任务”功能
4linuxfun Jan 21, 2025
c3c18b5
update:前端usePagination分页增加initialPageSize参数
4linuxfun Feb 10, 2025
7647004
update:前端-任务管理分页日志默认改为5
4linuxfun Feb 10, 2025
3544264
update:service.sh脚本增加部分逻辑判断
4linuxfun Feb 10, 2025
c75e188
update:前端usePagination补全async写法
4linuxfun Feb 11, 2025
9091c98
update:前端-补全缺失组件引用
4linuxfun Feb 11, 2025
ac23b67
fix:前端-修复主机选择关联带出bug
4linuxfun Feb 11, 2025
f9f57b1
fix:前端-修复翻页多选状态保持问题
4linuxfun Feb 12, 2025
e6a1b58
update:后端-更新config配置模式
4linuxfun Feb 12, 2025
23865ae
update:后端-修改yaml配置后的参数
4linuxfun Feb 12, 2025
e9c7183
update:后端-更新服务控制脚本
4linuxfun Feb 12, 2025
e493d2c
update:后端-修改redis过期时间
4linuxfun Feb 12, 2025
28ba966
update:后端-任务管理功能模块实现
4linuxfun Feb 12, 2025
0413a51
update:更新README.md
4linuxfun Feb 13, 2025
fd9180a
update:后端-增加生产环境配置模板
4linuxfun Feb 13, 2025
9cb6b59
update:其他
4linuxfun Feb 13, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
update:后端-简化scheduler任务,删除不必要代码
  • Loading branch information
4linuxfun committed Jan 21, 2025
commit 2143ded72cfdb02f487afdb619a5473f577e4821
95 changes: 0 additions & 95 deletions rpyc_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@
import six
import warnings
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
from apscheduler.util import (timedelta_seconds, TIMEOUT_MAX)
from apscheduler.events import (JobSubmissionEvent, EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES)

try:
from collections.abc import MutableMapping
Expand Down Expand Up @@ -74,95 +71,3 @@ def get_jobs(self, jobstore=None, pending=None):
if jobstore is None or alias == jobstore:
jobs.extend(store.get_all_jobs())
return jobs

def _process_jobs(self):
"""
修改_process_jobs方法,把remove_job全部替换为pause_job,当任务执行完成后改成暂停状态
"""
if self.state == STATE_PAUSED:
self._logger.debug('Scheduler is paused -- not processing jobs')
return None

self._logger.debug('Looking for jobs to run')
now = datetime.now(self.timezone)
next_wakeup_time = None
events = []

with self._jobstores_lock:
for jobstore_alias, jobstore in six.iteritems(self._jobstores):
try:
due_jobs = jobstore.get_due_jobs(now)
except Exception as e:
# Schedule a wakeup at least in jobstore_retry_interval seconds
self._logger.warning('Error getting due jobs from job store %r: %s',
jobstore_alias, e)
retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval)
if not next_wakeup_time or next_wakeup_time > retry_wakeup_time:
next_wakeup_time = retry_wakeup_time

continue

for job in due_jobs:
# Look up the job's executor
try:
executor = self._lookup_executor(job.executor)
except BaseException:
self._logger.error(
'Executor lookup ("%s") failed for job "%s" -- pause it from the '
'job store', job.executor, job)
self.pause_job(job.id, jobstore_alias)
continue

run_times = job._get_run_times(now)
run_times = run_times[-1:] if run_times and job.coalesce else run_times
if run_times:
try:
executor.submit_job(job, run_times)
except MaxInstancesReachedError:
self._logger.warning(
'Execution of job "%s" skipped: maximum number of running '
'instances reached (%d)', job, job.max_instances)
event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
jobstore_alias, run_times)
events.append(event)
except BaseException:
self._logger.exception('Error submitting job "%s" to executor "%s"',
job, job.executor)
else:
event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
run_times)
events.append(event)

# 如果存在next_run_time,则更新
# 否则暂停此任务
job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
if job_next_run:
job._modify(next_run_time=job_next_run)
jobstore.update_job(job)
else:
self.pause_job(job.id, jobstore_alias)

# Set a new next wakeup time if there isn't one yet or
# the jobstore has an even earlier one
jobstore_next_run_time = jobstore.get_next_run_time()
if jobstore_next_run_time and (next_wakeup_time is None or
jobstore_next_run_time < next_wakeup_time):
next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

# Dispatch collected events
for event in events:
self._dispatch_event(event)

# Determine the delay until this method should be called again
if self.state == STATE_PAUSED:
wait_seconds = None
self._logger.debug('Scheduler is paused; waiting until resume() is called')
elif next_wakeup_time is None:
wait_seconds = None
self._logger.debug('No jobs; waiting until a job is added')
else:
wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
wait_seconds)

return wait_seconds