1from __future__ import annotations
  2
  3from datetime import datetime
  4from typing import TYPE_CHECKING, Any
  5
  6from plain import postgres
  7from plain.postgres import types
  8from plain.runtime import settings
  9from plain.utils import timezone
 10
 11from .params import extract_tracking_params
 12
 13if TYPE_CHECKING:
 14    from plain.http import Request
 15
# Optional integrations.
#
# Each helper below comes from a package that may not be importable. When the
# import fails the name is bound to None instead, and the code in this module
# checks the name for truthiness before calling it.

# Used to look up the user attached to a request.
try:
    from plain.auth import get_request_user
except ImportError:
    get_request_user: Any = None

# Used to look up the session attached to a request.
try:
    from plain.sessions import get_request_session
except ImportError:
    get_request_session: Any = None

# Used to detect impersonated requests, for which no pageview is recorded.
try:
    from plain.admin.impersonate import get_request_impersonator
except ImportError:
    get_request_impersonator: Any = None
 30
 31__all__ = ["Pageview"]
 32
 33
 34@postgres.register_model
 35class Pageview(postgres.Model):
 36    url: str = types.URLField(max_length=2048)
 37    timestamp: datetime = types.DateTimeField(create_now=True)
 38
 39    title: str = types.TextField(max_length=512, required=False)
 40    # Referrers may not always be valid URLs (e.g. `android-app://...`).
 41    # Use a plain TextField so we don't validate the scheme or format.
 42    referrer: str = types.TextField(max_length=1024, required=False)
 43
 44    user_id: str = types.TextField(max_length=255, required=False)
 45    session_id: str = types.TextField(max_length=255, required=False)
 46
 47    # Attribution tracking
 48    source: str = types.TextField(max_length=200, required=False)
 49    medium: str = types.TextField(max_length=200, required=False)
 50    campaign: str = types.TextField(max_length=200, required=False)
 51
 52    query: postgres.QuerySet[Pageview] = postgres.QuerySet()
 53
 54    model_options = postgres.Options(
 55        ordering=["-timestamp"],
 56        indexes=[
 57            postgres.Index(
 58                name="plainpageviews_pageview_timestamp_idx", fields=["timestamp"]
 59            ),
 60            postgres.Index(
 61                name="plainpageviews_pageview_user_id_idx", fields=["user_id"]
 62            ),
 63            postgres.Index(
 64                name="plainpageviews_pageview_session_id_idx", fields=["session_id"]
 65            ),
 66            postgres.Index(name="plainpageviews_pageview_url_idx", fields=["url"]),
 67            postgres.Index(
 68                name="plainpageviews_pageview_source_idx", fields=["source"]
 69            ),
 70            postgres.Index(
 71                name="plainpageviews_pageview_medium_idx", fields=["medium"]
 72            ),
 73        ],
 74    )
 75
 76    def __str__(self) -> str:
 77        return self.url
 78
 79    @classmethod
 80    def create_from_request(
 81        cls,
 82        request: Request,
 83        *,
 84        url: str | None = None,
 85        title: str | None = None,
 86        referrer: str | None = None,
 87        timestamp: datetime | None = None,
 88        source: str | None = None,
 89        medium: str | None = None,
 90        campaign: str | None = None,
 91    ) -> Pageview | None:
 92        """Create a pageview from a request object.
 93
 94        Args:
 95            request: The HTTP request object
 96            url: Page URL (defaults to request.build_absolute_uri())
 97            title: Page title (defaults to empty string)
 98            referrer: Referring URL (defaults to Referer header)
 99            timestamp: Page visit time (defaults to current server time)
100            source: Traffic source (auto-extracted from URL if not provided)
101            medium: Traffic medium (auto-extracted from URL if not provided)
102            campaign: Campaign name (auto-extracted from URL if not provided)
103
104        Returns:
105            Pageview instance or None if user is being impersonated
106        """
107        if get_request_impersonator and get_request_impersonator(request):
108            return None
109
110        if url is None:
111            url = request.build_absolute_uri()
112
113        if title is None:
114            title = ""
115
116        if referrer is None:
117            referrer = request.headers.get("Referer", "")
118
119        if timestamp is None:
120            timestamp = timezone.now()
121
122        # Extract tracking parameters if not provided
123        if source is None or medium is None or campaign is None:
124            extracted_source, extracted_medium, extracted_campaign = (
125                extract_tracking_params(url)
126            )
127            if source is None:
128                source = extracted_source
129            if medium is None:
130                medium = extracted_medium
131            if campaign is None:
132                campaign = extracted_campaign
133
134        user = get_request_user(request) if get_request_user else None
135        user_id = user.id if user else ""
136
137        if get_request_session:
138            session = get_request_session(request)
139        else:
140            session = None
141
142        if session:
143            session_instance = session.model_instance
144            session_id = str(session_instance.id) if session_instance else ""
145
146            if settings.PAGEVIEWS_ASSOCIATE_ANONYMOUS_SESSIONS:
147                if not user_id:
148                    if not session_id:
149                        # Make sure we have a key to use
150                        session.create()
151                        session_instance = session.model_instance
152                        session_id = (
153                            str(session_instance.id) if session_instance else ""
154                        )
155
156                    # The user hasn't logged in yet but might later. When they do log in,
157                    # the session key itself will be cycled (session fixation attacks),
158                    # so we'll store the anonymous session id in the data which will be preserved
159                    # when the key cycles, then remove it immediately after.
160                    session["pageviews_anonymous_session_id"] = session_id
161                elif user_id and "pageviews_anonymous_session_id" in session:
162                    # Associate the previously anonymous pageviews with the user
163                    cls.query.filter(
164                        user_id="",
165                        session_id=session["pageviews_anonymous_session_id"],
166                    ).update(user_id=user_id)
167
168                    # Remove it so we don't keep trying to associate it
169                    del session["pageviews_anonymous_session_id"]
170        else:
171            session_id = ""
172
173        return cls.query.create(
174            user_id=user_id,
175            session_id=session_id,
176            url=url,
177            title=title,
178            referrer=referrer,
179            timestamp=timestamp,
180            source=source,
181            medium=medium,
182            campaign=campaign,
183        )