v0.150.0
  1from __future__ import annotations
  2
  3import mimetypes
  4from email import charset as Charset
  5from email import encoders as Encoders
  6from email import generator, message_from_string
  7from email.errors import HeaderParseError
  8from email.header import Header
  9from email.headerregistry import Address
 10from email.headerregistry import (
 11    parser as headerregistry_parser,  # ty: ignore[unresolved-import]
 12)
 13from email.message import Message
 14from email.mime.base import MIMEBase
 15from email.mime.message import MIMEMessage
 16from email.mime.multipart import MIMEMultipart
 17from email.mime.text import MIMEText
 18from email.utils import formataddr, formatdate, getaddresses, make_msgid
 19from io import BytesIO, StringIO
 20from pathlib import Path
 21from typing import TYPE_CHECKING, Any
 22
 23from plain.runtime import settings
 24from plain.templates import Template, TemplateFileMissing
 25from plain.utils.encoding import force_str, punycode
 26from plain.utils.html import strip_tags
 27
 28from .utils import _DNS_NAME
 29
 30if TYPE_CHECKING:
 31    from os import PathLike
 32
 33    from .backends.base import BaseEmailBackend
 34
# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
# some spam filters.
#
# Two module-level utf-8 Charset objects are kept so SafeMIMEText.set_payload
# can pick one per message: `_utf8_charset` leaves the body unencoded, while
# `_utf8_charset_qp` applies quoted-printable (used when the payload contains
# over-long lines).
_utf8_charset = Charset.Charset("utf-8")
_utf8_charset.body_encoding = None  # ty: ignore[invalid-assignment]  # Python defaults to BASE64
_utf8_charset_qp = Charset.Charset("utf-8")
_utf8_charset_qp.body_encoding = Charset.QP

# Default MIME type to use on attachments (if it is not explicitly given
# and cannot be guessed).
_DEFAULT_ATTACHMENT_MIME_TYPE = "application/octet-stream"

# Longest body line, measured in encoded bytes, that SafeMIMEText.set_payload
# sends without switching the body to quoted-printable.
_RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998
 47
 48
 49class BadHeaderError(ValueError):
 50    pass
 51
 52
 53# Header names that contain structured address data (RFC 5322).
 54_ADDRESS_HEADERS = {
 55    "from",
 56    "sender",
 57    "reply-to",
 58    "to",
 59    "cc",
 60    "bcc",
 61    "resent-from",
 62    "resent-sender",
 63    "resent-to",
 64    "resent-cc",
 65    "resent-bcc",
 66}
 67
 68
 69def _forbid_multi_line_headers(
 70    name: str, val: str, encoding: str | None
 71) -> tuple[str, str]:
 72    """Forbid multi-line headers to prevent header injection."""
 73    encoding = encoding or "utf-8"
 74    val = str(val)  # val may be lazy
 75    if "\n" in val or "\r" in val:
 76        raise BadHeaderError(
 77            f"Header values can't contain newlines (got {val!r} for header {name!r})"
 78        )
 79    try:
 80        val.encode("ascii")
 81    except UnicodeEncodeError:
 82        if name.lower() in _ADDRESS_HEADERS:
 83            val = ", ".join(
 84                _sanitize_address(addr, encoding) for addr in getaddresses((val,))
 85            )
 86        else:
 87            val = Header(val, encoding).encode()
 88    else:
 89        if name.lower() == "subject":
 90            val = Header(val).encode()
 91    return name, val
 92
 93
 94def _sanitize_address(addr: str | tuple[str, str], encoding: str) -> str:
 95    """
 96    Format a pair of (name, address) or an email address string.
 97    """
 98    address = None
 99    if not isinstance(addr, tuple):
