1from __future__ import annotations
2
3from datetime import datetime
4from typing import TYPE_CHECKING, Any
5
6from plain import models
7from plain.models import types
8from plain.runtime import settings
9from plain.utils import timezone
10
11from .params import extract_tracking_params
12
13if TYPE_CHECKING:
14 from plain.http import Request
15
16try:
17 from plain.auth import get_request_user
18except ImportError:
19 get_request_user: Any = None
20
21try:
22 from plain.sessions import get_request_session
23except ImportError:
24 get_request_session: Any = None
25
26try:
27 from plain.admin.impersonate import get_request_impersonator
28except ImportError:
29 get_request_impersonator: Any = None
30
31__all__ = ["Pageview"]
32
33
34@models.register_model
35class Pageview(models.Model):
36 url: str = types.URLField(max_length=2048)
37 timestamp: datetime = types.DateTimeField(auto_now_add=True)
38
39 title: str = types.CharField(max_length=512, required=False)
40 # Referrers may not always be valid URLs (e.g. `android-app://...`).
41 # Use a plain CharField so we don't validate the scheme or format.
42 referrer: str = types.CharField(max_length=1024, required=False)
43
44 user_id: str = types.CharField(max_length=255, required=False)
45 session_id: str = types.CharField(max_length=255, required=False)
46
47 # Attribution tracking
48 source: str = types.CharField(max_length=200, required=False)
49 medium: str = types.CharField(max_length=200, required=False)
50 campaign: str = types.CharField(max_length=200, required=False)
51
52 query: models.QuerySet[Pageview] = models.QuerySet()
53
54 model_options = models.Options(
55 ordering=["-timestamp"],
56 indexes=[
57 models.Index(fields=["timestamp"]),
58 models.Index(fields=["user_id"]),
59 models.Index(fields=["session_id"]),
60 models.Index(fields=["url"]),
61 models.Index(fields=["source"]),
62 models.Index(fields=["medium"]),
63 ],
64 )
65
66 def __str__(self) -> str:
67 return self.url
68
69 @classmethod
70 def create_from_request(
71 cls,
72 request: Request,
73 *,
74 url: str | None = None,
75 title: str | None = None,
76 referrer: str | None = None,
77 timestamp: datetime | None = None,
78 source: str | None = None,
79 medium: str | None = None,
80 campaign: str | None = None,
81 ) -> Pageview | None:
82 """Create a pageview from a request object.
83
84 Args:
85 request: The HTTP request object
86 url: Page URL (defaults to request.build_absolute_uri())
87 title: Page title (defaults to empty string)
88 referrer: Referring URL (defaults to Referer header)
89 timestamp: Page visit time (defaults to current server time)
90 source: Traffic source (auto-extracted from URL if not provided)
91 medium: Traffic medium (auto-extracted from URL if not provided)
92 campaign: Campaign name (auto-extracted from URL if not provided)
93
94 Returns:
95 Pageview instance or None if user is being impersonated
96 """
97 if get_request_impersonator and get_request_impersonator(request):
98 return None
99
100 if url is None:
101 url = request.build_absolute_uri()
102
103 if title is None:
104 title = ""
105
106 if referrer is None:
107 referrer = request.headers.get("Referer", "")
108
109 if timestamp is None:
110 timestamp = timezone.now()
111
112 # Extract tracking parameters if not provided
113 if source is None or medium is None or campaign is None:
114 extracted_source, extracted_medium, extracted_campaign = (
115 extract_tracking_params(url)
116 )
117 if source is None:
118 source = extracted_source
119 if medium is None:
120 medium = extracted_medium
121 if campaign is None:
122 campaign = extracted_campaign
123
124 user = get_request_user(request) if get_request_user else None
125 user_id = user.id if user else ""
126
127 if get_request_session:
128 session = get_request_session(request)
129 else:
130 session = None
131
132 if session:
133 session_instance = session.model_instance
134 session_id = str(session_instance.id) if session_instance else ""
135
136 if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
137 if not user_id:
138 if not session_id:
139 # Make sure we have a key to use
140 session.create()
141 session_instance = session.model_instance
142 session_id = (
143 str(session_instance.id) if session_instance else ""
144 )
145
146 # The user hasn't logged in yet but might later. When they do log in,
147 # the session key itself will be cycled (session fixation attacks),
148 # so we'll store the anonymous session id in the data which will be preserved
149 # when the key cycles, then remove it immediately after.
150 session["pageviews_anonymous_session_id"] = session_id
151 elif user_id and "pageviews_anonymous_session_id" in session:
152 # Associate the previously anonymous pageviews with the user
153 cls.query.filter(
154 user_id="",
155 session_id=session["pageviews_anonymous_session_id"],
156 ).update(user_id=user_id)
157
158 # Remove it so we don't keep trying to associate it
159 del session["pageviews_anonymous_session_id"]
160 else:
161 session_id = ""
162
163 return cls.query.create(
164 user_id=user_id,
165 session_id=session_id,
166 url=url,
167 title=title,
168 referrer=referrer,
169 timestamp=timestamp,
170 source=source,
171 medium=medium,
172 campaign=campaign,
173 )