1from __future__ import annotations
2
3from datetime import datetime
4from typing import TYPE_CHECKING, Any
5
6from plain import postgres
7from plain.postgres import types
8from plain.runtime import settings
9from plain.utils import timezone
10
11from .params import extract_tracking_params
12
13if TYPE_CHECKING:
14 from plain.http import Request
15
# Optional integrations. Each import below is allowed to fail with
# ImportError; in that case the name is bound to None instead, and the code
# in this module checks the name for truthiness before calling it.

# Used to look up the user associated with a request.
try:
    from plain.auth import get_request_user
except ImportError:
    get_request_user: Any = None

# Used to look up the session associated with a request.
try:
    from plain.sessions import get_request_session
except ImportError:
    get_request_session: Any = None

# Used to detect requests made while impersonating another user.
try:
    from plain.admin.impersonate import get_request_impersonator
except ImportError:
    get_request_impersonator: Any = None

# Names exported by a star-import of this module.
__all__ = ["Pageview"]
32
33
@postgres.register_model
class Pageview(postgres.Model):
    """A single recorded page view.

    Stores the visited URL and the time of the visit, together with optional
    page metadata (title, referrer), the user and session identifiers the
    view was recorded under, and attribution values (source, medium,
    campaign).
    """

    url: str = types.URLField(max_length=2048)
    timestamp: datetime = types.DateTimeField(create_now=True)

    title: str = types.TextField(max_length=512, required=False)
    # Referrers may not always be valid URLs (e.g. `android-app://...`).
    # Use a plain TextField so we don't validate the scheme or format.
    referrer: str = types.TextField(max_length=1024, required=False)

    # Identifiers are stored as text. `create_from_request` writes an empty
    # string when there is no user / no session for the request.
    user_id: str = types.TextField(max_length=255, required=False)
    session_id: str = types.TextField(max_length=255, required=False)

    # Attribution tracking
    source: str = types.TextField(max_length=200, required=False)
    medium: str = types.TextField(max_length=200, required=False)
    campaign: str = types.TextField(max_length=200, required=False)

    query: postgres.QuerySet[Pageview] = postgres.QuerySet()

    model_options = postgres.Options(
        # Newest pageviews first by default.
        ordering=["-timestamp"],
        indexes=[
            postgres.Index(
                name="plainpageviews_pageview_timestamp_idx", fields=["timestamp"]
            ),
            postgres.Index(
                name="plainpageviews_pageview_user_id_idx", fields=["user_id"]
            ),
            postgres.Index(
                name="plainpageviews_pageview_session_id_idx", fields=["session_id"]
            ),
            postgres.Index(name="plainpageviews_pageview_url_idx", fields=["url"]),
            postgres.Index(
                name="plainpageviews_pageview_source_idx", fields=["source"]
            ),
            postgres.Index(
                name="plainpageviews_pageview_medium_idx", fields=["medium"]
            ),
        ],
    )

    def __str__(self) -> str:
        """Return the pageview's URL."""
        return self.url

    @classmethod
    def create_from_request(
        cls,
        request: Request,
        *,
        url: str | None = None,
        title: str | None = None,
        referrer: str | None = None,
        timestamp: datetime | None = None,
        source: str | None = None,
        medium: str | None = None,
        campaign: str | None = None,
    ) -> Pageview | None:
        """Create a pageview from a request object.

        Besides inserting the new row, this may (when
        ``settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS`` is enabled and a
        session is available) create the session, write to or delete from
        the session data, and bulk-update earlier anonymous pageviews so they
        are attributed to the now-known user.

        Args:
            request: The HTTP request object
            url: Page URL (defaults to request.build_absolute_uri())
            title: Page title (defaults to empty string)
            referrer: Referring URL (defaults to Referer header)
            timestamp: Page visit time (defaults to current server time)
            source: Traffic source (auto-extracted from URL if not provided)
            medium: Traffic medium (auto-extracted from URL if not provided)
            campaign: Campaign name (auto-extracted from URL if not provided)

        Returns:
            Pageview instance or None if user is being impersonated
        """
        # Impersonated browsing is not recorded at all.
        if get_request_impersonator and get_request_impersonator(request):
            return None

        if url is None:
            url = request.build_absolute_uri()

        if title is None:
            title = ""

        if referrer is None:
            referrer = request.headers.get("Referer", "")

        if timestamp is None:
            timestamp = timezone.now()

        # Extract tracking parameters if not provided. Explicitly passed
        # values always win over the ones parsed from the URL.
        if source is None or medium is None or campaign is None:
            extracted_source, extracted_medium, extracted_campaign = (
                extract_tracking_params(url)
            )
            if source is None:
                source = extracted_source
            if medium is None:
                medium = extracted_medium
            if campaign is None:
                campaign = extracted_campaign

        # The auth / sessions helpers are None when those packages could not
        # be imported, in which case there is no user / session to record.
        user = get_request_user(request) if get_request_user else None
        # NOTE(review): user.id is passed through as-is to a text field;
        # presumably the ORM coerces it -- confirm.
        user_id = user.id if user else ""

        session = get_request_session(request) if get_request_session else None

        session_id = ""
        if session:
            session_instance = session.model_instance
            session_id = str(session_instance.id) if session_instance else ""

            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
                anonymous_key = "pageviews_anonymous_session_id"

                if not user_id:
                    if not session_id:
                        # Make sure we have a key to use
                        session.create()
                        session_instance = session.model_instance
                        session_id = (
                            str(session_instance.id) if session_instance else ""
                        )

                    # The user hasn't logged in yet but might later. When they do log in,
                    # the session key itself will be cycled (session fixation attacks),
                    # so we'll store the anonymous session id in the data which will be preserved
                    # when the key cycles, then remove it immediately after.
                    #
                    # Never store an empty id: it would later match every
                    # anonymous pageview that has no session at all.
                    if session_id:
                        session[anonymous_key] = session_id
                elif anonymous_key in session:
                    anonymous_session_id = session[anonymous_key]

                    # Associate the previously anonymous pageviews with the
                    # user. Skip an empty stored id so unrelated session-less
                    # pageviews are not claimed by this user.
                    if anonymous_session_id:
                        cls.query.filter(
                            user_id="",
                            session_id=anonymous_session_id,
                        ).update(user_id=user_id)

                    # Remove it so we don't keep trying to associate it
                    del session[anonymous_key]

        return cls.query.create(
            user_id=user_id,
            session_id=session_id,
            url=url,
            title=title,
            referrer=referrer,
            timestamp=timestamp,
            source=source,
            medium=medium,
            campaign=campaign,
        )