Plain is headed towards 1.0! Subscribe for development updates →

  1import hmac
  2
  3from plain.exceptions import ImproperlyConfigured
  4from plain.models import models_registry
  5from plain.runtime import settings
  6from plain.utils.crypto import salted_hmac
  7from plain.utils.encoding import force_bytes
  8
  9USER_ID_SESSION_KEY = "_auth_user_id"
 10USER_HASH_SESSION_KEY = "_auth_user_hash"
 11
 12
 13def get_session_auth_hash(user):
 14    """
 15    Return an HMAC of the password field.
 16    """
 17    return _get_session_auth_hash(user)
 18
 19
 20def update_session_auth_hash(request, user):
 21    """
 22    Updating a user's password (for example) logs out all sessions for the user.
 23
 24    Take the current request and the updated user object from which the new
 25    session hash will be derived and update the session hash appropriately to
 26    prevent a password change from logging out the session from which the
 27    password was changed.
 28    """
 29    request.session.cycle_key()
 30    if request.user == user:
 31        request.session[USER_HASH_SESSION_KEY] = get_session_auth_hash(user)
 32
 33
 34def get_session_auth_fallback_hash(user):
 35    for fallback_secret in settings.SECRET_KEY_FALLBACKS:
 36        yield _get_session_auth_hash(user, secret=fallback_secret)
 37
 38
 39def _get_session_auth_hash(user, secret=None):
 40    key_salt = "plain.auth.get_session_auth_hash"
 41    return salted_hmac(
 42        key_salt,
 43        getattr(user, settings.AUTH_USER_SESSION_HASH_FIELD),
 44        secret=secret,
 45        algorithm="sha256",
 46    ).hexdigest()
 47
 48
 49def login(request, user):
 50    """
 51    Persist a user id and a backend in the request. This way a user doesn't
 52    have to reauthenticate on every request. Note that data set during
 53    the anonymous session is retained when the user logs in.
 54    """
 55    if settings.AUTH_USER_SESSION_HASH_FIELD:
 56        session_auth_hash = get_session_auth_hash(user)
 57    else:
 58        session_auth_hash = ""
 59
 60    if USER_ID_SESSION_KEY in request.session:
 61        if int(request.session[USER_ID_SESSION_KEY]) != user.id:
 62            # To avoid reusing another user's session, create a new, empty
 63            # session if the existing session corresponds to a different
 64            # authenticated user.
 65            request.session.flush()
 66        elif session_auth_hash and not hmac.compare_digest(
 67            force_bytes(request.session.get(USER_HASH_SESSION_KEY, "")),
 68            force_bytes(session_auth_hash),
 69        ):
 70            # If the session hash does not match the current hash, reset the
 71            # session. Most likely this means the password was changed.
 72            request.session.flush()
 73    else:
 74        # Invalidate the current session key and generate a new one to enhance security,
 75        # typically done after user login to prevent session fixation attacks.
 76        request.session.cycle_key()
 77
 78    request.session[USER_ID_SESSION_KEY] = user.id
 79    request.session[USER_HASH_SESSION_KEY] = session_auth_hash
 80    if hasattr(request, "user"):
 81        request.user = user
 82
 83
 84def logout(request):
 85    """
 86    Remove the authenticated user's ID from the request and flush their session
 87    data.
 88    """
 89    # Dispatch the signal before the user is logged out so the receivers have a
 90    # chance to find out *who* logged out.
 91    request.session.flush()
 92    if hasattr(request, "user"):
 93        request.user = None
 94
 95
 96def get_user_model():
 97    """
 98    Return the User model that is active in this project.
 99    """
100    try:
101        return models_registry.get_model(settings.AUTH_USER_MODEL, require_ready=False)
102    except ValueError:
103        raise ImproperlyConfigured(
104            "AUTH_USER_MODEL must be of the form 'package_label.model_name'"
105        )
106    except LookupError:
107        raise ImproperlyConfigured(
108            f"AUTH_USER_MODEL refers to model '{settings.AUTH_USER_MODEL}' that has not been installed"
109        )
110
111
def get_user(request):
    """
    Return the user model instance associated with the given request session,
    or None when no valid user can be resolved.

    None is returned when the session holds no user id, when no user with
    that id exists, or when a session hash field is configured and the hash
    stored in the session matches neither the current hash nor any fallback
    hash. In that last case the session is flushed as well.
    """
    if USER_ID_SESSION_KEY not in request.session:
        return None

    user_model = get_user_model()
    try:
        user = user_model.query.get(id=request.session[USER_ID_SESSION_KEY])
    except user_model.DoesNotExist:
        return None

    # Hash verification only applies when the project names a field (like
    # password) whose hash is tied to the session.
    if not settings.AUTH_USER_SESSION_HASH_FIELD:
        return user

    stored_hash = request.session.get(USER_HASH_SESSION_KEY)
    if not stored_hash:
        # Nothing to verify against: the session is invalid and cleared out.
        request.session.flush()
        return None

    stored_bytes = force_bytes(stored_hash)
    current_hash = get_session_auth_hash(user)
    if hmac.compare_digest(stored_bytes, force_bytes(current_hash)):
        return user

    # The current secret does not verify the session; try the fallback
    # secrets and stop at the first one that matches.
    for fallback_hash in get_session_auth_fallback_hash(user):
        if hmac.compare_digest(stored_bytes, force_bytes(fallback_hash)):
            # Re-key the session and store the hash made with the current
            # secret.
            request.session.cycle_key()
            request.session[USER_HASH_SESSION_KEY] = current_hash
            return user

    # The hashed field changed (i.e. password changed): the session is no
    # longer valid.
    request.session.flush()
    return None
156
157    return user