1from __future__ import annotations
2
3from datetime import datetime
4from typing import TYPE_CHECKING, Any
5
6from plain import models
7from plain.runtime import settings
8from plain.utils import timezone
9
10from .params import extract_tracking_params
11
12if TYPE_CHECKING:
13 from plain.http import Request
14
15try:
16 from plain.auth import get_request_user
17except ImportError:
18 get_request_user: Any = None
19
20try:
21 from plain.sessions import get_request_session
22except ImportError:
23 get_request_session: Any = None
24
25
@models.register_model
class Pageview(models.Model):
    """A single recorded page visit.

    Stores the visited URL and time, optional page metadata (title,
    referrer), the user/session identifiers the visit is attributed to, and
    marketing attribution values (source, medium, campaign).
    """

    # A full URL can be thousands of characters, but MySQL has a 3072-byte limit
    # on indexed columns (when using the default ``utf8mb4`` character set that
    # stores up to 4 bytes per character). The ``url`` field is indexed below,
    # so we keep the length at 768 characters (768 × 4 = 3072 bytes) to ensure
    # the index can be created on all supported database backends.
    url = models.URLField(max_length=768)
    # NOTE(review): if ``auto_now_add`` behaves as in Django (the value is
    # replaced with "now" on insert), an explicit ``timestamp`` passed to
    # ``create_from_request`` would be ignored — TODO confirm.
    timestamp = models.DateTimeField(auto_now_add=True)

    title = models.CharField(max_length=512, required=False)
    # Referrers may not always be valid URLs (e.g. `android-app://...`).
    # Use a plain CharField so we don't validate the scheme or format.
    referrer = models.CharField(max_length=1024, required=False)

    # Stored as strings; an empty string means "no user" / "no session".
    user_id = models.CharField(max_length=255, required=False)
    session_id = models.CharField(max_length=255, required=False)

    # Attribution tracking
    source = models.CharField(max_length=200, required=False)
    medium = models.CharField(max_length=200, required=False)
    campaign = models.CharField(max_length=200, required=False)

    model_options = models.Options(
        ordering=["-timestamp"],
        indexes=[
            models.Index(fields=["timestamp"]),
            models.Index(fields=["user_id"]),
            models.Index(fields=["session_id"]),
            models.Index(fields=["url"]),
            models.Index(fields=["source"]),
            models.Index(fields=["medium"]),
        ],
    )

    def __str__(self) -> str:
        """Return the pageview's URL as its display form."""
        return self.url

    @classmethod
    def create_from_request(
        cls,
        request: Request,
        *,
        url: str | None = None,
        title: str | None = None,
        referrer: str | None = None,
        timestamp: datetime | None = None,
        source: str | None = None,
        medium: str | None = None,
        campaign: str | None = None,
    ) -> Pageview | None:
        """Create a pageview from a request object.

        When ``settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS`` is enabled and
        a session is available, anonymous pageviews remember their session id
        in the session data; once a user is present on a later request, the
        pageviews recorded under that anonymous session id (and with an empty
        ``user_id``) are updated to belong to the user.

        Args:
            request: The HTTP request object
            url: Page URL (defaults to request.build_absolute_uri())
            title: Page title (defaults to empty string)
            referrer: Referring URL (defaults to Referer header)
            timestamp: Page visit time (defaults to current server time)
            source: Traffic source (auto-extracted from URL if not provided)
            medium: Traffic medium (auto-extracted from URL if not provided)
            campaign: Campaign name (auto-extracted from URL if not provided)

        Returns:
            Pageview instance or None if user is being impersonated
        """
        # Impersonated browsing is deliberately not recorded.
        if getattr(request, "impersonator", None):
            return None

        if url is None:
            url = request.build_absolute_uri()

        if title is None:
            title = ""

        if referrer is None:
            referrer = request.headers.get("Referer", "")

        if timestamp is None:
            timestamp = timezone.now()

        # Only parse the URL when at least one attribution value is missing;
        # explicitly passed values always take precedence over extracted ones.
        if source is None or medium is None or campaign is None:
            extracted_source, extracted_medium, extracted_campaign = (
                extract_tracking_params(url)
            )
            if source is None:
                source = extracted_source
            if medium is None:
                medium = extracted_medium
            if campaign is None:
                campaign = extracted_campaign

        # plain.auth / plain.sessions are optional; their accessors are None
        # when the corresponding package is not installed.
        user = get_request_user(request) if get_request_user else None
        # Normalize to a string up front, matching how session_id is handled
        # and the CharField the value is stored in.
        user_id = str(user.id) if user else ""

        session = get_request_session(request) if get_request_session else None
        session_id = ""

        # Compare against None instead of relying on truthiness: presumably
        # the session object is mapping-like, in which case an empty
        # (brand-new) session would be falsy and could never reach the
        # create() call below — confirm against plain.sessions.
        if session is not None:
            session_instance = session.model_instance
            if session_instance:
                session_id = str(session_instance.id)

            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
                if not user_id:
                    if not session_id:
                        # Make sure we have a key to use
                        session.create()
                        session_instance = session.model_instance
                        if session_instance:
                            session_id = str(session_instance.id)

                    # The user hasn't logged in yet but might later. When they do log in,
                    # the session key itself will be cycled (session fixation attacks),
                    # so we'll store the anonymous session id in the data which will be preserved
                    # when the key cycles, then remove it immediately after.
                    session["pageviews_anonymous_session_id"] = session_id
                elif "pageviews_anonymous_session_id" in session:
                    # Associate the previously anonymous pageviews with the user
                    cls.query.filter(
                        user_id="",
                        session_id=session["pageviews_anonymous_session_id"],
                    ).update(user_id=user_id)

                    # Remove it so we don't keep trying to associate it
                    del session["pageviews_anonymous_session_id"]

        return cls.query.create(
            user_id=user_id,
            session_id=session_id,
            url=url,
            title=title,
            referrer=referrer,
            timestamp=timestamp,
            source=source,
            medium=medium,
            campaign=campaign,
        )