This repository was archived by the owner on Apr 14, 2026. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathcache.py
More file actions
557 lines (470 loc) · 21.2 KB
/
cache.py
File metadata and controls
557 lines (470 loc) · 21.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
"""
LLM Semantic Cache for NOMYO Router.
Strategy:
- Namespace: sha256(route :: model :: system_prompt)[:16] — exact context isolation
- Cache key: hash(normalize(last_user_message), namespace) — exact lookup
- Embedding: weighted mean of
α * embed(bm25_weighted(chat_history)) — conversation context
(1-α) * embed(last_user_message) — the actual question
with α = cache_history_weight (default 0.3).
- Exact-match caching (similarity=1.0) uses DummyEmbeddingProvider — zero extra deps.
- Semantic caching (similarity<1.0) requires sentence-transformers. If it is missing, the
cache falls back to exact-match with a warning (lean Docker image behaviour).
- MOE models (moe-*) always bypass the cache.
- Token counts are never recorded for cache hits.
- Streaming cache hits are served as a single-chunk response.
- Privacy protection: responses that echo back user-identifying tokens from the system
prompt (names, emails, IDs) are stored WITHOUT an embedding. They remain findable
by exact-match for the same user but are invisible to cross-user semantic search.
Generic responses (capital of France → "Paris") keep their embeddings and can still
produce cross-user semantic hits as intended.
"""
import hashlib
import math
import re
import time
import warnings
from collections import Counter
from typing import Any, Optional
# Tri-state memo: None until the first probe, afterwards True/False.
_semantic_available: Optional[bool] = None


def _check_sentence_transformers() -> bool:
    """
    Report whether the optional ``sentence_transformers`` package can be imported.

    The import is attempted at most once; the outcome is memoised in the
    module-level ``_semantic_available`` flag and returned on later calls.
    """
    global _semantic_available
    if _semantic_available is not None:
        return _semantic_available
    try:
        import sentence_transformers  # noqa: F401
    except ImportError:
        _semantic_available = False
    else:
        _semantic_available = True
    return _semantic_available
# ---------------------------------------------------------------------------
# BM25-weighted text representation of chat history
# ---------------------------------------------------------------------------
def _bm25_weighted_text(history: list[dict]) -> str:
    """
    Build a term-repetition string from chat history, weighted by BM25 score.

    Every turn with non-empty ``content`` is treated as one document. Tokens
    are whitespace-split, lower-cased, and kept only if longer than two
    characters. Each term's BM25 score (k1=1.5, b=0.75, smoothed IDF) is summed
    over all documents; the 50 best terms are emitted, each repeated 1-3 times
    in proportion to its score relative to the top term.

    Returns "" when no turn has content, and the plain space-joined contents
    when no token survives the length filter.
    """
    texts = [turn.get("content", "") for turn in history if turn.get("content")]
    if not texts:
        return ""

    token_lists = [
        [word.lower() for word in text.split() if len(word) > 2]
        for text in texts
    ]
    doc_count = len(token_lists)

    # Document frequency: number of documents each term occurs in.
    doc_freq: Counter = Counter()
    for doc_tokens in token_lists:
        doc_freq.update(set(doc_tokens))

    k1 = 1.5
    b = 0.75
    mean_len = sum(map(len, token_lists)) / max(doc_count, 1)

    # Accumulate per-term BM25 scores across documents. Insertion order follows
    # first occurrence, which is what most_common() uses to break ties.
    scores: Counter = Counter()
    for doc_tokens in token_lists:
        doc_len = len(doc_tokens)
        for term, tf in Counter(doc_tokens).items():
            idf = math.log((doc_count + 1) / (doc_freq[term] + 1)) + 1.0
            scores[term] += (
                idf * (tf * (k1 + 1))
                / (tf + k1 * (1 - b + b * doc_len / max(mean_len, 1)))
            )

    ranked = scores.most_common(50)
    if not ranked:
        return " ".join(texts)

    peak = ranked[0][1]
    pieces: list[str] = []
    for term, score in ranked:
        repeats = max(1, round(3 * score / peak))
        pieces += [term] * repeats
    return " ".join(pieces)
