Source code for kotti.security

from __future__ import with_statement
from contextlib import contextmanager
from datetime import datetime
from UserDict import DictMixin

import bcrypt
from sqlalchemy import Boolean
from sqlalchemy import Column
from sqlalchemy import DateTime
from sqlalchemy import Integer
from sqlalchemy import Unicode
from sqlalchemy import func
from sqlalchemy.sql.expression import and_
from sqlalchemy.sql.expression import or_
from sqlalchemy.orm.exc import NoResultFound
from pyramid.location import lineage
from pyramid.security import view_execution_permitted
from zope.deprecation.deprecation import deprecated

from kotti import get_settings
from kotti import DBSession
from kotti import Base
from kotti.sqla import MutationList
from kotti.sqla import JsonType
from kotti.util import _
from kotti.util import request_cache
from kotti.util import DontCache


def get_principals():
    """Return the principals database for the current configuration.

    The ``kotti.principals_factory`` setting holds a sequence of
    callables; the first one is called without arguments and its result
    is returned.
    """
    factories = get_settings()['kotti.principals_factory']
    make_principals = factories[0]
    return make_principals()


@request_cache(lambda request: None)
def get_user(request):
    """Return the principal for the request's unauthenticated userid.

    The userid is looked up with ``get_principals().get(...)``, so the
    result is whatever that ``get`` returns when the id is unknown.  The
    lookup is wrapped in ``request_cache`` with a constant key.
    """
    current_userid = request.unauthenticated_userid
    principals = get_principals()
    return principals.get(current_userid)


def has_permission(permission, context, request):
    """Check whether the current request has a permission on a context.

    This is a thin wrapper that delegates to
    ``request.has_permission(permission, context)``.

    .. deprecated:: 0.9

    :param permission: permission to check for
    :type permission: str

    :param context: context that should be checked for the given permission
    :type context: :class:`kotti.resources.Node`

    :param request: current request
    :type request: :class:`kotti.request.Request`

    :result: ``True`` if request has the permission, ``False`` else
    :rtype: bool
    """
    check = request.has_permission
    return check(permission, context)
# Register the module-level name ``has_permission`` as deprecated via
# ``zope.deprecation``, attaching the message below that points callers to
# the ``has_permission`` method of the request.
# NOTE(review): the message says "deprecated as of Kotti 1.0" while the
# docstring of ``has_permission`` says ``deprecated:: 0.9`` -- confirm which
# version is correct.
deprecated(u'has_permission', u"kotti.security.has_permission is deprecated as of Kotti 1.0 and " u"will be no longer available starting with Kotti 2.0. " u"Please use the has_permission method of request instead.")
class Principal(Base):
    """A minimal 'Principal' implementation.

    The attributes on this object correspond to what one ought to
    implement to get full support by the system.  You're free to add
    additional attributes.

      - As a convenience, a clear text 'password' passed to the
        initializer is hashed using 'get_principals().hash_password'
        before it is stored.  Passing ``None`` stores ``None``.

      - The boolean 'active' attribute defines whether a principal may
        log in.  This allows the deactivation of accounts without
        deleting them.

      - The 'confirm_token' attribute is set whenever a user has
        forgotten their password.  This token is used to identify the
        receiver of the email.  This attribute should be set to 'None'
        once confirmation has succeeded.
    """

    __tablename__ = 'principals'

    id = Column(Integer, primary_key=True)
    name = Column(Unicode(100), unique=True)
    password = Column(Unicode(100))
    active = Column(Boolean)
    confirm_token = Column(Unicode(100))
    title = Column(Unicode(100), nullable=False)
    email = Column(Unicode(100), unique=True)
    groups = Column(MutationList.as_mutable(JsonType), nullable=False)
    creation_date = Column(DateTime(), nullable=False)
    last_login_date = Column(DateTime())

    # ``name`` here is the column defined above.
    __mapper_args__ = {'order_by': name}

    def __init__(self, name, password=None, active=True, confirm_token=None,
                 title=u"", email=None, groups=None):
        """Create a principal.

        ``groups`` defaults to a new empty list, ``creation_date`` is set
        to ``datetime.now()`` and ``last_login_date`` starts out as
        ``None``.
        """
        self.name = name
        self.password = (
            None if password is None
            else get_principals().hash_password(password))
        self.active = active
        self.confirm_token = confirm_token
        self.title = title
        self.email = email
        self.groups = [] if groups is None else groups
        self.creation_date = datetime.now()
        self.last_login_date = None

    def __repr__(self):  # pragma: no cover
        return u'<Principal {0!r}>'.format(self.name)