100        addr = force_str(addr)
101        try:
102            token, rest = headerregistry_parser.get_mailbox(addr)
103        except (HeaderParseError, ValueError, IndexError):
104            raise ValueError(f'Invalid address "{addr}"')
105        else:
106            if rest:
107                # The entire email address must be parsed.
108                raise ValueError(
109                    f'Invalid address; only {token} could be parsed from "{addr}"'
110                )
111            nm = token.display_name or ""
112            localpart = token.local_part
113            domain = token.domain or ""
114    else:
115        nm, address = addr
116        localpart, domain = address.rsplit("@", 1)
117
118    address_parts = nm + localpart + domain
119    if "\n" in address_parts or "\r" in address_parts:
120        raise ValueError("Invalid address; address parts cannot contain newlines.")
121
122    # Avoid UTF-8 encode, if it's possible.
123    try:
124        nm.encode("ascii")
125        nm = Header(nm).encode()
126    except UnicodeEncodeError:
127        nm = Header(nm, encoding).encode()
128    try:
129        localpart.encode("ascii")
130    except UnicodeEncodeError:
131        localpart = Header(localpart, encoding).encode()
132    domain = punycode(domain)
133
134    parsed_address = Address(username=localpart, domain=domain)
135    return formataddr((nm, parsed_address.addr_spec))
136
137
class MIMEMixin:
    """Serialization overrides shared by the Safe* MIME classes."""

    def as_string(self, unixfrom: bool = False, linesep: str = "\n") -> str:
        """Serialize the whole message to a ``str``.

        When ``unixfrom`` is true, the Unix From_ envelope header is
        included.

        Unlike the default as_string(), body lines that begin with 'From '
        are left untouched (``mangle_from_=False``). See bug #13433 for
        details.
        """
        buffer = StringIO()
        generator.Generator(buffer, mangle_from_=False).flatten(
            self, unixfrom=unixfrom, linesep=linesep
        )
        return buffer.getvalue()

    def as_bytes(self, unixfrom: bool = False, linesep: str = "\n") -> bytes:
        """Serialize the whole message to ``bytes``.

        When ``unixfrom`` is true, the Unix From_ envelope header is
        included.

        Unlike the default as_bytes(), body lines that begin with 'From '
        are left untouched (``mangle_from_=False``). See bug #13433 for
        details.
        """
        buffer = BytesIO()
        generator.BytesGenerator(buffer, mangle_from_=False).flatten(
            self, unixfrom=unixfrom, linesep=linesep
        )
        return buffer.getvalue()
164
165
class SafeMIMEMessage(MIMEMixin, MIMEMessage):
    """MIMEMessage whose header assignments are checked for newlines."""

    def __setitem__(self, name: str, val: str) -> None:
        # message/rfc822 attachments must be ASCII
        checked = _forbid_multi_line_headers(name, val, "ascii")
        MIMEMessage.__setitem__(self, *checked)
171
172
class SafeMIMEText(MIMEMixin, MIMEText):
    """MIMEText with newline-checked headers and spam-filter-friendly UTF-8."""

    def __init__(
        self, _text: str, _subtype: str = "plain", _charset: str | None = None
    ) -> None:
        # Stored before delegating: __setitem__ below reads self.encoding.
        self.encoding = _charset
        MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset)

    def __setitem__(self, name: str, val: str) -> None:
        checked = _forbid_multi_line_headers(name, val, self.encoding)
        MIMEText.__setitem__(self, *checked)

    def set_payload(  # ty: ignore[invalid-method-override]
        self, payload: str, charset: str | Charset.Charset | None = None
    ) -> None:
        """Set the payload, swapping a plain "utf-8" charset name for one of
        the module's preconfigured Charset objects.

        A Charset instance passed by the caller is used unchanged.
        """
        is_utf8_name = not isinstance(charset, Charset.Charset) and charset == "utf-8"
        if is_utf8_name:
            charset = _utf8_charset
            for line in payload.splitlines():
                if len(line.encode()) > _RFC5322_EMAIL_LINE_LENGTH_LIMIT:
                    # Quoted-Printable encoding has the side effect of
                    # shortening long lines, if any (#22561).
                    charset = _utf8_charset_qp
                    break
        MIMEText.set_payload(self, payload, charset=charset)
196
197
class SafeMIMEMultipart(MIMEMixin, MIMEMultipart):
    """MIMEMultipart whose header assignments are checked for newlines."""

    def __init__(
        self,
        _subtype: str = "mixed",
        boundary: str | None = None,
        _subparts: list[Message] | None = None,
        encoding: str | None = None,
        **_params: Any,
    ) -> None:
        # Stored before delegating: __setitem__ below reads self.encoding.
        self.encoding = encoding
        MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)

    def __setitem__(self, name: str, val: str) -> None:
        checked = _forbid_multi_line_headers(name, val, self.encoding)
        MIMEMultipart.__setitem__(self, *checked)
