1from __future__ import annotations
2
3import string
4from collections.abc import Iterator, MutableMapping
5from datetime import timedelta
6from typing import Any
7
8from plain.models import transaction
9from plain.runtime import settings
10from plain.utils import timezone
11from plain.utils.crypto import get_random_string
12
13
14class SessionStore(MutableMapping):
15 """
16 The actual session object that gets attached to a request,
17 backed by the underlying Session model for the storage.
18 """
19
20 def __init__(self, session_key: str | None = None) -> None:
21 self.session_key = session_key
22 self.accessed = False
23 self.modified = False
24 self._session_cache: dict | None = None
25 self._session_instance = None
26
27 # Lazy import
28 from .models import Session
29
30 self._model = Session
31
32 def __contains__(self, key: object) -> bool:
33 return key in self._session
34
35 def __getitem__(self, key: str) -> Any:
36 return self._session[key]
37
38 def __setitem__(self, key: str, value: Any) -> None:
39 self._session[key] = value
40 self.modified = True
41
42 def __delitem__(self, key: str) -> None:
43 del self._session[key]
44 self.modified = True
45
46 def __iter__(self) -> Iterator[str]:
47 return iter(self._session)
48
49 def __len__(self) -> int:
50 return len(self._session)
51
52 def clear(self) -> None:
53 # To avoid unnecessary persistent storage accesses, we set up the
54 # internals directly (loading data wastes time, since we are going to
55 # set it to an empty dict anyway).
56 self._session_cache = {}
57 self._session_instance = None
58 self.accessed = True
59 self.modified = True
60
61 def is_empty(self) -> bool:
62 "Return True when there is no session_key and the session is empty."
63 return not self.session_key and not self._session_cache
64
65 def _get_new_session_key(self) -> str:
66 "Return session key that isn't being used."
67 while True:
68 session_key = get_random_string(32, string.ascii_lowercase + string.digits)
69 if not self._model.query.filter(session_key=session_key).exists():
70 return session_key
71
72 def _get_session_data(self, no_load: bool = False) -> dict:
73 """
74 Lazily load session from storage (unless "no_load" is True, when only
75 an empty dict is stored) and store it in the current instance.
76 """
77 self.accessed = True
78
79 # If the cache has already been populated (even with an empty dict),
80 # simply return it.
81 if self._session_cache is not None:
82 return self._session_cache
83
84 # The cache hasn't been populated yet so either initialise it to an
85 # empty dictionary (when "no_load" is True or there is no session
86 # key) or fetch the data from the database.
87 if self.session_key is None or no_load:
88 self._session_cache = {}
89 return self._session_cache
90
91 try:
92 session = self._model.query.get(
93 session_key=self.session_key, expires_at__gt=timezone.now()
94 )
95 self._session_instance = session
96 self._session_cache = session.session_data
97 return self._session_cache
98 except self._model.DoesNotExist:
99 self.session_key = None
100 self._session_instance = None
101 self._session_cache = {}
102 return self._session_cache
103
104 @property
105 def _session(self) -> dict:
106 """
107 Property to access the session data, ensuring it is loaded.
108 """
109 return self._get_session_data()
110
111 @property
112 def model_instance(self) -> Any:
113 """
114 Return the underlying Session model instance, or None if no session exists.
115 """
116 if self._session_instance is not None:
117 return self._session_instance
118
119 # Trigger loading of session data which will populate _session_instance
120 self._get_session_data()
121 return self._session_instance
122
123 def flush(self) -> None:
124 """
125 Remove the current session data from the database and regenerate the
126 key.
127 """
128 self.clear()
129 try:
130 self._model.query.get(session_key=self.session_key).delete()
131 except self._model.DoesNotExist:
132 pass
133 self.session_key = None
134 self._session_instance = None
135
136 def cycle_key(self) -> None:
137 """
138 Create a new session key, while retaining the current session data.
139 """
140 data = self._session
141 key = self.session_key
142 self.create()
143 self._session_cache = data
144 if key:
145 try:
146 self._model.query.get(session_key=key).delete()
147 except self._model.DoesNotExist:
148 pass
149
150 def create(self) -> None:
151 self.session_key = self._get_new_session_key()
152 data = self._get_session_data(no_load=True)
153 with transaction.atomic():
154 self._session_instance = self._model.query.create(
155 session_key=self.session_key,
156 session_data=data,
157 expires_at=timezone.now()
158 + timedelta(seconds=settings.SESSION_COOKIE_AGE),
159 )
160 self.modified = True
161
162 def save(self) -> None:
163 """
164 Save the current session data to the database using update_or_create.
165 """
166 data = self._get_session_data(no_load=False)
167
168 with transaction.atomic():
169 if self.session_key is None:
170 self.session_key = self._get_new_session_key()
171
172 self._session_instance, created = self._model.query.update_or_create(
173 session_key=self.session_key,
174 defaults={
175 "session_data": data,
176 "expires_at": timezone.now()
177 + timedelta(seconds=settings.SESSION_COOKIE_AGE),
178 },
179 )
180
181 if created:
182 self.modified = True