class AbstractPrincipals(object):
    """Documentation-only description of a Principals database.

    The methods below have no implementation; they spell out what is
    expected from a Principals database.

    Principals mostly provides dict-like access to the principal
    objects in the database.  In addition, there's the 'search' method
    which allows searching users and groups.

    'hash_password' is for initial hashing of a clear text password,
    while 'validate_password' is used by the login to see if the
    entered password matches the hashed password that's already in the
    database.

    Kotti's default Principals implementation can be overridden with
    your own through the settings; 'get_principals' in this module reads
    the factory from the 'kotti.principals_factory' setting.
    """

    def __getitem__(self, name):
        """Return the Principal object with the id 'name'.
        """

    def __setitem__(self, name, principal):
        """Add a given Principal object to the database.

        'name' is expected to be the same as 'principal.name'.

        'principal' may also be a dict of attributes.
        """

    def __delitem__(self, name):
        """Remove the principal with the given name from the database.
        """

    def keys(self):
        """Return a list of principal ids that are in the database.
        """

    def search(self, **kwargs):
        """Return an iterable with principal objects that correspond
        to the search arguments passed in.

        This example would return all principals with the id 'bob':

          get_principals().search(name=u'bob')

        Here, we ask for all principals that have 'bob' in either
        their 'name' or their 'title'.  We pass '*bob*' instead of
        'bob' to indicate that we want case-insensitive substring
        matching:

          get_principals().search(name=u'*bob*', title=u'*bob*')

        This call should fail with AttributeError unless there's a
        'foo' attribute on principal objects that supports search:

          get_principals().search(name=u'bob', foo=u'bar')
        """

    def hash_password(self, password):
        """Return a hash of the given password.

        This is what's stored in the database as 'principal.password'.
        """

    def validate_password(self, clear, hashed):
        """Return True if the clear text password matches the hash.
        """
# The built-in roles, keyed by principal name.  ``set_roles`` and
# ``reset_roles`` change this mapping in place, so the dict object itself
# stays the same.
ROLES = {
    u'role:viewer': Principal(u'role:viewer', title=_(u'Viewer')),
    u'role:editor': Principal(u'role:editor', title=_(u'Editor')),
    u'role:owner': Principal(u'role:owner', title=_(u'Owner')),
    u'role:admin': Principal(u'role:admin', title=_(u'Admin')),
}
# Snapshot of the initial roles, restored by ``reset_roles``.
_DEFAULT_ROLES = dict(ROLES)

# These roles are visible in the sharing tab
SHARING_ROLES = [u'role:viewer', u'role:editor', u'role:owner']
# The sharing roles plus the admin role.
USER_MANAGEMENT_ROLES = SHARING_ROLES + ['role:admin']
# Snapshots of the initial lists, restored by the ``reset_*`` functions.
_DEFAULT_SHARING_ROLES = list(SHARING_ROLES)
_DEFAULT_USER_MANAGEMENT_ROLES = list(USER_MANAGEMENT_ROLES)

# This is the ACL that gets set on the site root on creation.  Note
# that this is only really useful if you're _not_ using workflow.  If
# you are, then you should look at the permissions in workflow.zcml.
SITE_ACL = [
    ['Allow', 'system.Everyone', ['view']],
    ['Allow', 'role:viewer', ['view']],
    ['Allow', 'role:editor', ['view', 'add', 'edit', 'state_change']],
    ['Allow', 'role:owner',
     ['view', 'add', 'edit', 'manage', 'state_change']],
]


def set_roles(roles_dict):
    """Replace the contents of ``ROLES`` in place with ``roles_dict``."""
    ROLES.clear()
    ROLES.update(roles_dict)


def set_sharing_roles(role_names):
    """Replace the contents of ``SHARING_ROLES`` in place."""
    SHARING_ROLES[:] = role_names


def set_user_management_roles(role_names):
    """Replace the contents of ``USER_MANAGEMENT_ROLES`` in place."""
    USER_MANAGEMENT_ROLES[:] = role_names


def reset_roles():
    """Restore ``ROLES`` to the roles defined at import time."""
    ROLES.clear()
    ROLES.update(_DEFAULT_ROLES)


