Source code for kotti.sqla

# -*- coding: utf-8 -*-

"""
Inheritance Diagram
-------------------

.. inheritance-diagram:: kotti.sqla
"""

from pyramid.compat import json
from pyramid.security import ALL_PERMISSIONS
from pyramid.security import Allow
from sqlalchemy.types import TypeDecorator, TEXT
from sqlalchemy.ext.mutable import Mutable
from sqlalchemy.ext.declarative import declared_attr


def dump_default(obj):
    """Fallback serializer for objects ``json.dumps`` cannot encode itself.

    Mutation wrappers (:class:`MutationDict` / :class:`MutationList`) are
    replaced by the plain container they wrap (``_d``).  Any other object
    falls through and yields ``None``.
    """
    if isinstance(obj, (MutationDict, MutationList)):
        return obj._d
    return None


def no_autoflush(func):
    """Decorator that runs ``func`` inside ``DBSession.no_autoflush``.

    :param func: The callable to wrap.
    :result: A wrapper that forwards all positional and keyword arguments
             to ``func`` and returns its result, with the call executed
             inside the ``DBSession.no_autoflush`` context manager.  The
             wrapper keeps ``func``'s name and docstring.
    """
    from functools import wraps

    # Imported at decoration time rather than at module level
    # (presumably to avoid a circular import with kotti.resources).
    from kotti.resources import DBSession

    @wraps(func)
    def wrapper(*args, **kwargs):
        with DBSession.no_autoflush:
            return func(*args, **kwargs)
    return wrapper


class JsonType(TypeDecorator):
    """Column type that stores a Python value as a JSON string in a TEXT
    column.

    See http://www.sqlalchemy.org/docs/core/types.html#marshal-json-strings
    """

    impl = TEXT

    def process_bind_param(self, value, dialect):
        """Serialize ``value`` to a JSON string; ``None`` is passed through.

        Mutation wrappers are handled through :func:`dump_default`.
        """
        if value is None:
            return value
        return json.dumps(value, default=dump_default)

    def process_result_value(self, value, dialect):
        """Parse the stored JSON string; ``None`` is passed through."""
        if value is None:
            return value
        return json.loads(value)
class ACLType(JsonType):
    """JSON column type for ACLs (sequences of three-item ACEs).

    ``DEFAULT_ACE`` is never written out; it is stripped when binding and
    prepended again when loading.  ``ALL_PERMISSIONS`` in the third
    position of an ACE is stored as the ``ALL_PERMISSIONS_SERIALIZED``
    marker string.
    """

    ALL_PERMISSIONS_SERIALIZED = '__ALL_PERMISSIONS__'
    DEFAULT_ACE = (Allow, 'role:admin', ALL_PERMISSIONS)

    def process_bind_param(self, value, dialect):
        """Prepare an ACL for storage and delegate to :class:`JsonType`.

        ``None`` is handed to the parent unchanged.
        """
        if value is not None:
            stored = []
            for ace in value:
                if ace != self.DEFAULT_ACE:
                    # Work on a list copy so the caller's ACE is untouched.
                    entry = list(ace)
                    if entry[2] == ALL_PERMISSIONS:
                        entry[2] = self.ALL_PERMISSIONS_SERIALIZED
                    stored.append(entry)
            value = stored
        return super(ACLType, self).process_bind_param(value, dialect)

    def process_result_value(self, value, dialect):
        """Rebuild the ACL from its stored form.

        Returns ``None`` when nothing is stored; otherwise a list that
        starts with ``DEFAULT_ACE`` followed by the stored ACEs as tuples.
        """
        acl = super(ACLType, self).process_result_value(value, dialect)
        if acl is None:
            return None
        restored = [self.DEFAULT_ACE]
        for ace in acl:
            if ace[2] == self.ALL_PERMISSIONS_SERIALIZED:
                ace[2] = ALL_PERMISSIONS
            restored.append(tuple(ace))
        return restored
class MutationDict(Mutable):
    """Change-tracking wrapper around a plain ``dict`` kept in ``_d``.

    See http://www.sqlalchemy.org/docs/orm/extensions/mutable.html
    """

    # Wrapped builtin type; the module-level loop further down in this
    # file consults it to decide which container methods to install.
    _wraps = dict

    def __init__(self, data):
        self._d = data
        super(MutationDict, self).__init__()

    @classmethod
    def coerce(cls, key, value):
        """Return ``value`` as a :class:`MutationDict`.

        Existing instances are returned unchanged and plain dicts are
        wrapped in ``cls``; anything else is delegated to
        ``Mutable.coerce``.
        """
        if isinstance(value, MutationDict):
            return value
        if isinstance(value, dict):
            return cls(value)
        return Mutable.coerce(key, value)

    def __json__(self, request=None):
        """Return a plain ``dict`` copy of the wrapped data.

        Values that provide a ``__json__`` method are replaced by the
        result of calling it with ``request``.
        """
        result = {}
        for key, value in self._d.items():
            if hasattr(value, '__json__'):
                value = value.__json__(request)
            result[key] = value
        return result
