Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3from datetime import datetime
  4from typing import TYPE_CHECKING, Any
  5
  6from plain import models
  7from plain.models import types
  8from plain.runtime import settings
  9from plain.utils import timezone
 10
 11from .params import extract_tracking_params
 12
 13if TYPE_CHECKING:
 14    from plain.http import Request
 15
# Optional integrations. Each of these packages may not be installed; when the
# import fails the name is bound to None instead, so the rest of this module can
# feature-check with a simple truthiness test before calling it.

# Resolves the user associated with a request (used to fill ``user_id``).
try:
    from plain.auth import get_request_user
except ImportError:
    get_request_user: Any = None

# Resolves the session associated with a request (used to fill ``session_id``).
try:
    from plain.sessions import get_request_session
except ImportError:
    get_request_session: Any = None

# Resolves the impersonator for a request; when it returns a truthy value the
# pageview is not recorded at all.
try:
    from plain.admin.impersonate import get_request_impersonator
except ImportError:
    get_request_impersonator: Any = None
 30
 31
 32@models.register_model
 33class Pageview(models.Model):
 34    # A full URL can be thousands of characters, but MySQL has a 3072-byte limit
 35    # on indexed columns (when using the default ``utf8mb4`` character set that
 36    # stores up to 4 bytes per character). The ``url`` field is indexed below,
 37    # so we keep the length at 768 characters (768 × 4 = 3072 bytes) to ensure
 38    # the index can be created on all supported database backends.
 39    url: str = types.URLField(max_length=768)
 40    timestamp: datetime = types.DateTimeField(auto_now_add=True)
 41
 42    title: str = types.CharField(max_length=512, required=False)
 43    # Referrers may not always be valid URLs (e.g. `android-app://...`).
 44    # Use a plain CharField so we don't validate the scheme or format.
 45    referrer: str = types.CharField(max_length=1024, required=False)
 46
 47    user_id: str = types.CharField(max_length=255, required=False)
 48    session_id: str = types.CharField(max_length=255, required=False)
 49
 50    # Attribution tracking
 51    source: str = types.CharField(max_length=200, required=False)
 52    medium: str = types.CharField(max_length=200, required=False)
 53    campaign: str = types.CharField(max_length=200, required=False)
 54
 55    query: models.QuerySet[Pageview] = models.QuerySet()
 56
 57    model_options = models.Options(
 58        ordering=["-timestamp"],
 59        indexes=[
 60            models.Index(fields=["timestamp"]),
 61            models.Index(fields=["user_id"]),
 62            models.Index(fields=["session_id"]),
 63            models.Index(fields=["url"]),
 64            models.Index(fields=["source"]),
 65            models.Index(fields=["medium"]),
 66        ],
 67    )
 68
 69    def __str__(self) -> str:
 70        return self.url
 71
 72    @classmethod
 73    def create_from_request(
 74        cls,
 75        request: Request,
 76        *,
 77        url: str | None = None,
 78        title: str | None = None,
 79        referrer: str | None = None,
 80        timestamp: datetime | None = None,
 81        source: str | None = None,
 82        medium: str | None = None,
 83        campaign: str | None = None,
 84    ) -> Pageview | None:
 85        """Create a pageview from a request object.
 86
 87        Args:
 88            request: The HTTP request object
 89            url: Page URL (defaults to request.build_absolute_uri())
 90            title: Page title (defaults to empty string)
 91            referrer: Referring URL (defaults to Referer header)
 92            timestamp: Page visit time (defaults to current server time)
 93            source: Traffic source (auto-extracted from URL if not provided)
 94            medium: Traffic medium (auto-extracted from URL if not provided)
 95            campaign: Campaign name (auto-extracted from URL if not provided)
 96
 97        Returns:
 98            Pageview instance or None if user is being impersonated
 99        """
100        if get_request_impersonator and get_request_impersonator(request):
101            return None
102
103        if url is None:
104            url = request.build_absolute_uri()
105
106        if title is None:
107            title = ""
108
109        if referrer is None:
110            referrer = request.headers.get("Referer", "")
111
112        if timestamp is None:
113            timestamp = timezone.now()
114
115        # Extract tracking parameters if not provided
116        if source is None or medium is None or campaign is None:
117            extracted_source, extracted_medium, extracted_campaign = (
118                extract_tracking_params(url)
119            )
120            if source is None:
121                source = extracted_source
122            if medium is None:
123                medium = extracted_medium
124            if campaign is None:
125                campaign = extracted_campaign
126
127        user = get_request_user(request) if get_request_user else None
128        user_id = user.id if user else ""
129
130        if get_request_session:
131            session = get_request_session(request)
132        else:
133            session = None
134
135        if session:
136            session_instance = session.model_instance
137            session_id = str(session_instance.id) if session_instance else ""
138
139            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
140                if not user_id:
141                    if not session_id:
142                        # Make sure we have a key to use
143                        session.create()
144                        session_instance = session.model_instance
145                        session_id = (
146                            str(session_instance.id) if session_instance else ""
147                        )
148
149                    # The user hasn't logged in yet but might later. When they do log in,
150                    # the session key itself will be cycled (session fixation attacks),
151                    # so we'll store the anonymous session id in the data which will be preserved
152                    # when the key cycles, then remove it immediately after.
153                    session["pageviews_anonymous_session_id"] = session_id
154                elif user_id and "pageviews_anonymous_session_id" in session:
155                    # Associate the previously anonymous pageviews with the user
156                    cls.query.filter(
157                        user_id="",
158                        session_id=session["pageviews_anonymous_session_id"],
159                    ).update(user_id=user_id)
160
161                    # Remove it so we don't keep trying to associate it
162                    del session["pageviews_anonymous_session_id"]
163        else:
164            session_id = ""
165
166        return cls.query.create(
167            user_id=user_id,
168            session_id=session_id,
169            url=url,
170            title=title,
171            referrer=referrer,
172            timestamp=timestamp,
173            source=source,
174            medium=medium,
175            campaign=campaign,
176        )