213
214
class EmailMessage:
    """A container for email information."""

    content_subtype = "plain"
    mixed_subtype = "mixed"
    encoding: str | None = None  # None => fall back to "utf-8"

    def __init__(
        self,
        subject: str = "",
        body: str = "",
        from_email: str | None = None,
        to: list[str] | tuple[str, ...] | None = None,
        bcc: list[str] | tuple[str, ...] | None = None,
        connection: BaseEmailBackend | None = None,
        attachments: list[MIMEBase | tuple[str, str, str]] | None = None,
        headers: dict[str, str] | None = None,
        cc: list[str] | tuple[str, ...] | None = None,
        reply_to: list[str] | tuple[str, ...] | None = None,
    ) -> None:
        """
        Initialize a single email message (which can be sent to multiple
        recipients).

        ``to``, ``cc``, ``bcc`` and ``reply_to`` must be lists or tuples; a
        bare string raises TypeError. ``from_email`` and ``reply_to`` fall
        back to the EMAIL_DEFAULT_FROM / EMAIL_DEFAULT_REPLY_TO settings.
        """
        self.to = self._as_address_list(to, "to")
        self.cc = self._as_address_list(cc, "cc")
        self.bcc = self._as_address_list(bcc, "bcc")
        if reply_to:
            self.reply_to = self._as_address_list(reply_to, "reply_to")
        else:
            # Copy the default so that mutating msg.reply_to can never
            # modify the list stored in settings.
            self.reply_to = list(settings.EMAIL_DEFAULT_REPLY_TO or [])
        self.from_email = from_email or settings.EMAIL_DEFAULT_FROM
        self.subject = subject
        self.body = body or ""
        self.attachments = []
        if attachments:
            for attachment in attachments:
                if isinstance(attachment, MIMEBase):
                    self.attach(attachment)
                else:
                    self.attach(*attachment)
        self.extra_headers = headers or {}
        self.connection = connection

    @staticmethod
    def _as_address_list(
        value: list[str] | tuple[str, ...] | None, argname: str
    ) -> list[str]:
        """Return a new list built from ``value``; reject a bare string."""
        if not value:
            return []
        if isinstance(value, str):
            raise TypeError(f'"{argname}" argument must be a list or tuple')
        return list(value)

    def get_connection(self) -> BaseEmailBackend:
        """Return the backend connection, creating and caching it if unset."""
        from . import get_connection

        if not self.connection:
            self.connection = get_connection()
        return self.connection

    def message(self) -> SafeMIMEText | SafeMIMEMultipart:
        """
        Build the MIME message: body, attachments, then headers.

        From, To, Cc and Reply-To are each written exactly once; a matching
        entry in ``extra_headers`` (compared case-insensitively) overrides
        the value derived from the instance attributes. Date and Message-ID
        are generated unless ``extra_headers`` supplies them.
        """
        encoding = self.encoding or "utf-8"
        msg = SafeMIMEText(self.body, self.content_subtype, encoding)
        msg = self._create_message(msg)
        msg["Subject"] = self.subject
        msg["From"] = self._get_extra_header("From", self.from_email)
        self._set_list_header_if_not_empty(msg, "To", self.to)
        self._set_list_header_if_not_empty(msg, "Cc", self.cc)
        self._set_list_header_if_not_empty(msg, "Reply-To", self.reply_to)

        # Email header names are case-insensitive (RFC 2045), so we have to
        # accommodate that when doing comparisons.
        header_names = {key.lower() for key in self.extra_headers}
        if "date" not in header_names:
            # formatdate() uses stdlib methods to format the date, which use
            # the stdlib/OS concept of a timezone, however, Plain sets the
            # TZ environment variable based on the TIME_ZONE setting which
            # will get picked up by formatdate().
            msg["Date"] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME)
        if "message-id" not in header_names:
            # Use cached _DNS_NAME for performance
            msg["Message-ID"] = make_msgid(domain=str(_DNS_NAME))
        already_written = {"from", "to", "cc", "reply-to"}
        for name, value in self.extra_headers.items():
            # Assigning a header appends rather than replaces, so writing
            # these again would emit duplicate address headers.
            if name.lower() not in already_written:
                msg[name] = value
        return msg

    def recipients(self) -> list[str]:
        """
        Return a list of all recipients of the email (includes direct
        addressees as well as Cc and Bcc entries).
        """
        return [email for email in (self.to + self.cc + self.bcc) if email]

    def send(self) -> int:
        """Send the email message; return 0 without connecting if there are
        no recipients, otherwise the backend's send_messages() result."""
        if not self.recipients():
            # Don't bother creating the network connection if there's nobody to
            # send to.
            return 0
        return self.get_connection().send_messages([self])

    def attach(
        self,
        filename: MIMEBase | str | None = None,
        content: str | bytes | None = None,
        mimetype: str | None = None,
    ) -> None:
        """
        Attach a file with the given filename and content. The filename can
        be omitted and the mimetype is guessed, if not provided.

        If the first parameter is a MIMEBase subclass, insert it directly
        into the resulting message attachments.

        For a text/* mimetype (guessed or specified), when a bytes object is
        specified as content, decode it as UTF-8. If that fails, set the
        mimetype to _DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.

        Raises ValueError if content/mimetype accompany a MIMEBase instance,
        or if content is missing otherwise.
        """
        if isinstance(filename, MIMEBase):
            if content is not None or mimetype is not None:
                raise ValueError(
                    "content and mimetype must not be given when a MIMEBase "
                    "instance is provided."
                )
            self.attachments.append(filename)
            return
        if content is None:
            raise ValueError("content must be provided.")

        if filename is not None and mimetype is None:
            mimetype = mimetypes.guess_type(filename)[0]
        if mimetype is None:
            mimetype = _DEFAULT_ATTACHMENT_MIME_TYPE
        basetype, _subtype = mimetype.split("/", 1)

        if basetype == "text" and isinstance(content, bytes):
            try:
                content = content.decode()
            except UnicodeDecodeError:
                # The mimetype suggests text but the bytes are not valid
                # UTF-8: keep the raw bytes and treat them as binary.
                mimetype = _DEFAULT_ATTACHMENT_MIME_TYPE

        self.attachments.append((filename, content, mimetype))

    def attach_file(
        self, path: str | PathLike[str], mimetype: str | None = None
    ) -> None:
        """
        Attach a file from the filesystem.

        Set the mimetype to _DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified
        and cannot be guessed.

        For a text/* mimetype (guessed or specified), decode the file's content
        as UTF-8. If that fails, set the mimetype to
        _DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
        """
        path = Path(path)
        self.attach(path.name, path.read_bytes(), mimetype)

    def _create_message(self, msg: SafeMIMEText) -> SafeMIMEText | SafeMIMEMultipart:
        """Hook that wraps the body part; here it only adds attachments."""
        return self._create_attachments(msg)

    def _create_attachments(
        self, msg: SafeMIMEText | SafeMIMEMultipart
    ) -> SafeMIMEText | SafeMIMEMultipart:
        """Wrap ``msg`` in a multipart container when attachments exist."""
        if self.attachments:
            encoding = self.encoding or "utf-8"
            body_msg = msg
            msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding)
            # An empty plain body is left out, but a multipart body is kept.
            if self.body or body_msg.is_multipart():
                msg.attach(body_msg)
            for attachment in self.attachments:
                if isinstance(attachment, MIMEBase):
                    msg.attach(attachment)
                else:
                    msg.attach(self._create_attachment(*attachment))
        return msg

    def _create_mime_attachment(
        self, content: str | bytes | EmailMessage | Message, mimetype: str
    ) -> SafeMIMEText | SafeMIMEMessage | MIMEBase:
        """
        Convert the content, mimetype pair into a MIME attachment object.

        If the mimetype is message/rfc822, content may be an
        email.Message or EmailMessage object, as well as a str.

        Raises TypeError if a non-text, non-message attachment is not bytes.
        """
        basetype, subtype = mimetype.split("/", 1)
        if basetype == "text":
            encoding = self.encoding or "utf-8"
            if not isinstance(content, str):
                content = force_str(content)
            return SafeMIMEText(content, subtype, encoding)

        if basetype == "message" and subtype == "rfc822":
            # Bug #18967: Per RFC 2046 Section 5.2.1, message/rfc822
            # attachments must not be base64 encoded.
            if isinstance(content, EmailMessage):
                # convert content into an email.Message first
                content = content.message()
            elif not isinstance(content, Message):
                # For compatibility with existing code, parse the message
                # into an email.Message object if it is not one already.
                content = message_from_string(force_str(content))
            return SafeMIMEMessage(content, subtype)

        # Explicit check instead of `assert`, which is stripped under -O.
        if not isinstance(content, bytes):
            raise TypeError("Non-text attachments must be bytes")
        # Encode non-text attachments with base64.
        attachment = MIMEBase(basetype, subtype)
        attachment.set_payload(content)
        Encoders.encode_base64(attachment)
        return attachment

    def _create_attachment(
        self, filename: str | None, content: str | bytes, mimetype: str
    ) -> SafeMIMEText | SafeMIMEMessage | MIMEBase:
        """
        Convert the filename, content, mimetype triple into a MIME attachment
        object.
        """
        attachment = self._create_mime_attachment(content, mimetype)
        if filename:
            try:
                filename.encode("ascii")
                encoded_filename: str | tuple[str, str, str] = filename
            except UnicodeEncodeError:
                # Non-ASCII names use the (charset, language, value) form.
                encoded_filename = ("utf-8", "", filename)
            attachment.add_header(
                "Content-Disposition", "attachment", filename=encoded_filename
            )
        return attachment

    def _get_extra_header(
        self, header: str, default: str | None = None
    ) -> str | None:
        """
        Return the ``extra_headers`` value whose name matches ``header``
        case-insensitively (first match wins), or ``default`` if none does.
        """
        wanted = header.lower()
        for name, value in self.extra_headers.items():
            if name.lower() == wanted:
                return value
        return default

    def _set_list_header_if_not_empty(
        self,
        msg: SafeMIMEText | SafeMIMEMultipart,
        header: str,
        values: list[str],
    ) -> None:
        """
        Set msg's header, either from self.extra_headers, if present, or from
        the values argument if it is not empty.
        """
        override = self._get_extra_header(header)
        if override is not None:
            msg[header] = override
        elif values:
            msg[header] = ", ".join(str(v) for v in values)
