1from __future__ import annotations
  2
  3from datetime import datetime
  4from typing import TYPE_CHECKING, Any
  5
  6from plain import models
  7from plain.models import types
  8from plain.runtime import settings
  9from plain.utils import timezone
 10
 11from .params import extract_tracking_params
 12
 13if TYPE_CHECKING:
 14    from plain.http import Request
 15
# Optional integrations. Each helper below comes from a package that may not
# be importable; when the import fails the name is bound to None instead, and
# Pageview.create_from_request() checks the name for truthiness before
# calling it.

# Looks up the user for a request (used to fill Pageview.user_id).
try:
    from plain.auth import get_request_user
except ImportError:
    get_request_user: Any = None

# Looks up the session for a request (used to fill Pageview.session_id).
try:
    from plain.sessions import get_request_session
except ImportError:
    get_request_session: Any = None

# Looks up the impersonator for a request; a truthy result makes
# create_from_request() skip recording the pageview.
try:
    from plain.admin.impersonate import get_request_impersonator
except ImportError:
    get_request_impersonator: Any = None

# Public API of this module.
__all__ = ["Pageview"]
 32
 33
 34@models.register_model
 35class Pageview(models.Model):
 36    url: str = types.URLField(max_length=2048)
 37    timestamp: datetime = types.DateTimeField(auto_now_add=True)
 38
 39    title: str = types.CharField(max_length=512, required=False)
 40    # Referrers may not always be valid URLs (e.g. `android-app://...`).
 41    # Use a plain CharField so we don't validate the scheme or format.
 42    referrer: str = types.CharField(max_length=1024, required=False)
 43
 44    user_id: str = types.CharField(max_length=255, required=False)
 45    session_id: str = types.CharField(max_length=255, required=False)
 46
 47    # Attribution tracking
 48    source: str = types.CharField(max_length=200, required=False)
 49    medium: str = types.CharField(max_length=200, required=False)
 50    campaign: str = types.CharField(max_length=200, required=False)
 51
 52    query: models.QuerySet[Pageview] = models.QuerySet()
 53
 54    model_options = models.Options(
 55        ordering=["-timestamp"],
 56        indexes=[
 57            models.Index(fields=["timestamp"]),
 58            models.Index(fields=["user_id"]),
 59            models.Index(fields=["session_id"]),
 60            models.Index(fields=["url"]),
 61            models.Index(fields=["source"]),
 62            models.Index(fields=["medium"]),
 63        ],
 64    )
 65
 66    def __str__(self) -> str:
 67        return self.url
 68
 69    @classmethod
 70    def create_from_request(
 71        cls,
 72        request: Request,
 73        *,
 74        url: str | None = None,
 75        title: str | None = None,
 76        referrer: str | None = None,
 77        timestamp: datetime | None = None,
 78        source: str | None = None,
 79        medium: str | None = None,
 80        campaign: str | None = None,
 81    ) -> Pageview | None:
 82        """Create a pageview from a request object.
 83
 84        Args:
 85            request: The HTTP request object
 86            url: Page URL (defaults to request.build_absolute_uri())
 87            title: Page title (defaults to empty string)
 88            referrer: Referring URL (defaults to Referer header)
 89            timestamp: Page visit time (defaults to current server time)
 90            source: Traffic source (auto-extracted from URL if not provided)
 91            medium: Traffic medium (auto-extracted from URL if not provided)
 92            campaign: Campaign name (auto-extracted from URL if not provided)
 93
 94        Returns:
 95            Pageview instance or None if user is being impersonated
 96        """
 97        if get_request_impersonator and get_request_impersonator(request):
 98            return None
 99
100        if url is None:
101            url = request.build_absolute_uri()
102
103        if title is None:
104            title = ""
105
106        if referrer is None:
107            referrer = request.headers.get("Referer", "")
108
109        if timestamp is None:
110            timestamp = timezone.now()
111
112        # Extract tracking parameters if not provided
113        if source is None or medium is None or campaign is None:
114            extracted_source, extracted_medium, extracted_campaign = (
115                extract_tracking_params(url)
116            )
117            if source is None:
118                source = extracted_source
119            if medium is None:
120                medium = extracted_medium
121            if campaign is None:
122                campaign = extracted_campaign
123
124        user = get_request_user(request) if get_request_user else None
125        user_id = user.id if user else ""
126
127        if get_request_session:
128            session = get_request_session(request)
129        else:
130            session = None
131
132        if session:
133            session_instance = session.model_instance
134            session_id = str(session_instance.id) if session_instance else ""
135
136            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
137                if not user_id:
138                    if not session_id:
139                        # Make sure we have a key to use
140                        session.create()
141                        session_instance = session.model_instance
142                        session_id = (
143                            str(session_instance.id) if session_instance else ""
144                        )
145
146                    # The user hasn't logged in yet but might later. When they do log in,
147                    # the session key itself will be cycled (session fixation attacks),
148                    # so we'll store the anonymous session id in the data which will be preserved
149                    # when the key cycles, then remove it immediately after.
150                    session["pageviews_anonymous_session_id"] = session_id
151                elif user_id and "pageviews_anonymous_session_id" in session:
152                    # Associate the previously anonymous pageviews with the user
153                    cls.query.filter(
154                        user_id="",
155                        session_id=session["pageviews_anonymous_session_id"],
156                    ).update(user_id=user_id)
157
158                    # Remove it so we don't keep trying to associate it
159                    del session["pageviews_anonymous_session_id"]
160        else:
161            session_id = ""
162
163        return cls.query.create(
164            user_id=user_id,
165            session_id=session_id,
166            url=url,
167            title=title,
168            referrer=referrer,
169            timestamp=timestamp,
170            source=source,
171            medium=medium,
172            campaign=campaign,
173        )