class MutationList(Mutable):
    """Change-tracking wrapper around a plain ``list`` kept in ``_d``."""

    # Wrapped builtin type; the module-level loop further down in this
    # file consults it to decide which container methods to install.
    _wraps = list

    def __init__(self, data):
        self._d = data
        super(MutationList, self).__init__()

    @classmethod
    def coerce(cls, key, value):
        """Return ``value`` as a :class:`MutationList`.

        Existing instances are returned unchanged and plain lists are
        wrapped in ``cls``; anything else is delegated to
        ``Mutable.coerce``.
        """
        if isinstance(value, MutationList):
            return value
        if isinstance(value, list):
            return cls(value)
        return Mutable.coerce(key, value)

    def __radd__(self, other):
        """Support ``other + self`` by concatenating with the wrapped list."""
        return other + self._d

    def __json__(self, request=None):
        """Return a plain ``list`` copy of the wrapped data.

        Items that provide a ``__json__`` method are replaced by the
        result of calling it with ``request``.
        """
        result = []
        for item in self._d:
            if hasattr(item, '__json__'):
                item = item.__json__(request)
            result.append(item)
        return result
def _make_mutable_method_wrapper(wrapper_class, methodname, mutates):
    """Build a method that forwards ``methodname`` to the wrapped ``_d``.

    :param wrapper_class: Class the method is created for (not used by
                          the generated method itself).
    :param methodname: Name of the container method to forward to.
    :param mutates: If true, ``self.changed()`` is called after the
                    forwarded call.
    :result: A function named ``methodname`` suitable for use as a method.
    """
    def replacer(self, *args, **kwargs):
        result = getattr(self._d, methodname)(*args, **kwargs)
        if mutates:
            self.changed()
        return result
    replacer.__name__ = methodname
    return replacer


# Install forwarding methods on both wrapper classes.  The flag states
# whether calling the method counts as a change.
for wrapper_class in (MutationDict, MutationList):
    for methodname, mutates in (
            ('__iter__', False),
            ('__len__', False),
            ('__eq__', False),
            ('__add__', False),
            ('__getitem__', False),
            ('__getslice__', False),
            ('__repr__', False),
            ('get', False),
            ('keys', False),
            ('__setitem__', True),
            ('__delitem__', True),
            ('__setslice__', True),
            ('__delslice__', True),
            ('append', True),
            ('clear', True),
            ('extend', True),
            ('insert', True),
            ('pop', True),
            ('setdefault', True),
            ('update', True),
            ('remove', True),
            ):
        # Only wrap methods that do exist on the wrapped type!
        if not hasattr(wrapper_class._wraps, methodname):
            continue
        replacement = _make_mutable_method_wrapper(
            wrapper_class, methodname, mutates)
        setattr(wrapper_class, methodname, replacement)


class NestedMixin(object):
    """Mixin for mutation wrappers that may live inside another wrapper.

    Change notifications are passed up through ``__parent__``; only a
    wrapper without a parent calls the next ``changed()`` in the MRO.
    """

    __parent__ = None

    def __init__(self, *args, **kwargs):
        # '__parent__' is consumed here; everything else goes up the MRO.
        self.__parent__ = kwargs.pop('__parent__', None)
        super(NestedMixin, self).__init__(*args, **kwargs)

    def __getitem__(self, key):
        """Return the item for ``key``, wrapped if it is a dict or list."""
        return self.try_wrap(self._d[key])

    def changed(self):
        """Report a change to the parent wrapper, if there is one."""
        parent = self.__parent__
        if parent is None:
            super(NestedMixin, self).changed()
        else:
            parent.changed()

    def try_wrap(self, value):
        """Wrap ``value`` in the matching ``MUTATION_WRAPPERS`` class with
        ``self`` as parent; other values are returned unchanged."""
        for typ, wrapper in MUTATION_WRAPPERS.items():
            if isinstance(value, typ):
                return wrapper(value, __parent__=self)
        return value

    def __eq__(self, other):
        """Compare the wrapped container with ``other``."""
        return self._d == other
class NestedMutationDict(NestedMixin, MutationDict):
    """:class:`MutationDict` whose nested dicts and lists report changes
    through this instance."""

    def setdefault(self, key, default=None):
        """Like ``dict.setdefault``, wrapping list and dict defaults.

        :param key: Key to look up or insert.
        :param default: Value to insert when ``key`` is missing.  Optional,
                        as with ``dict.setdefault``.  A ``list`` or
                        ``dict`` is wrapped in the matching nested
                        mutation type with ``self`` as its parent before
                        being handed on.
        :result: Whatever the inherited ``setdefault`` returns.
        """
        if isinstance(default, list):
            default = NestedMutationList(default, __parent__=self)
        elif isinstance(default, dict):
            default = NestedMutationDict(default, __parent__=self)
        return super(NestedMutationDict, self).setdefault(key, default)
class NestedMutationList(NestedMixin, MutationList):
    """:class:`MutationList` combined with :class:`NestedMixin`; adds no
    behaviour of its own."""
# Plain container type -> nested mutation wrapper; consulted by
# NestedMixin.try_wrap.
MUTATION_WRAPPERS = {
    dict: NestedMutationDict,
    list: NestedMutationList,
}


class Base(object):
    """Base class that derives ``__tablename__`` from the class name."""

    @declared_attr
    def __tablename__(cls):
        """Return ``camel_case_to_name(cls.__name__)`` with an ``s``
        appended."""
        from kotti.util import camel_case_to_name  # prevent circ import
        name = camel_case_to_name(cls.__name__)
        return '{0}s'.format(name)