Plain is headed towards 1.0! Subscribe for development updates →

  1from datetime import datetime, timedelta
  2from functools import cached_property
  3
  4from opentelemetry import trace
  5from opentelemetry.semconv.attributes.db_attributes import (
  6    DB_NAMESPACE,
  7    DB_OPERATION_NAME,
  8    DB_SYSTEM_NAME,
  9)
 10from opentelemetry.trace import SpanKind
 11
 12from plain.models import IntegrityError
 13from plain.utils import timezone
 14
 15tracer = trace.get_tracer("plain.cache")
 16
 17
 18class Cached:
 19    """Store and retrieve cached items."""
 20
 21    def __init__(self, key: str) -> None:
 22        self.key = key
 23
 24        # So we can import Cached in __init__.py
 25        # without getting the packages not ready error...
 26        from .models import CachedItem
 27
 28        self._model_class = CachedItem
 29
 30    @cached_property
 31    def _model_instance(self):
 32        try:
 33            return self._model_class.query.get(key=self.key)
 34        except self._model_class.DoesNotExist:
 35            return None
 36
 37    def reload(self) -> None:
 38        if hasattr(self, "_model_instance"):
 39            del self._model_instance
 40
 41    def _is_expired(self):
 42        if not self._model_instance:
 43            return True
 44
 45        if not self._model_instance.expires_at:
 46            return False
 47
 48        return self._model_instance.expires_at < timezone.now()
 49
 50    def exists(self) -> bool:
 51        with tracer.start_as_current_span(
 52            "cache.exists",
 53            kind=SpanKind.CLIENT,
 54            attributes={
 55                DB_SYSTEM_NAME: "plain.cache",
 56                DB_OPERATION_NAME: "get",
 57                DB_NAMESPACE: "cache",
 58                "cache.key": self.key,
 59            },
 60        ) as span:
 61            span.set_status(trace.StatusCode.OK)
 62
 63            if self._model_instance is None:
 64                return False
 65
 66            return not self._is_expired()
 67
 68    @property
 69    def value(self):
 70        with tracer.start_as_current_span(
 71            "cache.get",
 72            kind=SpanKind.CLIENT,
 73            attributes={
 74                DB_SYSTEM_NAME: "plain.cache",
 75                DB_OPERATION_NAME: "get",
 76                DB_NAMESPACE: "cache",
 77                "cache.key": self.key,
 78            },
 79        ) as span:
 80            if self._model_instance and self._model_instance.expires_at:
 81                span.set_attribute(
 82                    "cache.item.expires_at", self._model_instance.expires_at.isoformat()
 83                )
 84
 85            exists = self.exists()
 86
 87            span.set_attribute("cache.hit", exists)
 88            span.set_status(trace.StatusCode.OK if exists else trace.StatusCode.UNSET)
 89
 90            if not exists:
 91                return None
 92
 93            return self._model_instance.value
 94
 95    def set(self, value, expiration: datetime | timedelta | int | float | None = None):
 96        defaults = {
 97            "value": value,
 98        }
 99
100        if isinstance(expiration, int | float):
101            defaults["expires_at"] = timezone.now() + timedelta(seconds=expiration)
102        elif isinstance(expiration, timedelta):
103            defaults["expires_at"] = timezone.now() + expiration
104        elif isinstance(expiration, datetime):
105            defaults["expires_at"] = expiration
106        else:
107            # Keep existing expires_at value or None
108            pass
109
110        # Make sure expires_at is timezone aware
111        if (
112            "expires_at" in defaults
113            and defaults["expires_at"]
114            and not timezone.is_aware(defaults["expires_at"])
115        ):
116            defaults["expires_at"] = timezone.make_aware(defaults["expires_at"])
117
118        with tracer.start_as_current_span(
119            "cache.set",
120            kind=SpanKind.CLIENT,
121            attributes={
122                DB_SYSTEM_NAME: "plain.cache",
123                DB_OPERATION_NAME: "set",
124                DB_NAMESPACE: "cache",
125                "cache.key": self.key,
126            },
127        ) as span:
128            if expires_at := defaults.get("expires_at"):
129                span.set_attribute("cache.item.expires_at", expires_at.isoformat())
130
131            try:
132                item, _ = self._model_class.query.update_or_create(
133                    key=self.key, defaults=defaults
134                )
135            except IntegrityError:
136                # Most likely a race condition in creating the item,
137                # so trying again should do an update
138                item, _ = self._model_class.query.update_or_create(
139                    key=self.key, defaults=defaults
140                )
141
142            self.reload()
143            span.set_status(trace.StatusCode.OK)
144            return item.value
145
146    def delete(self) -> bool:
147        with tracer.start_as_current_span(
148            "cache.delete",
149            kind=SpanKind.CLIENT,
150            attributes={
151                DB_SYSTEM_NAME: "plain.cache",
152                DB_OPERATION_NAME: "delete",
153                DB_NAMESPACE: "cache",
154                "cache.key": self.key,
155            },
156        ) as span:
157            span.set_status(trace.StatusCode.OK)
158            if not self._model_instance:
159                # A no-op, but a return value you can use to know whether it did anything
160                return False
161
162            self._model_instance.delete()
163            self.reload()
164            return True