# ---------------------------------------------------------------------------
# LLMCache
# ---------------------------------------------------------------------------
class LLMCache:
"""
Thin async wrapper around async-semantic-llm-cache that adds:
- Route-aware namespace isolation
- Two-vector weighted-mean embedding (history context + question)
- Per-instance hit/miss counters
- Graceful fallback when sentence-transformers is absent
"""
def __init__(self, cfg: Any) -> None:
self._cfg = cfg
self._backend: Any = None
self._emb_cache: Any = None
self._semantic: bool = False
self._hits: int = 0
self._misses: int = 0
async def init(self) -> None:
from semantic_llm_cache.similarity import EmbeddingCache
# --- Backend ---
backend_type: str = self._cfg.cache_backend
if backend_type == "sqlite":
from semantic_llm_cache.backends.sqlite import SQLiteBackend
self._backend = SQLiteBackend(db_path=self._cfg.cache_db_path)
elif backend_type == "redis":
from semantic_llm_cache.backends.redis import RedisBackend
self._backend = RedisBackend(url=self._cfg.cache_redis_url)
await self._backend.ping()
else:
from semantic_llm_cache.backends.memory import MemoryBackend
self._backend = MemoryBackend()
# --- Embedding provider ---
if self._cfg.cache_similarity < 1.0:
if _check_sentence_transformers():
from semantic_llm_cache.similarity import create_embedding_provider
provider = create_embedding_provider("sentence-transformer")
self._emb_cache = EmbeddingCache(provider=provider)
self._semantic = True
print(
f"[cache] Semantic cache ready "
f"(similarity≥{self._cfg.cache_similarity}, backend={backend_type})"
)
else:
warnings.warn(
"[cache] sentence-transformers is not installed. "
"Falling back to exact-match caching (similarity=1.0). "
"Use the :semantic Docker image tag to enable semantic caching.",
RuntimeWarning,
stacklevel=2,
)
self._emb_cache = EmbeddingCache() # DummyEmbeddingProvider
print(f"[cache] Exact-match cache ready (backend={backend_type}) [semantic unavailable]")
else:
self._emb_cache = EmbeddingCache() # DummyEmbeddingProvider
print(f"[cache] Exact-match cache ready (backend={backend_type})")
# ------------------------------------------------------------------
# Internal helpers
# ------------------------------------------------------------------
def _namespace(self, route: str, model: str, system: str) -> str:
raw = f"{route}::{model}::{system}"
return hashlib.sha256(raw.encode()).hexdigest()[:16]
def _cache_key(self, namespace: str, last_user: str) -> str:
from semantic_llm_cache.utils import hash_prompt, normalize_prompt
return hash_prompt(normalize_prompt(last_user), namespace)
# ------------------------------------------------------------------
# Privacy helpers — prevent cross-user leakage of personal data
# ------------------------------------------------------------------
_IDENTITY_STOPWORDS = frozenset({
"user", "users", "name", "names", "email", "phone", "their", "they",
"this", "that", "with", "from", "have", "been", "also", "more",
"tags", "identity", "preference", "context",
})
# Patterns that unambiguously signal personal data in a response
_EMAIL_RE = re.compile(r'\b[\w.%+-]+@[\w.-]+\.[a-zA-Z]{2,}\b')
_UUID_RE = re.compile(
r'\b[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}\b',
re.IGNORECASE,
)
# Standalone numeric run ≥ 8 digits (common user/account IDs)
_NUMERIC_ID_RE = re.compile(r'\b\d{8,}\b')
def _extract_response_content(self, response_bytes: bytes) -> str:
"""Parse response bytes (Ollama or OpenAI format) and return the text content."""
try:
import orjson
data = orjson.loads(response_bytes)
if "choices" in data: # OpenAI ChatCompletion
return (data["choices"][0].get("message") or {}).get("content", "")
if "message" in data: # Ollama chat
return (data.get("message") or {}).get("content", "")
if "response" in data: # Ollama generate
return data.get("response", "")
except Exception:
pass
return ""
def _extract_personal_tokens(self, system: str) -> frozenset[str]:
"""
Extract user-identifying tokens from the system prompt.
Looks for:
- Email addresses anywhere in the system prompt
- Numeric IDs (≥ 4 digits) appearing after "id" keyword
- Proper-noun-like words from [Tags: identity] lines
"""
if not system:
return frozenset()
tokens: set[str] = set()
# Email addresses
for email in self._EMAIL_RE.findall(system):
tokens.add(email.lower())
# Numeric IDs: "id: 1234", "id=5678"
for uid in re.findall(r'\bid\s*[=:]?\s*(\d{4,})\b', system, re.IGNORECASE):
tokens.add(uid)
# Values from [Tags: identity] lines (e.g. "User's name is Andreas")
for line in re.findall(
r'\[Tags:.*?identity.*?\]\s*(.+?)(?:\n|$)', system, re.IGNORECASE
):
for word in re.findall(r'\b\w{4,}\b', line):
w = word.lower()
if w not in self._IDENTITY_STOPWORDS:
tokens.add(w)
return frozenset(tokens)
def _response_is_personalized(self, response_bytes: bytes, system: str) -> bool:
"""
Return True if the response contains user-personal information.
Two complementary checks:
1. Direct PII detection in the response content — emails, UUIDs, long
numeric IDs. Catches data retrieved at runtime via tool calls that
never appears in the system prompt.
2. System-prompt token overlap — words extracted from [Tags: identity]
lines that reappear verbatim in the response (catches names, etc.).
Such responses are stored WITHOUT a semantic embedding so they are
invisible to cross-user semantic search while still being cacheable for
the same user via exact-match.
"""
content = self._extract_response_content(response_bytes)
if not content:
# Can't parse → err on the side of caution
return bool(response_bytes)
# 1. Direct PII patterns — independent of what's in the system prompt
if (
self._EMAIL_RE.search(content)
or self._UUID_RE.search(content)
or self._NUMERIC_ID_RE.search(content)
):
return True
# 2. System-prompt identity tokens echoed back in the response
personal = self._extract_personal_tokens(system)
if personal:
content_lower = content.lower()
if any(token in content_lower for token in personal):
return True
return False
def _parse_messages(
self, messages: list[dict]
) -> tuple[str, list[dict], str]:
"""
Returns (system_prompt, prior_history_turns, last_user_message).
Multimodal content lists are reduced to their text parts.
"""
system = ""
turns: list[dict] = []
for m in messages:
role = m.get("role", "")
content = m.get("content", "")
if isinstance(content, list):
content = " ".join(
p.get("text", "")
for p in content
if isinstance(p, dict) and p.get("type") == "text"
)
if role == "system":
system = content
else:
turns.append({"role": role, "content": content})
last_user = ""
for m in reversed(turns):
if m["role"] == "user":
last_user = m["content"]
break
# History = all turns before the final user message
history = turns[:-1] if turns and turns[-1]["role"] == "user" else turns
return system, history, last_user
async def _build_embedding(
self, history: list[dict], last_user: str
) -> list[float] | None:
"""
Weighted mean of BM25-weighted history embedding and last-user embedding.
Returns None when not in semantic mode.
"""
if not self._semantic:
return None
import numpy as np
alpha: float = self._cfg.cache_history_weight # weight for history signal
q_vec = np.array(await self._emb_cache.aencode(last_user), dtype=float)
if not history:
# No history → use question embedding alone (alpha has no effect)
return q_vec.tolist()
h_text = _bm25_weighted_text(history)
h_vec = np.array(await self._emb_cache.aencode(h_text), dtype=float)
combined = alpha * h_vec + (1.0 - alpha) * q_vec
norm = float(np.linalg.norm(combined))
if norm > 0.0:
combined /= norm
return combined.tolist()
# ------------------------------------------------------------------
# Public interface: chat (handles both Ollama and OpenAI message lists)
# ------------------------------------------------------------------
async def get_chat(
self, route: str, model: str, messages: list[dict]
) -> bytes | None:
"""Return cached response bytes, or None on miss."""
if not self._backend:
return None
system, history, last_user = self._parse_messages(messages)
if not last_user:
return None
ns = self._namespace(route, model, system)
key = self._cache_key(ns, last_user)
print(
f"[cache] get_chat route={route} model={model} ns={ns} "
f"prompt={last_user[:80]!r} "
f"system_snippet={system[:120]!r}"
)
# 1. Exact key match
entry = await self._backend.get(key)
if entry is not None:
self._hits += 1
print(f"[cache] HIT (exact) ns={ns} prompt={last_user[:80]!r}")
return entry.response # type: ignore[return-value]
# 2. Semantic similarity match
if self._semantic and self._cfg.cache_similarity < 1.0:
emb = await self._build_embedding(history, last_user)
result = await self._backend.find_similar(
emb, threshold=self._cfg.cache_similarity, namespace=ns
)
if result is not None:
_, matched, sim = result
self._hits += 1
print(
f"[cache] HIT (semantic sim={sim:.3f}) ns={ns} "
f"prompt={last_user[:80]!r} matched={matched.prompt[:80]!r}"
)
return matched.response # type: ignore[return-value]
self._misses += 1
print(f"[cache] MISS ns={ns} prompt={last_user[:80]!r}")
return None
async def set_chat(
self, route: str, model: str, messages: list[dict], response_bytes: bytes
) -> None:
"""Store a response in the cache (fire-and-forget friendly)."""
if not self._backend:
return
system, history, last_user = self._parse_messages(messages)
if not last_user:
return
ns = self._namespace(route, model, system)
key = self._cache_key(ns, last_user)
# Privacy guard: check whether the response contains personal data.
personalized = self._response_is_personalized(response_bytes, system)
if personalized:
# Exact-match is only safe when the system prompt is user-specific
# (i.e. different per user → different namespace). When the system
# prompt is generic and shared across all users, the namespace is the
# same for everyone: storing even without an embedding would let any
# user who asks the identical question get another user's personal data
# via exact-match. In that case skip storage entirely.
system_is_user_specific = bool(self._extract_personal_tokens(system))
if not system_is_user_specific:
print(
f"[cache] SKIP personalized response with generic system prompt "
f"route={route} model={model} ns={ns} prompt={last_user[:80]!r} "
f"system_snippet={system[:120]!r}"
)
return
print(
f"[cache] set_chat route={route} model={model} ns={ns} "
f"personalized={personalized} "
f"prompt={last_user[:80]!r} "
f"system_snippet={system[:120]!r}"
)
# Store without embedding when personalized (invisible to semantic search
# across users, but still reachable by exact-match within this namespace).
emb = (
await self._build_embedding(history, last_user)
if self._semantic and self._cfg.cache_similarity < 1.0 and not personalized
else None
)
from semantic_llm_cache.config import CacheEntry
await self._backend.set(
key,
CacheEntry(
prompt=last_user,
response=response_bytes,
embedding=emb,
created_at=time.time(),
ttl=self._cfg.cache_ttl,
namespace=ns,
hit_count=0,
),
)
# ------------------------------------------------------------------
# Convenience wrappers for the generate route (prompt string, not messages)
# ------------------------------------------------------------------
async def get_generate(
self, model: str, prompt: str, system: str = ""
) -> bytes | None:
messages: list[dict] = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
return await self.get_chat("generate", model, messages)
async def set_generate(
self, model: str, prompt: str, system: str, response_bytes: bytes
) -> None:
messages: list[dict] = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
await self.set_chat("generate", model, messages, response_bytes)
# ------------------------------------------------------------------
# Management
# ------------------------------------------------------------------
def stats(self) -> dict:
total = self._hits + self._misses
return {
"hits": self._hits,
"misses": self._misses,
"hit_rate": round(self._hits / total, 3) if total else 0.0,
"semantic": self._semantic,
"backend": self._cfg.cache_backend,
"similarity_threshold": self._cfg.cache_similarity,
"history_weight": self._cfg.cache_history_weight,
}
async def clear(self) -> None:
if self._backend:
await self._backend.clear()
self._hits = 0
self._misses = 0
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
# Process-wide cache instance; None until init_llm_cache() succeeds with the
# cache enabled.
_cache: "LLMCache | None" = None


