"""
The :mod:`~kotti.resources` module contains all the classes for Kotti's
persistence layer, which is based on SQLAlchemy.
Inheritance Diagram
-------------------
.. inheritance-diagram:: kotti.resources
"""
import abc
import datetime
import os
import warnings
from cgi import FieldStorage
from collections.abc import MutableMapping
from copy import copy
from fnmatch import fnmatch
from io import BufferedReader
from io import BytesIO
from typing import Any
from typing import Iterable
from typing import List
from typing import Optional
from typing import Union
from depot.fields.sqlalchemy import UploadedFileField
from depot.fields.sqlalchemy import _SQLAMutationTracker
from depot.fields.upload import UploadedFile
from pyramid.decorator import reify
from pyramid.traversal import resource_path
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import ForeignKey
from sqlalchemy import Integer
from sqlalchemy import String
from sqlalchemy import Unicode
from sqlalchemy import UnicodeText
from sqlalchemy import UniqueConstraint
from sqlalchemy import bindparam
from sqlalchemy import event
from sqlalchemy import inspect
from sqlalchemy.engine.base import Engine
from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy.ext.declarative import DeclarativeMeta
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.ext.orderinglist import OrderingList
from sqlalchemy.ext.orderinglist import ordering_list
from sqlalchemy.orm import backref
from sqlalchemy.orm import object_mapper
from sqlalchemy.orm import relation
from sqlalchemy.orm.attributes import Event
from sqlalchemy.orm.exc import NoResultFound
from sqlalchemy.orm.scoping import scoped_session
from sqlalchemy.sql import and_
from sqlalchemy.sql import select
from sqlalchemy.util import classproperty
from sqlalchemy.util.langhelpers import _symbol
from transaction import commit
from zope.interface import implementer
from kotti import Base
from kotti import DBSession
from kotti import TRUE_VALUES
from kotti import _resolve_dotted
from kotti import get_settings
from kotti import metadata
from kotti.interfaces import IContent
from kotti.interfaces import IDefaultWorkflow
from kotti.interfaces import IDocument
from kotti.interfaces import IFile
from kotti.interfaces import INode
from kotti.migrate import stamp_heads
from kotti.request import Request
from kotti.security import PersistentACLMixin
from kotti.security import view_permitted
from kotti.sqla import ACLType
from kotti.sqla import JsonType
from kotti.sqla import MutationList
from kotti.sqla import NestedMutationDict
from kotti.sqla import bakery
from kotti.util import Link
from kotti.util import LinkParent
from kotti.util import LinkRenderer
from kotti.util import _
from kotti.util import _to_fieldstorage
from kotti.util import camel_case_to_name
from kotti.util import get_paste_items
[docs]class ContainerMixin(MutableMapping):
"""Containers form the API of a Node that's used for subitem
access and in traversal.
"""
def __iter__(self):
return iter(self.children)
def __len__(self):
return len(self.keys())
def __setitem__(self, key: str, node: "Node") -> None:
node.name = key
self.children.append(node)
self.children.reorder()
def __delitem__(self, key: str) -> None:
node = self[key]
self.children.remove(node)
DBSession.delete(node)
[docs] def keys(self) -> List[str]:
"""
:result: children names
:rtype: list
"""
return [child.name for child in self.children]
[docs] def values(self) -> OrderingList:
return self.children
def __getitem__(self, path: Union[str, Iterable[str]]) -> "Node":
db_session = DBSession()
db_session._autoflush()
# if not hasattr(path, '__iter__'):
if isinstance(path, str):
path = (path,)
path = [p for p in path]
# Optimization: don't query children if self._children already there:
if "_children" in self.__dict__:
rest = path[1:]
try:
[child] = filter(lambda ch: ch.name == path[0], self._children)
except ValueError:
raise KeyError(path)
if rest:
return child[rest]
else:
return child
baked_query = bakery(lambda session: session.query(Node))
if len(path) == 1:
try:
baked_query += lambda q: q.filter(
Node.name == bindparam("name"),
Node.parent_id == bindparam("parent_id"),
)
return (
baked_query(db_session)
.params(name=path[0], parent_id=self.id)
.one()
)
except NoResultFound:
raise KeyError(path)
# We have a path with more than one element, so let's be a
# little clever about fetching the requested node:
nodes = Node.__table__
conditions = [nodes.c.id == self.id]
alias = nodes
for name in path:
alias, old_alias = nodes.alias(), alias
conditions.append(alias.c.parent_id == old_alias.c.id)
conditions.append(alias.c.name == name)
expr = select([alias.c.id], and_(*conditions))
row = db_session.execute(expr).fetchone()
if row is None:
raise KeyError(path)
return baked_query(db_session).get(row.id)
@hybrid_property
def children(self):
"""
:result: *all* child nodes without considering permissions.
:rtype: list
"""
return self._children
[docs] def children_with_permission(
self, request: Request, permission: str = "view"
) -> "List[Node]":
"""Return only those children for which the user initiating the
request has the asked permission.
:param request: current request
:type request: :class:`kotti.request.Request`
:param permission: The permission for which you want the allowed
children
:type permission: str
:result: List of child nodes
:rtype: list
"""
return [c for c in self.children if request.has_permission(permission, c)]
[docs]class LocalGroup(Base):
"""Local groups allow the assignment of groups or roles to principals
(users or groups) **for a certain context** (i.e. a :class:`Node` in the
content tree).
"""
__tablename__ = "local_groups"
__table_args__ = (UniqueConstraint("node_id", "principal_name", "group_name"),)
#: Primary key for the node in the DB
#: (:class:`sqlalchemy.types.Integer`)
id = Column(Integer(), primary_key=True)
#: ID of the node for this assignment
#: (:class:`sqlalchemy.types.Integer`)
node_id = Column(ForeignKey("nodes.id"), index=True)
#: Name of the principal (user or group)
#: (:class:`sqlalchemy.types.Unicode`)
principal_name = Column(Unicode(100), index=True)
#: Name of the assigned group or role
#: (:class:`sqlalchemy.types.Unicode`)
group_name = Column(Unicode(100))
def __init__(self, node: "Node", principal_name: str, group_name: str):
self.node = node
self.principal_name = principal_name
self.group_name = group_name
def copy(self, **kwargs) -> "LocalGroup":
kwargs.setdefault("node", self.node)
kwargs.setdefault("principal_name", self.principal_name)
kwargs.setdefault("group_name", self.group_name)
return self.__class__(**kwargs)
def __repr__(self):
return "<{} {} => {} at {}>".format(
self.__class__.__name__,
self.principal_name,
self.group_name,
resource_path(self.node),
)
[docs]@implementer(INode)
class Node(Base, ContainerMixin, PersistentACLMixin, metaclass=NodeMeta):
"""Basic node in the persistance hierarchy."""
__table_args__ = (UniqueConstraint("parent_id", "name"),)
__mapper_args__ = dict(
polymorphic_on="type", polymorphic_identity="node", with_polymorphic="*"
)
#: Primary key for the node in the DB
#: (:class:`sqlalchemy.types.Integer`)
id = Column(Integer(), primary_key=True)
#: Lowercase class name of the node instance
#: (:class:`sqlalchemy.types.String`)
type = Column(String(30), nullable=False)
#: ID of the node's parent
#: (:class:`sqlalchemy.types.Integer`)
parent_id = Column(ForeignKey("nodes.id"), index=True)
#: Position of the node within its container / parent
#: (:class:`sqlalchemy.types.Integer`)
position = Column(Integer())
_acl = Column(MutationList.as_mutable(ACLType))
#: Name of the node as used in the URL
#: (:class:`sqlalchemy.types.Unicode`)
name = Column(Unicode(250), nullable=False)
#: Title of the node, e.g. as shown in search results
#: (:class:`sqlalchemy.types.Unicode`)
title = Column(Unicode(250))
#: Annotations can be used to store arbitrary data in a nested dictionary
#: (:class:`kotti.sqla.NestedMustationDict`)
annotations = Column(NestedMutationDict.as_mutable(JsonType))
#: The path can be used to efficiently filter for child objects
#: (:class:`sqlalchemy.types.Unicode`).
path = Column(Unicode(2000), index=True)
parent = relation(
"Node",
remote_side=[id],
backref=backref(
"_children",
collection_class=ordering_list("position", reorder_on_append=True),
order_by=[position],
cascade="all",
),
)
local_groups = relation(
LocalGroup, backref=backref("node"), cascade="all", lazy="joined"
)
__hash__ = Base.__hash__
def __init__(
self,
name: str = None,
parent: "Node" = None,
title: str = "",
annotations: dict = None,
**kwargs,
):
"""Constructor"""
super().__init__(**kwargs)
if annotations is None:
annotations = {}
self.parent = parent
self.name = name
self.title = title
self.annotations = annotations
@property
def __name__(self):
return self.name
@property
def __parent__(self):
return self.parent
@__parent__.setter
def __parent__(self, value):
self.parent = value
def __repr__(self) -> str:
return "<{} {} at {}>".format(
self.__class__.__name__, self.id, resource_path(self)
)
def __eq__(self, other: Any) -> bool:
return isinstance(other, Node) and self.id == other.id
def __ne__(self, other: Any) -> bool:
return not self == other
copy_properties_blacklist = (
"id",
"parent",
"parent_id",
"_children",
"local_groups",
"_tags",
)
[docs] def clear(self) -> None:
for node in DBSession.query(Node).filter(Node.parent == self):
DBSession.delete(node)
[docs] def copy(self, **kwargs) -> "Node":
"""
:result: A copy of the current instance
:rtype: :class:`~kotti.resources.Node`
"""
children = list(self.children)
copy = self.__class__()
for prop in object_mapper(self).iterate_properties:
if prop.key not in self.copy_properties_blacklist:
setattr(copy, prop.key, getattr(self, prop.key))
for key, value in kwargs.items():
setattr(copy, key, value)
for child in children:
copy.children.append(child.copy())
return copy
[docs]class TypeInfo:
"""TypeInfo instances contain information about the type of a node.
You can pass arbitrary keyword arguments in the constructor, they
will become instance attributes. The most common are:
- name
- title
- add_view
- addable_to
- edit_links
- selectable_default_views
- uploadable_mimetypes
- add_permission
"""
addable_to = ()
selectable_default_views = ()
uploadable_mimetypes = ()
edit_links = ()
action_links = () # BBB
def __init__(self, **kwargs) -> None:
if "action_links" in kwargs:
msg = (
"'action_links' is deprecated as of Kotti 1.0.0. "
"'edit_links' includes 'action_links' and should "
"be used instead."
)
edit_links = kwargs.get("edit_links")
last_link = edit_links[-1] if edit_links else None
if isinstance(last_link, LinkParent):
last_link.children.extend(kwargs["action_links"])
warnings.warn(msg, DeprecationWarning)
else:
raise ValueError(msg)
# default value for add_permission should be 'add'
if "add_permission" not in kwargs:
kwargs["add_permission"] = "add"
self.__dict__.update(kwargs)
[docs] def copy(self, **kwargs) -> "TypeInfo":
"""
:result: a copy of the current TypeInfo instance
:rtype: :class:`~kotti.resources.TypeInfo`
"""
d = self.__dict__.copy()
d["selectable_default_views"] = copy(self.selectable_default_views)
d.update(kwargs)
return TypeInfo(**d)
[docs] def addable(self, context: "Content", request: Optional[Request]) -> bool:
"""
:param context:
:type context: Content or subclass thereof (or anything that has a
type_info attribute of type
:class:`~kotti.resources.TypeInfo`)
:param request: current request
:type request: :class:`kotti.request.Request`
:result: True if the type described in 'self' may be added to 'context',
False otherwise.
:rtype: Boolean
"""
if self.add_view is None:
return False
if context.type_info.name in self.addable_to:
return bool(view_permitted(context, request, self.add_view))
else:
return False
[docs] def add_selectable_default_view(self, name: str, title: str) -> None:
"""Add a view to the list of default views selectable by the
user in the UI.
:param name: Name the view is registered with
:type name: str
:param title: Title for the view for display in the UI.
:type title: str or TranslationString
"""
self.selectable_default_views.append((name, title))
[docs] def is_uploadable_mimetype(self, mimetype: str) -> int:
"""Check if uploads of the given MIME type are allowed.
:param mimetype: MIME type
:type mimetype: str
:result: Upload allowed (>0) or forbidden (0). The greater the result,
the better is the match. E.g. ``image/*`` (6) is a better
match for ``image/png`` than `*` (1).
:rtype: int
"""
match_score = 0
for mt in self.uploadable_mimetypes:
if fnmatch(mimetype, mt) and len(mt) > match_score:
match_score = len(mt)
return match_score
[docs]class Tag(Base):
"""Basic tag implementation. Instances of this class are just the tag
itself and can be mapped to instances of :class:`~kotti.resources.Content`
(or any of its descendants) via instances of
:class:`~kotti.resources.TagsToContents`.
"""
#: Primary key column in the DB
#: (:class:`sqlalchemy.types.Integer`)
id = Column(Integer, primary_key=True)
#: Title of the tag
#: :class:`sqlalchemy.types.Unicode`
title = Column(Unicode(100), unique=True, nullable=False)
def __repr__(self) -> str:
return f"<Tag ('{self.title}')>"
@property
def items(self) -> List[Node]:
"""
:result:
:rtype: list
"""
return [rel.item for rel in self.content_tags]
[docs]class TagsToContents(Base):
"""Tags to contents mapping"""
__tablename__ = "tags_to_contents"
#: Foreign key referencing :attr:`Tag.id`
#: (:class:`sqlalchemy.types.Integer`)
tag_id = Column(ForeignKey("tags.id"), primary_key=True, index=True)
#: Foreign key referencing :attr:`Content.id`
#: (:class:`sqlalchemy.types.Integer`)
content_id = Column(ForeignKey("contents.id"), primary_key=True, index=True)
#: Relation that adds a ``content_tags`` :func:`sqlalchemy.orm.backref`
#: to :class:`~kotti.resources.Tag` instances to allow easy access to all
#: content tagged with that tag.
#: (:func:`sqlalchemy.orm.relationship`)
tag = relation(Tag, backref=backref("content_tags", cascade="all"), lazy="joined")
#: Ordering position of the tag
#: :class:`sqlalchemy.types.Integer`
position = Column(Integer, nullable=False)
#: title of the associated :class:`~kotti.resources.Tag` instance
#: (:class:`sqlalchemy.ext.associationproxy.association_proxy`)
title = association_proxy("tag", "title")
@classmethod
def _tag_find_or_create(cls, title: str) -> "TagsToContents":
"""
Find or create a tag with the given title.
:param title: Title of the tag to find or create.
:type title: str
:result:
:rtype: :class:`~kotti.resources.TagsToContents`
"""
with DBSession.no_autoflush:
tag = DBSession.query(Tag).filter_by(title=title).first()
if tag is None:
tag = Tag(title=title)
return cls(tag=tag)
# noinspection PyUnusedLocal
def _not_root(context: Node, request: Request) -> bool:
return context is not get_root()
default_actions = [
Link("copy", title=_("Copy")),
Link("cut", title=_("Cut"), predicate=_not_root),
Link("paste", title=_("Paste"), predicate=get_paste_items),
Link("rename", title=_("Rename"), predicate=_not_root),
Link("delete", title=_("Delete"), predicate=_not_root),
LinkRenderer("default-view-selector"),
]
default_type_info = TypeInfo(
name="Content",
title="type_info title missing", # BBB
add_view=None,
addable_to=[],
edit_links=[
Link("contents", title=_("Contents")),
Link("edit", title=_("Edit")),
Link("share", title=_("Share")),
LinkParent(title=_("Actions"), children=default_actions),
],
selectable_default_views=[("folder_view", _("Folder view"))],
)
[docs]@implementer(IContent)
class Content(Node):
"""Content adds some attributes to :class:`~kotti.resources.Node` that are
useful for content objects in a CMS.
"""
@classproperty
def __mapper_args__(cls):
return dict(polymorphic_identity=camel_case_to_name(cls.__name__))
#: Primary key column in the DB
#: (:class:`sqlalchemy.types.Integer`)
id = Column(ForeignKey(Node.id), primary_key=True)
#: Name of the view that should be displayed to the user when
#: visiting an URL without a explicit view name appended
#: (:class:`sqlalchemy.types.String`)
default_view = Column(String(50))
#: Description of the content object. In default Kotti this is
#: used e.g. in the description tag in the HTML, in the search results
#: and rendered below the title in most views.
#: (:class:`sqlalchemy.types.Unicode`)
description = Column(UnicodeText())
#: Language code (ISO 639) of the content object
#: (:class:`sqlalchemy.types.Unicode`)
language = Column(Unicode(10))
#: Owner (username) of the content object
#: (:class:`sqlalchemy.types.Unicode`)
owner = Column(Unicode(100))
#: Workflow state of the content object
#: (:class:`sqlalchemy.types.String`)
state = Column(String(50))
#: Date / time the content was created
#: (:class:`sqlalchemy.types.DateTime`)
creation_date = Column(DateTime())
#: Date / time the content was last modified
#: (:class:`sqlalchemy.types.DateTime`)
modification_date = Column(DateTime())
#: Shall the content be visible in the navigation?
#: (:class:`sqlalchemy.types.Boolean`)
in_navigation = Column(Boolean())
_tags = relation(
TagsToContents,
backref=backref("item"),
order_by=[TagsToContents.position],
collection_class=ordering_list("position"),
cascade="all, delete-orphan",
)
#: Tags assigned to the content object (list of str)
tags = association_proxy(
"_tags", "title", creator=TagsToContents._tag_find_or_create
)
#: type_info is a class attribute (:class:`TypeInfo`)
type_info = default_type_info
def __init__(
self,
name: Optional[str] = None,
parent: Optional[Node] = None,
title: Optional[str] = "",
annotations: Optional[dict] = None,
default_view: Optional[str] = None,
description: Optional[str] = "",
language: Optional[str] = None,
owner: Optional[str] = None,
creation_date: Optional[datetime.datetime] = None,
modification_date: Optional[datetime.datetime] = None,
in_navigation: Optional[bool] = True,
tags: Optional[List[str]] = None,
**kwargs,
):
super().__init__(name, parent, title, annotations, **kwargs)
self.default_view = default_view
self.description = description
self.language = language
self.owner = owner
self.in_navigation = in_navigation
# These are set by events if not defined at this point:
self.creation_date = creation_date
self.modification_date = modification_date
self.tags = tags or []
[docs] def copy(self, **kwargs) -> "Content":
# Same as `Node.copy` with additional tag support.
kwargs["tags"] = self.tags
return super().copy(**kwargs)
[docs]@implementer(IDocument, IDefaultWorkflow)
class Document(Content):
"""Document extends :class:`~kotti.resources.Content` with a body and its
mime_type. In addition Document and its descendants implement
:class:`~kotti.interfaces.IDefaultWorkflow` and therefore
are associated with the default workflow (at least in
unmodified Kotti installations).
"""
#: Primary key column in the DB
#: (:class:`sqlalchemy.types.Integer`)
id = Column(ForeignKey(Content.id), primary_key=True)
#: Body text of the Document
#: (:class:`sqlalchemy.types.Unicode`)
body = Column(UnicodeText())
#: MIME type of the Document
#: (:class:`sqlalchemy.types.String`)
mime_type = Column(String(30))
#: type_info is a class attribute
#: (:class:`~kotti.resources.TypeInfo`)
type_info = Content.type_info.copy(
name="Document",
title=_("Document"),
add_view="add_document",
addable_to=["Document"],
)
def __init__(
self, body: Optional[str] = "", mime_type: Optional[str] = "text/html", **kwargs
):
super().__init__(**kwargs)
self.body = body
self.mime_type = mime_type
# noinspection PyMethodParameters
[docs]class SaveDataMixin:
"""The classmethods must not be implemented on a class that inherits
from ``Base`` with ``SQLAlchemy>=1.0``, otherwise that class cannot be
subclassed further.
See http://stackoverflow.com/questions/30433960/how-to-use-declare-last-in-sqlalchemy-1-0 # noqa
"""
#: The filename is used in the attachment view to give downloads
#: the original filename it had when it was uploaded.
#: (:class:`sqlalchemy.types.Unicode`)
filename = Column(Unicode(100))
#: MIME type of the file
#: (:class:`sqlalchemy.types.String`)
mimetype = Column(String(100))
#: Size of the file in bytes
#: (:class:`sqlalchemy.types.Integer`)
size = Column(Integer())
#: Filedepot mapped blob
#: (:class:`depot.fileds.sqlalchemy.UploadedFileField`)
@declared_attr
def data(cls) -> Column:
return cls.__table__.c.get("data", Column(UploadedFileField(cls.data_filters)))
data_filters = ()
@classmethod
def __declare_last__(cls) -> None:
"""Unconfigure the event set in _SQLAMutationTracker,
we have _save_data"""
mapper = cls._sa_class_manager.mapper
args = (mapper.attrs["data"], "set", _SQLAMutationTracker._field_set)
if event.contains(*args):
event.remove(*args)
# Declaring the event on the class attribute instead of mapper property
# enables proper registration on its subclasses
event.listen(cls.data, "set", cls._save_data, retval=True)
@staticmethod
def _save_data(
target: "File",
value: Optional[
Union[FieldStorage, bytes, UploadedFile, BufferedReader]
], # noqa
oldvalue: Optional[Union[UploadedFile, _symbol]],
initiator: Event,
) -> Optional[UploadedFile]:
"""Refresh metadata and save the binary data to the data field.
:param target: The File instance
:type target: :class:`kotti.resources.File` or subclass
:param value: The container for binary data
:type value: A :class:`cgi.FieldStorage` instance
"""
if isinstance(value, bytes):
fp = BytesIO(value)
value = _to_fieldstorage(
fp=fp,
filename=target.filename,
mimetype=target.mimetype,
size=len(value),
)
newvalue = _SQLAMutationTracker._field_set(target, value, oldvalue, initiator)
if newvalue is None:
return
target.filename = newvalue.filename
target.mimetype = newvalue.content_type
target.size = newvalue.file.content_length
return newvalue
[docs] @classmethod
def from_field_storage(cls, fs):
"""Create and return an instance of this class from a file upload
through a webbrowser.
:param fs: FieldStorage instance as found in a
:class:`kotti.request.Request`'s ``POST`` MultiDict.
:type fs: :class:`cgi.FieldStorage`
:result: The created instance.
:rtype: :class:`kotti.resources.File`
"""
if not cls.type_info.is_uploadable_mimetype(fs.type):
raise ValueError(f"Unsupported MIME type: {fs.type}")
return cls(data=fs)
def __init__(
self,
data: Optional[Union[bytes, BufferedReader, FieldStorage]] = None, # noqa
filename: Optional[str] = None,
mimetype: Optional[str] = None,
size: Optional[int] = None,
**kwargs,
) -> None:
super().__init__(**kwargs)
self.filename = filename
self.mimetype = mimetype
self.size = size
self.data = data
[docs] def copy(self, **kwargs) -> "File":
"""Same as `Content.copy` with additional data support. ``data`` needs
some special attention, because we don't want the same depot file to be
assigned to multiple content nodes.
"""
_copy = super().copy(**kwargs)
_copy.data = self.data.file.read()
return _copy
[docs]@implementer(IFile)
class File(SaveDataMixin, Content):
"""File adds some attributes to :class:`~kotti.resources.Content` that are
useful for storing binary data.
"""
#: Primary key column in the DB
#: (:class:`sqlalchemy.types.Integer`)
id = Column(ForeignKey(Content.id), primary_key=True)
type_info = Content.type_info.copy(
name="File",
title=_("File"),
add_view="add_file",
addable_to=["Document"],
selectable_default_views=[],
uploadable_mimetypes=["*"],
)
[docs]def get_root(request: Optional[Request] = None) -> Node:
"""Call the function defined by the ``kotti.root_factory`` setting and
return its result.
:param request: current request (optional)
:type request: :class:`kotti.request.Request`
:result: a node in the node tree
:rtype: :class:`~kotti.resources.Node` or descendant;
"""
return get_settings()["kotti.root_factory"][0](request)
[docs]class DefaultRootCache:
""" Default implementation for :func:`~kotti.resources.get_root` """
_root = None
# noinspection PyComparisonWithNone,PyPep8
@reify
def root_id(self) -> int:
"""Query for the one node without a parent and return its id.
:result: The root node's id.
:rtype: int
"""
query = bakery(
lambda session: session.query(Node)
.with_polymorphic(Node)
.add_columns(Node.id)
.enable_eagerloads(False)
.filter(Node.parent_id == None)
)
return query(DBSession()).one().id
[docs] def get_root(self) -> Node:
"""Query for the root node by its id. This enables SQLAlchemy's
session cache (query is executed only once per session).
:result: The root node.
:rtype: :class:`Node`.
"""
if self._root is None or inspect(self._root).detached:
self._root = (
DBSession.query(Node)
.with_polymorphic(Node)
.enable_eagerloads(False)
.enable_eagerloads("local_groups")
.filter(Node.id == self.root_id)
.one()
)
return self._root
def __call__(self, request: Optional[Request] = None) -> Node:
"""Default implementation for :func:`~kotti.resources.get_root`
:param request: Current request (optional)
:type request: :class:`kotti.request.Request`
:result: Node in the object tree that has no parent.
:rtype: :class:`~kotti.resources.Node` or descendant;
in a fresh Kotti site with Kotti's
:func:`default populator <kotti.populate.populate>` this will
be an instance of :class:`~kotti.resources.Document`.
"""
return self.get_root()
default_get_root = DefaultRootCache()
def _adjust_for_engine(engine: Engine) -> None:
if engine.dialect.name == "mysql": # pragma: no cover
# We disable the Node.path index for Mysql; in some conditions
# the index can't be created for columns even with 767 bytes,
# the maximum default size for column indexes
Node.__table__.indexes = {
index for index in Node.__table__.indexes if index.name != "ix_nodes_path"
}
def initialize_sql(engine: Engine, drop_all: bool = False) -> scoped_session:
DBSession.registry.clear()
DBSession.configure(bind=engine)
metadata.bind = engine
if drop_all or os.environ.get("KOTTI_TEST_DB_STRING"):
metadata.reflect()
metadata.drop_all(engine)
# Allow users of Kotti to cherry pick the tables that they want to use:
settings = _resolve_dotted(get_settings())
tables = settings["kotti.use_tables"].strip() or None
if tables:
tables = [metadata.tables[name] for name in tables.split()]
_adjust_for_engine(engine)
# Allow migrations to set the 'head' stamp in case the database is
# initialized freshly:
if not engine.table_names():
stamp_heads()
metadata.create_all(engine, tables=tables)
if os.environ.get("KOTTI_DISABLE_POPULATORS", "0") not in TRUE_VALUES:
for populate in settings["kotti.populators"]:
populate()
commit()
return DBSession