# Redis cache mechanism

## Source location

`src/sqlmodel_ext/mixins/cached_table.py`: `CachedTableBaseMixin` (~1900 lines)
`CachedTableBaseMixin` provides a transparent Redis cache layer for `get()` queries with automatic invalidation on CRUD operations. This chapter explains how the internals work; to plug it into your own project, see *Cache queries with Redis*.
## Dual-layer cache architecture

1. **ID cache** (`id:{ModelName}:{id_value}`)
   - For single-row exact queries of the form `cls.id == value`
   - Row-level invalidation in O(1)
2. **Query cache** (`query:{ModelName}:v{version}:{md5_hash}`)
   - For conditional and list queries
   - Model-level invalidation: version bump in O(1) (old keys expire via TTL)

## 0.3 version-based invalidation
Starting with 0.3.0, model-level query cache invalidation switched from SCAN+DEL to version bumping: every model has a `ver:{ModelName}` counter, the cache key embeds the current version, and invalidation requires only a single INCR. Old-version keys can no longer be read and disappear naturally via TTL. This drops model-level invalidation cost from O(N keys) to O(1).
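The mechanism can be sketched with a plain dict standing in for Redis (TTL omitted); the helper names here are illustrative, not the library's API:

```python
# Minimal sketch of version-based invalidation. A plain dict stands in
# for the Redis client; key names follow the scheme described above.
import hashlib

store: dict[str, str] = {}  # stand-in for Redis (TTL omitted)

def query_cache_key(model: str, query_repr: str) -> str:
    # The current version is embedded in every query cache key
    version = store.get(f"ver:{model}", "0")
    digest = hashlib.md5(query_repr.encode()).hexdigest()[:16]
    return f"query:{model}:v{version}:{digest}"

def invalidate_model(model: str) -> None:
    # Single INCR: all existing query keys become unreachable
    store[f"ver:{model}"] = str(int(store.get(f"ver:{model}", "0")) + 1)

key_before = query_cache_key("Character", "name == 'Ann'")
store[key_before] = '{"_t": "single"}'
invalidate_model("Character")
key_after = query_cache_key("Character", "name == 'Ann'")
assert key_after != key_before       # new version, new key
assert store.get(key_after) is None  # the old entry is never read again
```

The old entry is never deleted; it simply becomes unreachable and would be evicted by its TTL in a real Redis deployment.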
## Cache key generation

ID cache keys are direct concatenations: `id:Character:550e8400-...`

Query cache keys normalize all parameters (conditions, pagination, sorting, filtering, time) and take the first 16 characters of an MD5 hash, ensuring semantically identical queries produce the same key. Full format: `query:Character:v3:abcdef0123456789`.
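A sketch of the hashing step, assuming the parameters have already been reduced to a JSON-serializable dict (the normalization details here are illustrative):

```python
# Canonicalize query parameters, then hash; sort_keys guarantees that
# semantically identical queries produce byte-identical input.
import hashlib
import json

def build_query_key(model: str, version: int, params: dict) -> str:
    canonical = json.dumps(params, sort_keys=True, default=str)
    digest = hashlib.md5(canonical.encode()).hexdigest()[:16]
    return f"query:{model}:v{version}:{digest}"

a = build_query_key("Character", 3, {"limit": 10, "name": "Ann"})
b = build_query_key("Character", 3, {"name": "Ann", "limit": 10})
assert a == b  # parameter order does not change the key
```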
## Core class structure

```python
class CachedTableBaseMixin(TableBaseMixin):
    __cache_ttl__: ClassVar[int] = 3600

    # Redis client (shared at class level)
    _redis_client: ClassVar[Any] = None

    # Optional metric hooks
    on_cache_hit: ClassVar[Callable[[str], None] | None] = None
    on_cache_miss: ClassVar[Callable[[str], None] | None] = None

    @classmethod
    def configure_redis(cls, client: Any) -> None: ...

    @classmethod
    def check_cache_config(cls) -> None: ...

    # Cache primitives
    @classmethod
    async def _cache_get(cls, key: str) -> bytes | None: ...

    @classmethod
    async def _cache_set(cls, key: str, value: bytes, ttl: int) -> None: ...

    @classmethod
    async def _cache_delete(cls, key: str) -> None: ...

    @classmethod
    async def _cache_delete_pattern(cls, pattern: str) -> None: ...
```

`on_cache_hit` / `on_cache_miss` are optional metric hooks: set callbacks at startup to feed hit ratios into Prometheus / Grafana.
## get() override

Overrides `TableBaseMixin.get()`, adding cache logic before and after the database query:

```
@classmethod
async def get(cls, session, condition, *, no_cache=False, ...):
    # 1. Determine if cache can be used
    if no_cache or with_for_update or populate_existing or ...:
        return await super().get(session, condition, ...)

    # 2. Check for pending invalidation data in the transaction
    if session.info has pending invalidation for this model:
        return await super().get(session, condition, ...)

    # 3. Detect if this is an ID query
    id_value = cls._extract_id_from_condition(condition)

    # 4. Multi-ID cache joint query (load + MANYTOONE relations)
    if id_value and load contains only cacheable MANYTOONE:
        result = await cls._try_load_from_id_caches(...)
        if result is not _LOAD_CACHE_MISS:
            return result

    # 5. Build cache key + try reading
    cache_key = cls._build_cache_key(condition, fetch_mode, ...)
    cached = await cls._cache_get(cache_key)
    if cached:
        return cls._deserialize_result(cached, fetch_mode)

    # 6. Cache miss: query the database
    result = await super().get(session, condition, ...)

    # 7. Write to cache
    serialized = cls._serialize_result(result)
    await cls._cache_set(cache_key, serialized, cls.__cache_ttl__)
    return result
```

## ID query detection
```python
@classmethod
def _extract_id_from_condition(cls, condition):
    """Detects pure ID equality queries, returns the ID value or None"""
```

Detects conditions of the form `cls.id == value`, allowing a precise ID cache key to be used instead of a query hash.
## Multi-ID cache joint query

When `load` specifies only cacheable MANYTOONE relations, `get()` attempts to read the primary object and each relation object from their respective ID caches, returning a zero-SQL result if all of them hit.

```
@classmethod
async def _try_load_from_id_caches(cls, session, id_value, rel_info):
    # 1. Read the primary model's ID cache
    # 2. Read each relation target's ID cache
    # 3. All hit → assemble and return
    # 4. Any miss → return _LOAD_CACHE_MISS
```

## Serialization scheme
```python
# Wrapper format
{
    "_t": "none|single|list",  # Result type
    "_data": {...},            # Single-item data (result of model_dump_json)
    "_items": [{...}, ...],    # List data
    "_c": "ClassName",         # Polymorphic safety: records the actual class name
}
```

Serialization uses `model_dump_json()` → JSON → `json.loads()`. Deserialization uses `model_validate()` (not `model_validate_json()`, to avoid UUID stringification issues with `table=True` models).

Optional orjson support provides faster serialization.
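The wrapper round-trip can be sketched with plain dicts in place of SQLModel instances; a real implementation would call `model_dump_json()` / `model_validate()` on the model class, and the helper names here are illustrative:

```python
# Serialize a query result (None, one row, or a list of rows) into the
# wrapper format, and read it back.
import json

def serialize_result(result, class_name: str) -> bytes:
    if result is None:
        wrapper = {"_t": "none", "_c": class_name}
    elif isinstance(result, list):
        wrapper = {"_t": "list", "_items": result, "_c": class_name}
    else:
        wrapper = {"_t": "single", "_data": result, "_c": class_name}
    return json.dumps(wrapper).encode()

def deserialize_result(raw: bytes):
    wrapper = json.loads(raw)
    if wrapper["_t"] == "none":
        return None
    if wrapper["_t"] == "list":
        return wrapper["_items"]
    return wrapper["_data"]

row = {"id": 1, "name": "Ann"}
assert deserialize_result(serialize_result(row, "Character")) == row
assert deserialize_result(serialize_result(None, "Character")) is None
assert deserialize_result(serialize_result([row], "Character")) == [row]
```

Swapping `json` for `orjson` only changes the dumps/loads calls; the wrapper shape stays the same.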
## Cache invalidation

### Invalidation in CRUD methods

Each CRUD method is overridden to perform invalidation around commit:

```
async def save(self, session, ...):
    result = await super().save(session, ...)
    # Immediately invalidate when commit=True
    await self._invalidate_for_model(instance_id)
    # Write-through refresh: write the latest data to the ID cache
    serialized = cls._serialize_result(result)
    await cls._cache_set(id_cache_key, serialized, cls.__cache_ttl__)
    return result
```

### Invalidation granularity
| Operation | Strategy |
|---|---|
| save/update | DEL `id:{cls}:{id}` + INCR `ver:{cls}` |
| delete(instances) | per-instance DEL `id:*` + INCR `ver:{cls}` |
| delete(condition) | model-level ID cleanup + INCR `ver:{cls}` |
| add() | only INCR `ver:{cls}` (new objects have no stale cache) |
### Polymorphic inheritance cascading

When STI subclass data changes, the MRO is traversed to invalidate all ancestor caches:

```python
async def _invalidate_id_cache(cls, instance_id):
    await cls._cache_delete(f"id:{cls.__name__}:{instance_id}")
    # Traverse ancestors
    for ancestor in cls._cached_ancestors():
        await ancestor._cache_delete(f"id:{ancestor.__name__}:{instance_id}")
```

`_cached_ancestors()` caches all ancestors in the MRO that also inherit `CachedTableBaseMixin`.
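The cascade can be shown synchronously with an in-memory store; the class names and the minimal mixin base are illustrative stand-ins:

```python
# Sketch of MRO-based cascading invalidation: deleting a subclass row's
# cache entry also removes the entries cached under its ancestors.
store: dict[str, str] = {}

class CachedBase:
    @classmethod
    def cached_ancestors(cls):
        # Every class above cls in the MRO that is also a cached model
        return [c for c in cls.__mro__[1:]
                if issubclass(c, CachedBase) and c is not CachedBase]

    @classmethod
    def invalidate_id(cls, instance_id):
        store.pop(f"id:{cls.__name__}:{instance_id}", None)
        for ancestor in cls.cached_ancestors():
            store.pop(f"id:{ancestor.__name__}:{instance_id}", None)

class Character(CachedBase): ...
class Hero(Character): ...  # single-table-inheritance subclass

store["id:Hero:7"] = "{}"
store["id:Character:7"] = "{}"  # the same row, cached under the parent class
Hero.invalidate_id(7)
assert store == {}  # both the subclass and ancestor entries are gone
```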
## Invalidation compensation mechanism

Handles commit=False scenarios (deferred commit):

### session.info state tracking

```python
session.info['_cache_pending']  # Pending invalidation: dict[type, set[id]]
session.info['_cache_synced']   # Already synced: dict[type, set[id]]
```

### Two paths

- Synchronous path (CRUD method with `commit=True`): directly `await` the invalidation
- Async compensation path (`commit=False`):
  1. Record pending invalidation types and IDs in `session.info`
  2. Register a SQLAlchemy `after_commit` event hook
  3. On commit, the compensation function invalidates whatever the synchronous path didn't cover
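The compensation path can be sketched with a minimal fake session instead of SQLAlchemy; names such as `register_pending` and `compensate` are illustrative, and the real code hooks SQLAlchemy's `after_commit` event in the same spirit:

```python
# Pending invalidations are recorded on the session and flushed only
# when the commit actually happens.
store = {"id:Character:1": "{}", "ver:Character": "0"}

class FakeSession:
    def __init__(self):
        self.info: dict = {}
        self._after_commit: list = []

    def on_commit(self, fn):
        self._after_commit.append(fn)  # stand-in for the after_commit event

    def commit(self):
        for fn in self._after_commit:
            fn(self)

def register_pending(session, model: str, instance_id):
    pending = session.info.setdefault("_cache_pending", {})
    pending.setdefault(model, set()).add(instance_id)
    session.on_commit(compensate)

def compensate(session):
    # Runs after commit: invalidate everything recorded as pending
    for model, ids in session.info.pop("_cache_pending", {}).items():
        for instance_id in ids:
            store.pop(f"id:{model}:{instance_id}", None)
        store[f"ver:{model}"] = str(int(store[f"ver:{model}"]) + 1)

session = FakeSession()
register_pending(session, "Character", 1)  # commit=False path: defer
assert "id:Character:1" in store           # nothing invalidated yet
session.commit()
assert "id:Character:1" not in store       # compensation ran on commit
assert store["ver:Character"] == "1"
```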
### Sentinel objects

```python
_QUERY_ONLY_INVALIDATION  # add() scenario: only invalidate the query cache
_FULL_MODEL_INVALIDATION  # delete(condition) scenario: full model invalidation
_LOAD_CACHE_MISS          # Multi-ID cache joint query miss
```

## MissingGreenlet avoidance
**Risk:** after commit, SQLAlchemy resets object state, so directly accessing attributes triggers synchronous queries.

Solutions:

- Extract IDs with `getattr()` before commit
- After commit, read from the identity map via `sa_inspect()` (no DB query)
- External SQL methods use `_register_pending_invalidation()` + `_commit_and_invalidate()`
## check_cache_config() static check

```
@classmethod
def check_cache_config(cls) -> None:
```

Validates:

- The Redis client has been set via `configure_redis()`
- No subclass overrides `_get_client()` (which would break the shared-client contract)
- Every subclass's `__cache_ttl__` is a positive integer
- AST check: forbids direct calls to `invalidate_by_id()` etc. in non-cache methods (prevents MissingGreenlet)

Side effect: registers the SQLAlchemy `after_commit` / `after_rollback` / `persistent_to_deleted` event hooks.
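The AST check can be sketched as a walk over a function's parsed source looking for calls to forbidden helpers; the forbidden-name set here is illustrative:

```python
# Scan source code for direct calls to forbidden invalidation helpers,
# whether written as bare names or as attribute access (self.foo()).
import ast
import textwrap

FORBIDDEN = {"invalidate_by_id"}

def find_forbidden_calls(source: str) -> list[str]:
    found = []
    for node in ast.walk(ast.parse(textwrap.dedent(source))):
        if isinstance(node, ast.Call):
            func = node.func
            name = func.attr if isinstance(func, ast.Attribute) else getattr(func, "id", None)
            if name in FORBIDDEN:
                found.append(name)
    return found

bad = """
async def refresh(self, session):
    await self.invalidate_by_id(self.id)  # direct call: flagged
"""
good = """
async def refresh(self, session):
    self._register_pending_invalidation(self.id)
"""
assert find_forbidden_calls(bad) == ["invalidate_by_id"]
assert find_forbidden_calls(good) == []
```

Because the check runs on the AST rather than at call time, misuse is caught at startup instead of surfacing later as a MissingGreenlet error.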
## Graceful degradation

All Redis operations are wrapped in try/except:

- Read failure → return None (degrade to the database)
- Write failure → log + continue
- Delete failure → log (TTL provides eventual consistency)