476
477
class EmailMultiAlternatives(EmailMessage):
    """
    A version of EmailMessage that makes it easy to send multipart/alternative
    messages. For example, including text and HTML versions of the text is
    made easier.
    """

    alternative_subtype = "alternative"

    def __init__(
        self,
        subject: str = "",
        body: str = "",
        from_email: str | None = None,
        to: list[str] | tuple[str, ...] | None = None,
        bcc: list[str] | tuple[str, ...] | None = None,
        connection: BaseEmailBackend | None = None,
        attachments: list[MIMEBase | tuple[str, str, str]] | None = None,
        headers: dict[str, str] | None = None,
        alternatives: list[tuple[str, str]] | None = None,
        cc: list[str] | tuple[str, ...] | None = None,
        reply_to: list[str] | tuple[str, ...] | None = None,
    ) -> None:
        """
        Initialize a single email message (which can be sent to multiple
        recipients).

        ``alternatives`` is an optional list of ``(content, mimetype)``
        pairs; it is copied, so later attach_alternative() calls never
        modify the list the caller passed in.
        """
        super().__init__(
            subject,
            body,
            from_email,
            to,
            bcc,
            connection,
            attachments,
            headers,
            cc,
            reply_to,
        )
        # Copy instead of aliasing: attach_alternative() appends to this list.
        self.alternatives = list(alternatives) if alternatives else []

    def attach_alternative(self, content: str, mimetype: str) -> None:
        """Attach an alternative content representation.

        Raises ValueError if either ``content`` or ``mimetype`` is None.
        """
        if content is None or mimetype is None:
            raise ValueError("Both content and mimetype must be provided.")
        self.alternatives.append((content, mimetype))

    def _create_message(self, msg: SafeMIMEText) -> SafeMIMEText | SafeMIMEMultipart:
        """Wrap the body in alternatives first, then in attachments."""
        return self._create_attachments(self._create_alternatives(msg))

    def _create_alternatives(
        self, msg: SafeMIMEText | SafeMIMEMultipart
    ) -> SafeMIMEText | SafeMIMEMultipart:
        """Return ``msg`` unchanged when there are no alternatives; otherwise
        return a multipart container holding the body (if non-empty) followed
        by one part per alternative."""
        if not self.alternatives:
            return msg

        encoding = self.encoding or "utf-8"
        container = SafeMIMEMultipart(
            _subtype=self.alternative_subtype, encoding=encoding
        )
        if self.body:
            container.attach(msg)
        for alternative in self.alternatives:
            container.attach(self._create_mime_attachment(*alternative))
        return container
