Source code for kotti.sqla

"""
Inheritance Diagram
-------------------

.. inheritance-diagram:: kotti.sqla
"""
from pyramid.compat import json
from pyramid.security import ALL_PERMISSIONS
from pyramid.security import Allow
from sqlalchemy.ext import baked
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.mutable import Mutable
from sqlalchemy.types import Text
from sqlalchemy.types import TypeDecorator

bakery = baked.bakery()
baked.bake_lazy_loaders()


def dump_default(obj):
    if isinstance(obj, MutationDict):
        return obj._d
    elif isinstance(obj, MutationList):
        return obj._d


def no_autoflush(func):
    from kotti.resources import DBSession

    def wrapper(*args, **kwargs):
        with DBSession.no_autoflush:
            return func(*args, **kwargs)

    return wrapper


[docs]class JsonType(TypeDecorator): """http://www.sqlalchemy.org/docs/core/types.html#marshal-json-strings """ impl = Text # noinspection PyMethodOverriding
[docs] @staticmethod def process_bind_param(value, dialect): if value is not None: value = json.dumps(value, default=dump_default) return value
# noinspection PyMethodOverriding
[docs] @staticmethod def process_result_value(value, dialect): if value is not None: value = json.loads(value) return value
[docs]class ACLType(JsonType): ALL_PERMISSIONS_SERIALIZED = "__ALL_PERMISSIONS__" DEFAULT_ACE = (Allow, "role:admin", ALL_PERMISSIONS)
[docs] def process_bind_param(self, value, dialect): if value is not None: value = [list(ace) for ace in value if ace != self.DEFAULT_ACE] for ace in value: if ace[2] == ALL_PERMISSIONS: ace[2] = self.ALL_PERMISSIONS_SERIALIZED return super().process_bind_param(value, dialect)
[docs] def process_result_value(self, value, dialect): acl = super().process_result_value(value, dialect) if acl is not None: for ace in acl: if ace[2] == self.ALL_PERMISSIONS_SERIALIZED: ace[2] = ALL_PERMISSIONS return [self.DEFAULT_ACE] + [tuple(ace) for ace in acl]
[docs]class MutationDict(Mutable): """http://www.sqlalchemy.org/docs/orm/extensions/mutable.html """ _wraps = dict def __init__(self, data): self._d = data super().__init__()
[docs] @classmethod def coerce(cls, key, value): if not isinstance(value, MutationDict): if isinstance(value, dict): return cls(value) return Mutable.coerce(key, value) else: return value
def __json__(self, request=None): return dict( [ (key, value.__json__(request)) if hasattr(value, "__json__") else (key, value) for key, value in self._d.items() ] )
[docs]class MutationList(Mutable): _wraps = list def __init__(self, data): self._d = data super().__init__()
[docs] @classmethod def coerce(cls, key, value): if not isinstance(value, MutationList): if isinstance(value, list): return cls(value) return Mutable.coerce(key, value) else: return value
def __radd__(self, other): return other + self._d def __json__(self, request=None): return [ item.__json__(request) if hasattr(item, "__json__") else item for item in self._d ]
def _make_mutable_method_wrapper(wrapper_class, methodname, mutates): def replacer(self, *args, **kwargs): method = getattr(self._d, methodname) value = method(*args, **kwargs) if mutates: self.changed() return value replacer.__name__ = methodname return replacer for wrapper_class in (MutationDict, MutationList): for methodname, mutates in ( ("__iter__", False), ("__len__", False), ("__eq__", False), ("__add__", False), ("__getitem__", False), ("__getslice__", False), ("__repr__", False), ("get", False), ("keys", False), ("__setitem__", True), ("__delitem__", True), ("__setslice__", True), ("__delslice__", True), ("append", True), ("clear", True), ("extend", True), ("insert", True), ("pop", True), ("setdefault", True), ("update", True), ("remove", True), ): # Only wrap methods that do exist on the wrapped type! if getattr(wrapper_class._wraps, methodname, False): setattr( wrapper_class, methodname, _make_mutable_method_wrapper(wrapper_class, methodname, mutates), ) class NestedMixin: __parent__ = None def __init__(self, *args, **kwargs): self.__parent__ = kwargs.pop("__parent__", None) super().__init__(*args, **kwargs) def __getitem__(self, key): value = self._d.__getitem__(key) return self.try_wrap(value) def changed(self): if self.__parent__ is not None: self.__parent__.changed() else: super().changed() def try_wrap(self, value): for typ, wrapper in MUTATION_WRAPPERS.items(): if isinstance(value, typ): value = wrapper(value, __parent__=self) break return value def __eq__(self, other): return self._d == other
[docs]class NestedMutationDict(NestedMixin, MutationDict): def setdefault(self, key, default): if isinstance(default, list): default = NestedMutationList(default, __parent__=self) elif isinstance(default, dict): default = NestedMutationDict(default, __parent__=self) return super().setdefault(key, default)
[docs]class NestedMutationList(NestedMixin, MutationList): pass
MUTATION_WRAPPERS = {dict: NestedMutationDict, list: NestedMutationList} class Base: @declared_attr def __tablename__(cls): from kotti.util import camel_case_to_name # prevent circ import return "{}s".format(camel_case_to_name(cls.__name__))