forked from MemTensor/MemOS
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
122 lines (97 loc) · 3.88 KB
/
utils.py
File metadata and controls
122 lines (97 loc) · 3.88 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import functools
import time
import traceback
from memos.log import get_logger
# Module-level logger used by the timing decorators below to emit their log lines.
logger = get_logger(__name__)
def timed_with_status(
    func=None,
    *,
    log_prefix="",
    log_args=None,
    log_extra_args=None,
    fallback=None,
):
    """
    Decorator that times a call and logs its duration with a SUCCESS/FAILED status.

    Works both bare (`@timed_with_status`) and with keyword options
    (`@timed_with_status(log_prefix="...")`).

    Parameters:
    - func: the function to wrap when used bare; None when called with options.
    - log_prefix: prefix; falls back to function name
    - log_args: names to include in logs (str or list/tuple of str), values are taken from kwargs by name.
      Arguments passed positionally are not looked up and are logged as None.
    - log_extra_args:
        - can be a dict: fixed contextual fields that are always attached to logs;
        - or a callable: like `fn(*args, **kwargs) -> dict`, used to dynamically generate contextual fields at runtime.
    - fallback: optional callable invoked as `fallback(exc, *args, **kwargs)` when the wrapped
      function raises an `Exception`; its return value becomes the wrapper's return value.

    Returns:
    - the wrapped function, or a decorator when `func` is None.

    Notes:
    - An `Exception` raised by the wrapped function is NOT re-raised: with a fallback the
      fallback's result is returned, without one the wrapper returns None. The failure is
      only visible in the emitted log line (status FAILED plus the formatted traceback).
      An exception raised by the fallback itself does propagate.
    - Building the log context is best-effort: a failing or misbehaving `log_extra_args`
      callback (e.g. one returning a non-dict) only produces a warning and never affects
      the wrapped function's result.
    """
    if isinstance(log_args, str):
        effective_log_args = [log_args]
    else:
        effective_log_args = list(log_args) if log_args else []

    def build_context(args, kwargs):
        """Return the bracketed context suffix for the log line; never raises."""
        ctx_parts = []

        # 1) Collect parameters from kwargs by name
        try:
            ctx_parts.extend([f"{key}={kwargs.get(key)}" for key in effective_log_args])
        except Exception as e:
            logger.warning(f"[TIMER_WITH_STATUS] log_args formatting error: {e!r}")

        # 2) Support log_extra_args as dict or callable, so we can dynamically
        #    extract values from self or other runtime context.
        #    Formatting happens inside the try so that a callback returning something
        #    without `.items()` cannot raise out of the caller's `finally` block.
        try:
            if callable(log_extra_args):
                extra_items = log_extra_args(*args, **kwargs) or {}
            elif isinstance(log_extra_args, dict):
                extra_items = log_extra_args
            else:
                extra_items = {}
            ctx_parts.extend([f"{key}={val}" for key, val in extra_items.items()])
        except Exception as e:
            logger.warning(f"[TIMER_WITH_STATUS] log_extra_args callback error: {e!r}")

        return f" [{', '.join(ctx_parts)}]" if ctx_parts else ""

    def decorator(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            exc_type = None
            exc_message = None
            success_flag = False

            try:
                result = fn(*args, **kwargs)
                success_flag = True
                return result
            except Exception as e:
                exc_type = type(e)
                exc_message = traceback.format_exc()
                if callable(fallback):
                    return fallback(e, *args, **kwargs)
                # No fallback configured: the error is swallowed and reported only
                # through the log line emitted in `finally`.
                return None
            finally:
                # Runs on every exit path (success, fallback, swallowed error) so that
                # exactly one timing line is logged per call.
                elapsed_ms = (time.perf_counter() - start) * 1000.0
                ctx_str = build_context(args, kwargs)

                status = "SUCCESS" if success_flag else "FAILED"
                status_info = f", status: {status}"
                if not success_flag and exc_type is not None:
                    status_info += (
                        f", error_type: {exc_type.__name__}, error_message: {exc_message}"
                    )

                msg = (
                    f"[TIMER_WITH_STATUS] {log_prefix or fn.__name__} "
                    f"took {elapsed_ms:.0f} ms{status_info}, args: {ctx_str}"
                )
                logger.info(msg)

        return wrapper

    # Handle both @timed_with_status and @timed_with_status(...) cases
    if func is None:
        return decorator
    return decorator(func)
def timed(func=None, *, log=True, log_prefix=""):
    """
    Decorator that logs how long a call took when it is slow.

    Works both bare (`@timed`) and with keyword options (`@timed(log_prefix="...")`).

    Parameters:
    - func: the function to wrap when used bare; None when called with options.
    - log: timing is logged only when this is exactly `True`.
    - log_prefix: prefix for the log line; falls back to function name.

    Returns:
    - the wrapped function, or a decorator when `func` is None.

    Notes:
    - Only calls taking at least 100 ms are logged.
    - If the wrapped function raises, the exception propagates and nothing is logged.
    """

    def decorator(fn):
        # Preserve the wrapped function's metadata (__name__, __doc__, __wrapped__),
        # as `timed_with_status` does.
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            start = time.perf_counter()
            result = fn(*args, **kwargs)
            elapsed_ms = (time.perf_counter() - start) * 1000.0

            # 100ms threshold
            if log is True and elapsed_ms >= 100.0:
                logger.info(f"[TIMER] {log_prefix or fn.__name__} took {elapsed_ms:.0f} ms")

            return result

        return wrapper

    # Handle both @timed and @timed(log=True) cases
    if func is None:
        return decorator
    return decorator(func)