def reset_sharing_roles():
    """Restore ``SHARING_ROLES`` to its import-time contents."""
    SHARING_ROLES[:] = _DEFAULT_SHARING_ROLES


def reset_user_management_roles():
    """Restore ``USER_MANAGEMENT_ROLES`` to its import-time contents."""
    USER_MANAGEMENT_ROLES[:] = _DEFAULT_USER_MANAGEMENT_ROLES


def reset():
    """Restore roles, sharing roles and user management roles."""
    reset_roles()
    reset_sharing_roles()
    reset_user_management_roles()


class PersistentACLMixin(object):
    """Expose ``self._acl`` as the ``__acl__`` property.

    The mixin does not define ``_acl`` itself; it has to be provided by
    the class this is mixed into.  Reading ``__acl__`` while ``_acl`` is
    ``None`` raises ``AttributeError``, and deleting ``__acl__`` sets
    ``_acl`` back to ``None``.
    """

    def _get_acl(self):
        acl = self._acl
        if acl is None:
            raise AttributeError('__acl__')
        return acl

    def _set_acl(self, value):
        self._acl = value

    def _del_acl(self):
        self._acl = None

    __acl__ = property(_get_acl, _set_acl, _del_acl)


def _cachekey_list_groups_raw(name, context):
    """Build the request-cache key for ``list_groups_raw``.

    The key is ``(name, context_id)``.  ``context_id`` is ``False`` for a
    ``None`` context, otherwise the context's ``id`` attribute, falling
    back to ``id(context)`` when there is no such attribute.
    """
    if context is None:
        context_id = False
    else:
        context_id = getattr(context, 'id', id(context))
    return (name, context_id)


@request_cache(_cachekey_list_groups_raw)
def list_groups_raw(name, context):
    """A set of group names in given ``context`` for ``name``.

    Only groups defined in context will be considered, therefore no
    global or inherited groups are returned.  A ``context`` that is not
    a ``kotti.resources.Node`` yields an empty set.
    """

    from kotti.resources import Node

    if not isinstance(context, Node):
        return set()
    return set(
        local_group.group_name
        for local_group in context.local_groups
        if local_group.principal_name == name
    )
def list_groups(name, context=None):
    """List groups for principal with a given ``name``.

    The optional ``context`` argument may be passed to check the list
    of groups in a given context.

    This is the first element of what ``list_groups_ext`` returns for
    the same arguments.
    """
    result = list_groups_ext(name, context)
    return result[0]
def _cachekey_list_groups_ext(name, context=None, _seen=None,
                              _inherited=None):
    """Build the request-cache key for ``list_groups_ext``.

    Calls that pass ``_seen`` or ``_inherited`` (as the recursive calls
    inside ``list_groups_ext`` do) raise ``DontCache``.  All other calls
    are keyed on ``(unicode(name), context_id)``, where ``context_id``
    is the context's ``id`` attribute or, failing that, ``id(context)``.
    """
    if _seen is None and _inherited is None:
        context_id = getattr(context, 'id', id(context))
        return (unicode(name), context_id)
    raise DontCache


@request_cache(_cachekey_list_groups_ext)
def list_groups_ext(name, context=None, _seen=None, _inherited=None):
    """Return ``(groups, inherited)`` for the principal ``name``.

    ``groups`` is a list of all group names found: those stored on the
    principal itself, the local groups found along the lineage of
    ``context`` (if given), and, recursively, the groups of each of
    those groups.

    ``inherited`` is a list of the group names that were not assigned
    directly in ``context``: groups from the principal database (when a
    context is given or when recursing), local groups found on lineage
    items other than ``context`` itself, and every group found while
    recursing.

    ``_seen`` and ``_inherited`` are internal and only passed by the
    recursive calls.
    """
    name = unicode(name)
    groups = set()
    # Must be evaluated before ``_inherited`` is defaulted below.
    recursing = _inherited is not None
    _inherited = _inherited or set()

    # Add groups from principal db:
    principal = get_principals().get(name)
    if principal is not None:
        global_groups = principal.groups
        groups.update(global_groups)
        if context is not None or _seen is not None:
            _inherited.update(global_groups)

    if _seen is None:
        _seen = set([name])

    # Add local groups:
    if context is not None:
        for depth, node in enumerate(lineage(context)):
            local_names = [
                group for group in list_groups_raw(name, node)
                if group not in _seen
            ]
            groups.update(local_names)
            # depth 0 is ``context`` itself; everything further up the
            # lineage counts as inherited.
            if recursing or depth > 0:
                _inherited.update(local_names)

    # Resolve groups of groups, visiting each group name only once.
    unvisited = groups - _seen
    _seen.update(unvisited)
    for group_name in unvisited:
        nested_groups, nested_inherited = list_groups_ext(
            group_name, context, _seen=_seen, _inherited=_inherited)
        groups.update(nested_groups)
        _inherited.update(nested_inherited)

    return list(groups), list(_inherited)
