Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from datetime import datetime
  4from typing import TYPE_CHECKING, Any
  5
  6from plain import models
  7from plain.runtime import settings
  8from plain.utils import timezone
  9
 10from .params import extract_tracking_params
 11
 12if TYPE_CHECKING:
 13    from plain.http import Request
 14
 15try:
 16    from plain.auth import get_request_user
 17except ImportError:
 18    get_request_user: Any = None
 19
 20try:
 21    from plain.sessions import get_request_session
 22except ImportError:
 23    get_request_session: Any = None
 24
 25
 26@models.register_model
 27class Pageview(models.Model):
 28    # A full URL can be thousands of characters, but MySQL has a 3072-byte limit
 29    # on indexed columns (when using the default ``utf8mb4`` character set that
 30    # stores up to 4 bytes per character). The ``url`` field is indexed below,
 31    # so we keep the length at 768 characters (768 × 4 = 3072 bytes) to ensure
 32    # the index can be created on all supported database backends.
 33    url = models.URLField(max_length=768)
 34    timestamp = models.DateTimeField(auto_now_add=True)
 35
 36    title = models.CharField(max_length=512, required=False)
 37    # Referrers may not always be valid URLs (e.g. `android-app://...`).
 38    # Use a plain CharField so we don't validate the scheme or format.
 39    referrer = models.CharField(max_length=1024, required=False)
 40
 41    user_id = models.CharField(max_length=255, required=False)
 42    session_id = models.CharField(max_length=255, required=False)
 43
 44    # Attribution tracking
 45    source = models.CharField(max_length=200, required=False)
 46    medium = models.CharField(max_length=200, required=False)
 47    campaign = models.CharField(max_length=200, required=False)
 48
 49    model_options = models.Options(
 50        ordering=["-timestamp"],
 51        indexes=[
 52            models.Index(fields=["timestamp"]),
 53            models.Index(fields=["user_id"]),
 54            models.Index(fields=["session_id"]),
 55            models.Index(fields=["url"]),
 56            models.Index(fields=["source"]),
 57            models.Index(fields=["medium"]),
 58        ],
 59    )
 60
 61    def __str__(self) -> str:
 62        return self.url
 63
 64    @classmethod
 65    def create_from_request(
 66        cls,
 67        request: Request,
 68        *,
 69        url: str | None = None,
 70        title: str | None = None,
 71        referrer: str | None = None,
 72        timestamp: datetime | None = None,
 73        source: str | None = None,
 74        medium: str | None = None,
 75        campaign: str | None = None,
 76    ) -> Pageview | None:
 77        """Create a pageview from a request object.
 78
 79        Args:
 80            request: The HTTP request object
 81            url: Page URL (defaults to request.build_absolute_uri())
 82            title: Page title (defaults to empty string)
 83            referrer: Referring URL (defaults to Referer header)
 84            timestamp: Page visit time (defaults to current server time)
 85            source: Traffic source (auto-extracted from URL if not provided)
 86            medium: Traffic medium (auto-extracted from URL if not provided)
 87            campaign: Campaign name (auto-extracted from URL if not provided)
 88
 89        Returns:
 90            Pageview instance or None if user is being impersonated
 91        """
 92        if getattr(request, "impersonator", None):
 93            return None
 94
 95        if url is None:
 96            url = request.build_absolute_uri()
 97
 98        if title is None:
 99            title = ""
100
101        if referrer is None:
102            referrer = request.headers.get("Referer", "")
103
104        if timestamp is None:
105            timestamp = timezone.now()
106
107        # Extract tracking parameters if not provided
108        if source is None or medium is None or campaign is None:
109            extracted_source, extracted_medium, extracted_campaign = (
110                extract_tracking_params(url)
111            )
112            if source is None:
113                source = extracted_source
114            if medium is None:
115                medium = extracted_medium
116            if campaign is None:
117                campaign = extracted_campaign
118
119        user = get_request_user(request) if get_request_user else None
120        user_id = user.id if user else ""
121
122        if get_request_session:
123            session = get_request_session(request)
124        else:
125            session = None
126
127        if session:
128            session_instance = session.model_instance
129            session_id = str(session_instance.id) if session_instance else ""
130
131            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
132                if not user_id:
133                    if not session_id:
134                        # Make sure we have a key to use
135                        session.create()
136                        session_instance = session.model_instance
137                        session_id = (
138                            str(session_instance.id) if session_instance else ""
139                        )
140
141                    # The user hasn't logged in yet but might later. When they do log in,
142                    # the session key itself will be cycled (session fixation attacks),
143                    # so we'll store the anonymous session id in the data which will be preserved
144                    # when the key cycles, then remove it immediately after.
145                    session["pageviews_anonymous_session_id"] = session_id
146                elif user_id and "pageviews_anonymous_session_id" in session:
147                    # Associate the previously anonymous pageviews with the user
148                    cls.query.filter(
149                        user_id="",
150                        session_id=session["pageviews_anonymous_session_id"],
151                    ).update(user_id=user_id)
152
153                    # Remove it so we don't keep trying to associate it
154                    del session["pageviews_anonymous_session_id"]
155        else:
156            session_id = ""
157
158        return cls.query.create(
159            user_id=user_id,
160            session_id=session_id,
161            url=url,
162            title=title,
163            referrer=referrer,
164            timestamp=timestamp,
165            source=source,
166            medium=medium,
167            campaign=campaign,
168        )