Optimistic locking mechanism
Source location
src/sqlmodel_ext/mixins/optimistic_lock.py — OptimisticLockMixin and OptimisticLockError
Retry logic lives in src/sqlmodel_ext/mixins/table.py's save() / update() methods.
Why this exists
Concurrent updates on the same record cause lost updates: A modifies status, B simultaneously modifies amount, and whoever writes last overwrites the other's changes. Optimistic locking detects these conflicts via a version number, making them recognizable and retryable instead of silently dropping data.
Looking for how to use it? See Handle concurrent updates. This chapter only explains why it's implemented this way.
OptimisticLockMixin
The entire Mixin is surprisingly short:
    class OptimisticLockMixin:
        _has_optimistic_lock: ClassVar[bool] = True
        version: int = 0

- _has_optimistic_lock: internal marker so save() / update() know whether to apply optimistic lock logic
- version: the version number field
SQLAlchemy's version_id_col mechanism is enabled through __mapper_args__ in the metaclass — automatically generating WHERE version = ? and SET version = version + 1 on every UPDATE.
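To make the version check concrete, here is a minimal sketch of the same idea written against the standard-library sqlite3 module instead of SQLAlchemy. The orders table and the update_with_version helper are hypothetical names chosen for illustration; the sketch only mirrors the shape of the guarded UPDATE, not the exact SQL the ORM emits.

```python
import sqlite3


def update_with_version(conn, order_id, expected_version, new_status):
    """Run an UPDATE guarded by the version column; return True if it applied."""
    cur = conn.execute(
        "UPDATE orders SET status = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_status, order_id, expected_version),
    )
    conn.commit()
    # Zero affected rows means another writer already bumped the version.
    return cur.rowcount == 1


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, status TEXT, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO orders VALUES (1, 'new', 0)")
conn.commit()

# Writers A and B both read the row while it was at version 0.
a_won = update_with_version(conn, 1, 0, "paid")       # matches version 0
b_won = update_with_version(conn, 1, 0, "cancelled")  # stale: version is now 1
final_row = conn.execute("SELECT status, version FROM orders WHERE id = 1").fetchone()
```

The second write matches no rows because the WHERE clause no longer holds. In SQLAlchemy, an UPDATE that matches an unexpected number of rows surfaces as StaleDataError, which is the signal the retry logic reacts to.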
OptimisticLockError
    class OptimisticLockError(Exception):
        def __init__(self, message, model_class=None, record_id=None,
                     expected_version=None, original_error=None):
            super().__init__(message)
            self.model_class = model_class            # "Order"
            self.record_id = record_id                # "a1b2c3..."
            self.expected_version = expected_version  # 3
            self.original_error = original_error      # StaleDataError

Carries rich context information for debugging and logging.
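As an illustration of how that context might be consumed, the sketch below repeats the class as a local stand-in so the example is self-contained, and uses a plain RuntimeError in place of SQLAlchemy's StaleDataError. The describe_conflict helper is a hypothetical name, not part of the library.

```python
class OptimisticLockError(Exception):
    """Local stand-in mirroring the constructor shown above."""

    def __init__(self, message, model_class=None, record_id=None,
                 expected_version=None, original_error=None):
        super().__init__(message)
        self.model_class = model_class
        self.record_id = record_id
        self.expected_version = expected_version
        self.original_error = original_error


def describe_conflict(err):
    """Hypothetical helper: build a single log line from the error's context."""
    return (f"{err.model_class} id={err.record_id} "
            f"expected_version={err.expected_version}: {err}")


try:
    try:
        # Stand-in for the StaleDataError raised on commit.
        raise RuntimeError("UPDATE matched 0 rows")
    except RuntimeError as stale:
        raise OptimisticLockError(
            "optimistic lock conflict",
            model_class="Order",
            record_id="a1b2c3",
            expected_version=3,
            original_error=stale,
        ) from stale
except OptimisticLockError as err:
    caught = err  # keep a reference; `err` is unbound after the except block
    log_line = describe_conflict(err)
```

Because the error is raised with `from`, the original exception is available both as original_error and as the standard `__cause__` attribute, so tracebacks show the full chain.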
Retry logic in save()
save() and update() share the same optimistic lock retry structure:
    async def save(self, session, ..., optimistic_retry_count=0):
        cls = type(self)
        instance = self
        retries_remaining = optimistic_retry_count
        current_data = None

        while True:
            session.add(instance)
            try:
                await session.commit()
                break  # Success
            except StaleDataError as e:  # Version conflict!
                await session.rollback()
                if retries_remaining <= 0:
                    raise OptimisticLockError(
                        message="optimistic lock conflict",
                        model_class=cls.__name__,
                        record_id=str(instance.id),
                        expected_version=instance.version,
                        original_error=e,
                    ) from e
                retries_remaining -= 1

                # Save current modifications (excluding metadata fields)
                if current_data is None:
                    current_data = self.model_dump(
                        exclude={'id', 'version', 'created_at', 'updated_at'}
                    )

                # Get the latest record from the database
                fresh = await cls.get(session, cls.id == self.id)
                if fresh is None:
                    raise OptimisticLockError("record has been deleted") from e

                # Re-apply my changes to the latest record
                for key, value in current_data.items():
                    if hasattr(fresh, key):
                        setattr(fresh, key, value)
                instance = fresh

Retry flow visualization
Key implementation details
- Lazy current_data saving: model_dump() is called only on the first conflict to save the modifications, excluding the metadata fields id, version, created_at, and updated_at. This avoids overhead on the conflict-free path.
- Deleted-record detection: if the re-query finds that the record has been deleted, a specific error is raised instead of retrying infinitely.
- Re-applying changes: setattr applies the original modifications field by field onto the latest record, and the commit is retried with the new version. This is far simpler than "abort, then re-run business logic", and the calling code is completely unaware that retries happened.
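The three details above can be exercised without a database. The following is a simplified, synchronous sketch under stated assumptions: FakeStore, StaleWrite, and the Order dataclass are hypothetical stand-ins for the session, StaleDataError, and a real model, and this save() is a reduced imitation of the loop rather than the library's implementation.

```python
from dataclasses import asdict, dataclass, replace


class StaleWrite(Exception):
    """Hypothetical stand-in for SQLAlchemy's StaleDataError."""


@dataclass
class Order:
    id: int
    status: str
    amount: int
    version: int = 0


class FakeStore:
    """Hypothetical in-memory table that checks the version on commit."""

    def __init__(self, row):
        self.rows = {row.id: replace(row)}

    def get(self, row_id):
        row = self.rows.get(row_id)
        return replace(row) if row else None  # hand out a copy

    def commit(self, row):
        current = self.rows.get(row.id)
        if current is None or current.version != row.version:
            raise StaleWrite(row.id)
        self.rows[row.id] = replace(row, version=row.version + 1)


def save(store, instance, retries=0):
    snapshot = None
    while True:
        try:
            store.commit(instance)
            return store.get(instance.id)
        except StaleWrite:
            if retries <= 0:
                raise
            retries -= 1
            if snapshot is None:  # lazy: taken only on the first conflict
                snapshot = {k: v for k, v in asdict(instance).items()
                            if k not in {"id", "version"}}
            fresh = store.get(instance.id)
            if fresh is None:  # deleted-record detection
                raise LookupError("record has been deleted")
            for key, value in snapshot.items():  # re-apply onto the fresh row
                setattr(fresh, key, value)
            instance = fresh


store = FakeStore(Order(id=1, status="new", amount=100))
mine = store.get(1)      # read at version 0
mine.status = "paid"
theirs = store.get(1)    # a concurrent writer, also at version 0
theirs.amount = 250
store.commit(theirs)     # the store moves to version 1
saved = save(store, mine, retries=1)
```

The caller sees a single successful save() even though the first commit was rejected. Note that the snapshot in this sketch holds every non-metadata field, so the retried write also carries the amount that the first writer originally read.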