async def init_llm_cache(cfg: Any) -> "LLMCache | None":
    """
    Initialise the module-level cache singleton. Returns None if disabled.

    When ``cfg.cache_enabled`` is falsy, any previously published singleton is
    dropped so the module-level state matches the returned None. When enabled,
    the new instance is published only after ``init()`` has completed, so a
    failing ``init()`` never leaves a half-initialised cache behind.
    """
    global _cache
    if not cfg.cache_enabled:
        _cache = None
        print("[cache] Cache disabled (cache_enabled=false).")
        return None
    cache = LLMCache(cfg)
    await cache.init()
    _cache = cache
    return cache
def get_llm_cache() -> LLMCache | None:
    """Return the current value of the module-level ``_cache`` singleton (may be None)."""
    return _cache
# ---------------------------------------------------------------------------
# Helper: convert a stored OpenAI-format non-streaming ChatCompletion response to an
# OpenAI SSE single-chunk stream (used when a streaming OpenAI request
# hits the cache whose entry was populated from a non-streaming response).
# ---------------------------------------------------------------------------
def openai_nonstream_to_sse(cached_bytes: bytes, model: str) -> bytes:
    """
    Wrap a stored OpenAI ChatCompletion JSON as a minimal single-chunk SSE stream.

    The stored entry always uses the non-streaming ChatCompletion format so that
    non-streaming cache hits can be served directly; this function adapts it for
    streaming clients.

    Args:
        cached_bytes: JSON-encoded ChatCompletion body taken from the cache.
        model: Model name used when the stored body has no ``model`` field.

    Returns:
        One ``data: {...}`` chunk event followed by ``data: [DONE]``. If the
        entry cannot be parsed or has an unexpected shape, a RuntimeWarning is
        issued and only ``data: [DONE]`` is returned.
    """
    import orjson
    import time as _time

    try:
        d = orjson.loads(cached_bytes)
        # Tolerate explicit nulls ("message": null / "content": null) the same
        # way _extract_response_content does, instead of treating an otherwise
        # valid entry as corrupt or emitting a null delta content.
        first_choice = (d.get("choices") or [{}])[0] or {}
        content = (first_choice.get("message") or {}).get("content") or ""
        chunk = {
            "id": d.get("id", "cache-hit"),
            "object": "chat.completion.chunk",
            "created": d.get("created", int(_time.time())),
            "model": d.get("model", model),
            "choices": [
                {
                    "index": 0,
                    "delta": {"role": "assistant", "content": content},
                    "finish_reason": "stop",
                }
            ],
        }
        if d.get("usage"):
            chunk["usage"] = d["usage"]
        return f"data: {orjson.dumps(chunk).decode()}\n\ndata: [DONE]\n\n".encode()
    except Exception as exc:
        # Best-effort: a broken entry must not break the streaming response.
        warnings.warn(
            f"[cache] openai_nonstream_to_sse: corrupt cache entry, returning empty stream: {exc}",
            RuntimeWarning,
            stacklevel=2,
        )
        return b"data: [DONE]\n\n"