542
543
class TemplateEmail(EmailMultiAlternatives):
    """
    An email whose HTML body, plain-text body and (optionally) subject are
    rendered from the ``email/<template>.*`` templates.
    """

    def __init__(
        self,
        *,
        template: str,
        context: dict[str, Any] | None = None,
        subject: str = "",
        from_email: str | None = None,
        to: list[str] | tuple[str, ...] | None = None,
        bcc: list[str] | tuple[str, ...] | None = None,
        connection: BaseEmailBackend | None = None,
        attachments: list[MIMEBase | tuple[str, str, str]] | None = None,
        headers: dict[str, str] | None = None,
        alternatives: list[tuple[str, str]] | None = None,
        cc: list[str] | tuple[str, ...] | None = None,
        reply_to: list[str] | tuple[str, ...] | None = None,
    ) -> None:
        """
        Render the templates, then initialize the message with the plain
        body and attach the HTML body as a ``text/html`` alternative.

        Rendering happens before the parent initializer runs, so
        get_template_context() can only rely on ``self.template`` and
        ``self.context``.
        """
        self.template = template
        self.context = context or {}

        # Build the context a single time and share it across all renders.
        render_context = self.get_template_context()

        html_body, plain_body = self.render_content(render_context)
        self.body_html = html_body

        # An explicit subject wins; otherwise try the subject template.
        subject = subject or self.render_subject(render_context)

        super().__init__(
            subject=subject,
            body=plain_body,
            from_email=from_email,
            to=to,
            bcc=bcc,
            connection=connection,
            attachments=attachments,
            headers=headers,
            alternatives=alternatives,
            cc=cc,
            reply_to=reply_to,
        )

        self.attach_alternative(self.body_html, "text/html")

    def get_template_context(self) -> dict[str, Any]:
        """Subclasses can override this method to add context data."""
        return self.context

    def render_content(self, context: dict[str, Any]) -> tuple[str, str]:
        """Return ``(html, plain)``; when the plain template is missing, the
        plain text is the HTML with its tags stripped."""
        html = self.render_html(context)

        try:
            text = self.render_plain(context)
        except TemplateFileMissing:
            text = strip_tags(html)

        return html, text

    def render_plain(self, context: dict[str, Any]) -> str:
        """Render the plain-text template with ``context``."""
        template_name = self.get_plain_template_name()
        return Template(template_name).render(context)

    def render_html(self, context: dict[str, Any]) -> str:
        """Render the HTML template with ``context``."""
        template_name = self.get_html_template_name()
        return Template(template_name).render(context)

    def render_subject(self, context: dict[str, Any]) -> str:
        """Render the subject template, stripped of surrounding whitespace;
        return "" when that template is missing."""
        try:
            rendered = Template(self.get_subject_template_name()).render(context)
        except TemplateFileMissing:
            return ""
        return rendered.strip()

    def get_plain_template_name(self) -> str:
        """Template path for the plain-text body."""
        return f"email/{self.template}.txt"

    def get_html_template_name(self) -> str:
        """Template path for the HTML body."""
        return f"email/{self.template}.html"

    def get_subject_template_name(self) -> str:
        """Template path for the subject line."""
        return f"email/{self.template}.subject.txt"
622        return f"email/{self.template}.subject.txt"