1from __future__ import annotations
2
3from datetime import datetime
4from typing import TYPE_CHECKING, Any
5
6from plain import models
7from plain.models import types
8from plain.runtime import settings
9from plain.utils import timezone
10
11from .params import extract_tracking_params
12
13if TYPE_CHECKING:
14 from plain.http import Request
15
16try:
17 from plain.auth import get_request_user
18except ImportError:
19 get_request_user: Any = None
20
21try:
22 from plain.sessions import get_request_session
23except ImportError:
24 get_request_session: Any = None
25
26try:
27 from plain.admin.impersonate import get_request_impersonator
28except ImportError:
29 get_request_impersonator: Any = None
30
31
@models.register_model
class Pageview(models.Model):
    """A recorded page visit plus user, session, and attribution details."""

    # MySQL caps an indexed column at 3072 bytes, and its default ``utf8mb4``
    # character set can spend up to 4 bytes on a single character. ``url`` is
    # indexed in ``model_options`` below, so although real URLs can run to
    # thousands of characters it is limited to 768 (768 × 4 = 3072 bytes) so
    # that the index can be created on all supported database backends.
    url: str = types.URLField(max_length=768)
    timestamp: datetime = types.DateTimeField(auto_now_add=True)

    title: str = types.CharField(max_length=512, required=False)
    # Deliberately a CharField: a referrer is not guaranteed to be a valid
    # URL (e.g. `android-app://...`), so its scheme and format are not
    # validated.
    referrer: str = types.CharField(max_length=1024, required=False)

    user_id: str = types.CharField(max_length=255, required=False)
    session_id: str = types.CharField(max_length=255, required=False)

    # Attribution tracking
    source: str = types.CharField(max_length=200, required=False)
    medium: str = types.CharField(max_length=200, required=False)
    campaign: str = types.CharField(max_length=200, required=False)

    query: models.QuerySet[Pageview] = models.QuerySet()

    model_options = models.Options(
        ordering=["-timestamp"],
        indexes=[
            models.Index(fields=["timestamp"]),
            models.Index(fields=["user_id"]),
            models.Index(fields=["session_id"]),
            models.Index(fields=["url"]),
            models.Index(fields=["source"]),
            models.Index(fields=["medium"]),
        ],
    )

    def __str__(self) -> str:
        """Return the pageview's URL."""
        return self.url

    @classmethod
    def create_from_request(
        cls,
        request: Request,
        *,
        url: str | None = None,
        title: str | None = None,
        referrer: str | None = None,
        timestamp: datetime | None = None,
        source: str | None = None,
        medium: str | None = None,
        campaign: str | None = None,
    ) -> Pageview | None:
        """Record a pageview for ``request`` and return it.

        Any keyword argument left as ``None`` is filled in from the request.
        When ``settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS`` is enabled
        and a session is available, this also maintains the
        ``pageviews_anonymous_session_id`` session entry: it is written for
        visitors without a user, and for visitors with a user it is used to
        attach their earlier anonymous pageviews to them and then deleted.

        Args:
            request: The HTTP request being recorded.
            url: Page URL; falls back to ``request.build_absolute_uri()``.
            title: Page title; falls back to an empty string.
            referrer: Referring URL; falls back to the ``Referer`` header,
                or an empty string when the header is absent.
            timestamp: Time of the visit; falls back to ``timezone.now()``.
            source: Traffic source; extracted from the URL when omitted.
            medium: Traffic medium; extracted from the URL when omitted.
            campaign: Campaign name; extracted from the URL when omitted.

        Returns:
            The created Pageview, or None when the impersonation helper is
            available and reports an impersonator for this request.
        """
        # Impersonated browsing is never recorded.
        impersonator = (
            get_request_impersonator(request) if get_request_impersonator else None
        )
        if impersonator:
            return None

        page_url = request.build_absolute_uri() if url is None else url
        page_title = "" if title is None else title
        page_referrer = (
            request.headers.get("Referer", "") if referrer is None else referrer
        )
        visited_at = timezone.now() if timestamp is None else timestamp

        # Only parse the URL when at least one attribution value is missing;
        # explicitly supplied values always take precedence.
        if source is None or medium is None or campaign is None:
            found_source, found_medium, found_campaign = extract_tracking_params(
                page_url
            )
            source = found_source if source is None else source
            medium = found_medium if medium is None else medium
            campaign = found_campaign if campaign is None else campaign

        # Both the auth and sessions helpers are optional imports and are
        # None when their package is not installed.
        user = None
        if get_request_user:
            user = get_request_user(request)
        user_id = user.id if user else ""

        session = None
        if get_request_session:
            session = get_request_session(request)

        anonymous_key = "pageviews_anonymous_session_id"

        def stored_session_id() -> str:
            # The session's id as a string, or "" when the session has no
            # model instance yet.
            instance = session.model_instance
            return str(instance.id) if instance else ""

        session_id = ""
        # NOTE(review): this is a truthiness test rather than `is not None`.
        # If the session object defines `__len__`/`__bool__`, an empty
        # session would be treated as "no session" here — confirm intended.
        if session:
            session_id = stored_session_id()

            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
                if user_id:
                    if anonymous_key in session:
                        # Hand the pageviews recorded before login over to
                        # the now-known user.
                        cls.query.filter(
                            user_id="",
                            session_id=session[anonymous_key],
                        ).update(user_id=user_id)

                        # Drop the marker so the association is not
                        # attempted again on later requests.
                        del session[anonymous_key]
                else:
                    if not session_id:
                        # Make sure there is an id to record.
                        session.create()
                        session_id = stored_session_id()

                    # This visitor may log in later. Logging in cycles the
                    # session key (session fixation attacks), so the
                    # anonymous session id is kept in the session data,
                    # which is preserved when the key cycles, and removed
                    # once the association has been made.
                    session[anonymous_key] = session_id

        return cls.query.create(
            user_id=user_id,
            session_id=session_id,
            url=page_url,
            title=page_title,
            referrer=page_referrer,
            timestamp=visited_at,
            source=source,
            medium=medium,
            campaign=campaign,
        )