Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3import string
  4from collections.abc import Iterator, MutableMapping
  5from datetime import timedelta
  6from typing import Any
  7
  8from plain.models import transaction
  9from plain.runtime import settings
 10from plain.utils import timezone
 11from plain.utils.crypto import get_random_string
 12
 13
 14class SessionStore(MutableMapping):
 15    """
 16    The actual session object that gets attached to a request,
 17    backed by the underlying Session model for the storage.
 18    """
 19
 20    def __init__(self, session_key: str | None = None) -> None:
 21        self.session_key = session_key
 22        self.accessed = False
 23        self.modified = False
 24        self._session_cache: dict | None = None
 25        self._session_instance = None
 26
 27        # Lazy import
 28        from .models import Session
 29
 30        self._model = Session
 31
 32    def __contains__(self, key: object) -> bool:
 33        return key in self._session
 34
 35    def __getitem__(self, key: str) -> Any:
 36        return self._session[key]
 37
 38    def __setitem__(self, key: str, value: Any) -> None:
 39        self._session[key] = value
 40        self.modified = True
 41
 42    def __delitem__(self, key: str) -> None:
 43        del self._session[key]
 44        self.modified = True
 45
 46    def __iter__(self) -> Iterator[str]:
 47        return iter(self._session)
 48
 49    def __len__(self) -> int:
 50        return len(self._session)
 51
 52    def clear(self) -> None:
 53        # To avoid unnecessary persistent storage accesses, we set up the
 54        # internals directly (loading data wastes time, since we are going to
 55        # set it to an empty dict anyway).
 56        self._session_cache = {}
 57        self._session_instance = None
 58        self.accessed = True
 59        self.modified = True
 60
 61    def is_empty(self) -> bool:
 62        "Return True when there is no session_key and the session is empty."
 63        return not self.session_key and not self._session_cache
 64
 65    def _get_new_session_key(self) -> str:
 66        "Return session key that isn't being used."
 67        while True:
 68            session_key = get_random_string(32, string.ascii_lowercase + string.digits)
 69            if not self._model.query.filter(session_key=session_key).exists():
 70                return session_key
 71
 72    def _get_session_data(self, no_load: bool = False) -> dict:
 73        """
 74        Lazily load session from storage (unless "no_load" is True, when only
 75        an empty dict is stored) and store it in the current instance.
 76        """
 77        self.accessed = True
 78
 79        # If the cache has already been populated (even with an empty dict),
 80        # simply return it.
 81        if self._session_cache is not None:
 82            return self._session_cache
 83
 84        # The cache hasn't been populated yet so either initialise it to an
 85        # empty dictionary (when "no_load" is True or there is no session
 86        # key) or fetch the data from the database.
 87        if self.session_key is None or no_load:
 88            self._session_cache = {}
 89            return self._session_cache
 90
 91        try:
 92            session = self._model.query.get(
 93                session_key=self.session_key, expires_at__gt=timezone.now()
 94            )
 95            self._session_instance = session
 96            self._session_cache = session.session_data
 97            return self._session_cache
 98        except self._model.DoesNotExist:
 99            self.session_key = None
100            self._session_instance = None
101            self._session_cache = {}
102            return self._session_cache
103
104    @property
105    def _session(self) -> dict:
106        """
107        Property to access the session data, ensuring it is loaded.
108        """
109        return self._get_session_data()
110
111    @property
112    def model_instance(self) -> Any:
113        """
114        Return the underlying Session model instance, or None if no session exists.
115        """
116        if self._session_instance is not None:
117            return self._session_instance
118
119        # Trigger loading of session data which will populate _session_instance
120        self._get_session_data()
121        return self._session_instance
122
123    def flush(self) -> None:
124        """
125        Remove the current session data from the database and regenerate the
126        key.
127        """
128        self.clear()
129        try:
130            self._model.query.get(session_key=self.session_key).delete()
131        except self._model.DoesNotExist:
132            pass
133        self.session_key = None
134        self._session_instance = None
135
136    def cycle_key(self) -> None:
137        """
138        Create a new session key, while retaining the current session data.
139        """
140        data = self._session
141        key = self.session_key
142        self.create()
143        self._session_cache = data
144        if key:
145            try:
146                self._model.query.get(session_key=key).delete()
147            except self._model.DoesNotExist:
148                pass
149
150    def create(self) -> None:
151        self.session_key = self._get_new_session_key()
152        data = self._get_session_data(no_load=True)
153        with transaction.atomic():
154            self._session_instance = self._model.query.create(
155                session_key=self.session_key,
156                session_data=data,
157                expires_at=timezone.now()
158                + timedelta(seconds=settings.SESSION_COOKIE_AGE),
159            )
160        self.modified = True
161
162    def save(self) -> None:
163        """
164        Save the current session data to the database using update_or_create.
165        """
166        data = self._get_session_data(no_load=False)
167
168        with transaction.atomic():
169            if self.session_key is None:
170                self.session_key = self._get_new_session_key()
171
172            self._session_instance, created = self._model.query.update_or_create(
173                session_key=self.session_key,
174                defaults={
175                    "session_data": data,
176                    "expires_at": timezone.now()
177                    + timedelta(seconds=settings.SESSION_COOKIE_AGE),
178                },
179            )
180
181        if created:
182            self.modified = True