def set_groups(name, context, groups_to_set=()):
    """Set the list of groups for principal with given ``name`` and in
    given ``context``.

    Local groups of other principals in ``context`` are kept; those of
    ``name`` are replaced by one ``LocalGroup`` per entry in
    ``groups_to_set``.
    """

    from kotti.resources import LocalGroup

    name = unicode(name)
    # keep groups for "other" principals
    kept = [
        local_group for local_group in context.local_groups
        if local_group.principal_name != name
    ]
    # reset groups for given principal
    replacement = [
        LocalGroup(context, name, unicode(group_name))
        for group_name in groups_to_set
    ]
    context.local_groups = kept + replacement
def list_groups_callback(name, request):
    """List the groups for the principal identified by ``name``.

    The context is taken from ``request.environ['authz_context']`` if
    present, to support assignment of local roles to groups; otherwise
    from ``request.context``, and finally from the site root.

    Returns ``None`` if ``name`` is not a user or is not in the
    principals database.
    """
    if not is_user(name):
        return None  # Disallow logging in with groups
    if name not in get_principals():
        return None

    context = request.environ.get(
        'authz_context', getattr(request, 'context', None))
    if context is None:
        # SA events don't have request.context available
        from kotti.resources import get_root
        context = get_root(request)
    return list_groups(name, context)
@contextmanager
def authz_context(context, request):
    """Temporarily store ``context`` in ``request.environ['authz_context']``.

    On exit the previous state is restored: a previous value is put
    back, and if the key was not present before, it is removed again.

    :param context: object to store as authorization context
    :param request: object with a dict-like ``environ`` attribute
    """
    # A sentinel distinguishes "key was absent" from "key held None", so
    # that a previously stored None is restored as well.
    missing = object()
    before = request.environ.get('authz_context', missing)
    request.environ['authz_context'] = context
    try:
        yield
    finally:
        if before is missing:
            # Tolerate the key having been removed inside the ``with``
            # body; a KeyError raised here would mask the body's own
            # exception.
            request.environ.pop('authz_context', None)
        else:
            request.environ['authz_context'] = before


@contextmanager
def request_method(request, method):
    """Temporarily set ``request.method`` to ``method``.

    The previous method is restored on exit, also when the ``with``
    body raises.
    """
    before = request.method
    request.method = method
    try:
        yield
    finally:
        request.method = before


def view_permitted(context, request, name='', method='GET'):
    """Return the result of ``view_execution_permitted`` for the view
    ``name`` on ``context``.

    The check runs with ``context`` installed as ``authz_context`` and
    with ``request.method`` temporarily set to ``method``.
    """
    with authz_context(context, request):
        with request_method(request, method):
            return view_execution_permitted(context, request, name)
def principals_with_local_roles(context, inherit=True):
    """Return a list of principal names that have local roles in the
    context.

    With ``inherit`` true (the default) the whole lineage of ``context``
    is considered, otherwise only ``context`` itself.  Principal names
    starting with ``'role:'`` are left out.
    """
    nodes = lineage(context) if inherit else [context]
    names = set()
    for node in nodes:
        for local_group in node.local_groups:
            principal_name = local_group.principal_name
            if not principal_name.startswith('role:'):
                names.add(principal_name)
    return list(names)
def map_principals_with_local_roles(context):
    """Return ``(principal, (all_groups, inherited_groups))`` tuples for
    the principals with local roles in ``context``, sorted by principal
    name.

    Names returned by ``principals_with_local_roles`` that are not found
    in the principals database are skipped.  The group lists come from
    ``list_groups_ext``.
    """
    principals = get_principals()
    entries = []
    for principal_name in principals_with_local_roles(context):
        try:
            principal = principals[principal_name]
        except KeyError:
            continue
        all_groups, inherited = list_groups_ext(principal_name, context)
        entries.append((principal, (all_groups, inherited)))
    entries.sort(key=lambda entry: entry[0].name)
    return entries


def is_user(principal):
    """Return True if ``principal`` is a user, i.e. its name contains no
    ``':'``.

    ``principal`` may be a name string or an object with a ``name``
    attribute.
    """
    name = principal if isinstance(principal, basestring) else principal.name
    return ':' not in name
class Principals(DictMixin):
    """Kotti's default principal database.

    Look at 'AbstractPrincipals' for documentation.

    This is a default implementation that may be replaced through the
    settings; 'get_principals' in this module reads the factory from the
    'kotti.principals_factory' setting.
    """

    # Class used to build and query principal objects.
    factory = Principal

    # Passed to ``bcrypt.gensalt()`` when a new salt is generated.
    log_rounds = 10

    @request_cache(lambda self, name: unicode(name))
    def __getitem__(self, name):
        """Return the principal called ``name``; raise ``KeyError`` if
        there is none.
        """
        name = unicode(name)
        # avoid calls to the DB for roles
        # (they're not stored in the ``principals`` table)
        if name.startswith('role:'):
            raise KeyError(name)
        matching = DBSession.query(self.factory).filter(
            self.factory.name == name)
        try:
            return matching.one()
        except NoResultFound:
            raise KeyError(name)

    def __setitem__(self, name, principal):
        """Add ``principal`` to the session.

        A dict is turned into a ``factory`` instance first.  ``name`` is
        not used to build or store the principal.
        """
        name = unicode(name)
        if isinstance(principal, dict):
            principal = self.factory(**principal)
        DBSession.add(principal)

    def __delitem__(self, name):
        """Delete the principal called ``name``; raise ``KeyError`` if
        there is none.
        """
        name = unicode(name)
        try:
            matching = DBSession.query(self.factory).filter(
                self.factory.name == name)
            DBSession.delete(matching.one())
        except NoResultFound:
            raise KeyError(name)

    def iterkeys(self):
        """Yield the names of all principals in the database."""
        for row in DBSession.query(self.factory.name):
            yield row[0]

    def keys(self):
        """Return the names of all principals in the database as a list.
        """
        return [principal_name for principal_name in self.iterkeys()]

    def search(self, match='any', **kwargs):
        """ Search the principal database.

        String values containing ``*`` are matched case-insensitively
        with ``*`` acting as a wildcard; all other values are compared
        for equality.

        :param match: ``any`` to return all principals matching any search
                      param, ``all`` to return only principals matching
                      all params
        :type match: str

        :param kwargs: Search conditions, e.g. ``name='bob', active=True``.
        :type kwargs: varying.

        :result: SQLAlchemy query object, or an empty list if no search
                 conditions were given
        :rtype: :class:`sqlalchemy.orm.query.Query`

        :raises ValueError: if ``match`` is neither ``any`` nor ``all``
        """
        if not kwargs:
            return []

        conditions = []
        for attr_name, wanted in kwargs.items():
            column = getattr(self.factory, attr_name)
            is_pattern = isinstance(wanted, basestring) and '*' in wanted
            if is_pattern:
                pattern = wanted.replace('*', '%').lower()
                conditions.append(func.lower(column).like(pattern))
            else:
                conditions.append(column == wanted)

        query = DBSession.query(self.factory)

        if match == 'any':
            return query.filter(or_(*conditions))
        if match == 'all':
            return query.filter(and_(*conditions))
        raise ValueError('match must be either "any" or "all".')

    def hash_password(self, password, hashed=None):
        """Return the bcrypt hash of ``password`` as unicode.

        If ``hashed`` is given it is passed to ``bcrypt.hashpw`` in place
        of a freshly generated salt; ``validate_password`` relies on
        this to compare against an existing hash.
        """
        salt = bcrypt.gensalt(self.log_rounds) if hashed is None else hashed
        digest = bcrypt.hashpw(
            password.encode('utf-8'), salt.encode('utf-8'))
        return unicode(digest)

    def validate_password(self, clear, hashed):
        """Return True if hashing ``clear`` with ``hashed`` reproduces
        ``hashed``.

        A ``ValueError`` raised while hashing is treated as a mismatch.
        """
        try:
            candidate = self.hash_password(clear, hashed)
        except ValueError:
            return False
        return candidate == hashed
def principals_factory():
    """Return a new instance of ``Principals``, Kotti's default
    principal database.
    """
    database = Principals()
    return database