1from __future__ import annotations
2
3import mimetypes
4from email import charset as Charset
5from email import encoders as Encoders
6from email import generator, message_from_string
7from email.errors import HeaderParseError
8from email.header import Header
9from email.headerregistry import Address
10from email.headerregistry import (
11 parser as headerregistry_parser, # ty: ignore[unresolved-import]
12)
13from email.message import Message
14from email.mime.base import MIMEBase
15from email.mime.message import MIMEMessage
16from email.mime.multipart import MIMEMultipart
17from email.mime.text import MIMEText
18from email.utils import formataddr, formatdate, getaddresses, make_msgid
19from io import BytesIO, StringIO
20from pathlib import Path
21from typing import TYPE_CHECKING, Any
22
23from plain.runtime import settings
24from plain.templates import Template, TemplateFileMissing
25from plain.utils.encoding import force_str, punycode
26from plain.utils.html import strip_tags
27
28from .utils import _DNS_NAME
29
30if TYPE_CHECKING:
31 from os import PathLike
32
33 from .backends.base import BaseEmailBackend
34
35# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
36# some spam filters.
37_utf8_charset = Charset.Charset("utf-8")
38_utf8_charset.body_encoding = None # ty: ignore[invalid-assignment] # Python defaults to BASE64
39_utf8_charset_qp = Charset.Charset("utf-8")
40_utf8_charset_qp.body_encoding = Charset.QP
41
42# Default MIME type to use on attachments (if it is not explicitly given
43# and cannot be guessed).
44_DEFAULT_ATTACHMENT_MIME_TYPE = "application/octet-stream"
45
46_RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998
47
48
49class BadHeaderError(ValueError):
50 pass
51
52
53# Header names that contain structured address data (RFC 5322).
54_ADDRESS_HEADERS = {
55 "from",
56 "sender",
57 "reply-to",
58 "to",
59 "cc",
60 "bcc",
61 "resent-from",
62 "resent-sender",
63 "resent-to",
64 "resent-cc",
65 "resent-bcc",
66}
67
68
69def _forbid_multi_line_headers(
70 name: str, val: str, encoding: str | None
71) -> tuple[str, str]:
72 """Forbid multi-line headers to prevent header injection."""
73 encoding = encoding or "utf-8"
74 val = str(val) # val may be lazy
75 if "\n" in val or "\r" in val:
76 raise BadHeaderError(
77 f"Header values can't contain newlines (got {val!r} for header {name!r})"
78 )
79 try:
80 val.encode("ascii")
81 except UnicodeEncodeError:
82 if name.lower() in _ADDRESS_HEADERS:
83 val = ", ".join(
84 _sanitize_address(addr, encoding) for addr in getaddresses((val,))
85 )
86 else:
87 val = Header(val, encoding).encode()
88 else:
89 if name.lower() == "subject":
90 val = Header(val).encode()
91 return name, val
92
93
94def _sanitize_address(addr: str | tuple[str, str], encoding: str) -> str:
95 """
96 Format a pair of (name, address) or an email address string.
97 """
98 address = None
99 if not isinstance(addr, tuple):
100 addr = force_str(addr)
101 try:
102 token, rest = headerregistry_parser.get_mailbox(addr)
103 except (HeaderParseError, ValueError, IndexError):
104 raise ValueError(f'Invalid address "{addr}"')
105 else:
106 if rest:
107 # The entire email address must be parsed.
108 raise ValueError(
109 f'Invalid address; only {token} could be parsed from "{addr}"'
110 )
111 nm = token.display_name or ""
112 localpart = token.local_part
113 domain = token.domain or ""
114 else:
115 nm, address = addr
116 localpart, domain = address.rsplit("@", 1)
117
118 address_parts = nm + localpart + domain
119 if "\n" in address_parts or "\r" in address_parts:
120 raise ValueError("Invalid address; address parts cannot contain newlines.")
121
122 # Avoid UTF-8 encode, if it's possible.
123 try:
124 nm.encode("ascii")
125 nm = Header(nm).encode()
126 except UnicodeEncodeError:
127 nm = Header(nm, encoding).encode()
128 try:
129 localpart.encode("ascii")
130 except UnicodeEncodeError:
131 localpart = Header(localpart, encoding).encode()
132 domain = punycode(domain)
133
134 parsed_address = Address(username=localpart, domain=domain)
135 return formataddr((nm, parsed_address.addr_spec))
136
137
138class MIMEMixin:
139 def as_string(self, unixfrom: bool = False, linesep: str = "\n") -> str:
140 """Return the entire formatted message as a string.
141 Optional `unixfrom' when True, means include the Unix From_ envelope
142 header.
143
144 This overrides the default as_string() implementation to not mangle
145 lines that begin with 'From '. See bug #13433 for details.
146 """
147 fp = StringIO()
148 g = generator.Generator(fp, mangle_from_=False)
149 g.flatten(self, unixfrom=unixfrom, linesep=linesep)
150 return fp.getvalue()
151
152 def as_bytes(self, unixfrom: bool = False, linesep: str = "\n") -> bytes:
153 """Return the entire formatted message as bytes.
154 Optional `unixfrom' when True, means include the Unix From_ envelope
155 header.
156
157 This overrides the default as_bytes() implementation to not mangle
158 lines that begin with 'From '. See bug #13433 for details.
159 """
160 fp = BytesIO()
161 g = generator.BytesGenerator(fp, mangle_from_=False)
162 g.flatten(self, unixfrom=unixfrom, linesep=linesep)
163 return fp.getvalue()
164
165
166class SafeMIMEMessage(MIMEMixin, MIMEMessage):
167 def __setitem__(self, name: str, val: str) -> None:
168 # message/rfc822 attachments must be ASCII
169 name, val = _forbid_multi_line_headers(name, val, "ascii")
170 MIMEMessage.__setitem__(self, name, val)
171
172
173class SafeMIMEText(MIMEMixin, MIMEText):
174 def __init__(
175 self, _text: str, _subtype: str = "plain", _charset: str | None = None
176 ) -> None:
177 self.encoding = _charset
178 MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset)
179
180 def __setitem__(self, name: str, val: str) -> None:
181 name, val = _forbid_multi_line_headers(name, val, self.encoding)
182 MIMEText.__setitem__(self, name, val)
183
184 def set_payload( # ty: ignore[invalid-method-override]
185 self, payload: str, charset: str | Charset.Charset | None = None
186 ) -> None:
187 if charset == "utf-8" and not isinstance(charset, Charset.Charset):
188 has_long_lines = any(
189 len(line.encode()) > _RFC5322_EMAIL_LINE_LENGTH_LIMIT
190 for line in payload.splitlines()
191 )
192 # Quoted-Printable encoding has the side effect of shortening long
193 # lines, if any (#22561).
194 charset = _utf8_charset_qp if has_long_lines else _utf8_charset
195 MIMEText.set_payload(self, payload, charset=charset)
196
197
198class SafeMIMEMultipart(MIMEMixin, MIMEMultipart):
199 def __init__(
200 self,
201 _subtype: str = "mixed",
202 boundary: str | None = None,
203 _subparts: list[Message] | None = None,
204 encoding: str | None = None,
205 **_params: Any,
206 ) -> None:
207 self.encoding = encoding
208 MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)
209
210 def __setitem__(self, name: str, val: str) -> None:
211 name, val = _forbid_multi_line_headers(name, val, self.encoding)
212 MIMEMultipart.__setitem__(self, name, val)
213
214
215class EmailMessage:
216 """A container for email information."""
217
218 content_subtype = "plain"
219 mixed_subtype = "mixed"
220 encoding: str | None = None # None => use settings default
221
222 def __init__(
223 self,
224 subject: str = "",
225 body: str = "",
226 from_email: str | None = None,
227 to: list[str] | tuple[str, ...] | None = None,
228 bcc: list[str] | tuple[str, ...] | None = None,
229 connection: BaseEmailBackend | None = None,
230 attachments: list[MIMEBase | tuple[str, str, str]] | None = None,
231 headers: dict[str, str] | None = None,
232 cc: list[str] | tuple[str, ...] | None = None,
233 reply_to: list[str] | tuple[str, ...] | None = None,
234 ) -> None:
235 """
236 Initialize a single email message (which can be sent to multiple
237 recipients).
238 """
239 if to:
240 if isinstance(to, str):
241 raise TypeError('"to" argument must be a list or tuple')
242 self.to = list(to)
243 else:
244 self.to = []
245 if cc:
246 if isinstance(cc, str):
247 raise TypeError('"cc" argument must be a list or tuple')
248 self.cc = list(cc)
249 else:
250 self.cc = []
251 if bcc:
252 if isinstance(bcc, str):
253 raise TypeError('"bcc" argument must be a list or tuple')
254 self.bcc = list(bcc)
255 else:
256 self.bcc = []
257 if reply_to:
258 if isinstance(reply_to, str):
259 raise TypeError('"reply_to" argument must be a list or tuple')
260 self.reply_to = list(reply_to)
261 else:
262 self.reply_to = settings.EMAIL_DEFAULT_REPLY_TO or []
263 self.from_email = from_email or settings.EMAIL_DEFAULT_FROM
264 self.subject = subject
265 self.body = body or ""
266 self.attachments = []
267 if attachments:
268 for attachment in attachments:
269 if isinstance(attachment, MIMEBase):
270 self.attach(attachment)
271 else:
272 self.attach(*attachment)
273 self.extra_headers = headers or {}
274 self.connection = connection
275
276 def get_connection(self) -> BaseEmailBackend:
277 from . import get_connection
278
279 if not self.connection:
280 self.connection = get_connection()
281 return self.connection
282
283 def message(self) -> SafeMIMEText | SafeMIMEMultipart:
284 encoding = self.encoding or "utf-8"
285 msg = SafeMIMEText(self.body, self.content_subtype, encoding)
286 msg = self._create_message(msg)
287 msg["Subject"] = self.subject
288 msg["From"] = self.extra_headers.get("From", self.from_email)
289 self._set_list_header_if_not_empty(msg, "To", self.to)
290 self._set_list_header_if_not_empty(msg, "Cc", self.cc)
291 self._set_list_header_if_not_empty(msg, "Reply-To", self.reply_to)
292
293 # Email header names are case-insensitive (RFC 2045), so we have to
294 # accommodate that when doing comparisons.
295 header_names = [key.lower() for key in self.extra_headers]
296 if "date" not in header_names:
297 # formatdate() uses stdlib methods to format the date, which use
298 # the stdlib/OS concept of a timezone, however, Plain sets the
299 # TZ environment variable based on the TIME_ZONE setting which
300 # will get picked up by formatdate().
301 msg["Date"] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME)
302 if "message-id" not in header_names:
303 # Use cached _DNS_NAME for performance
304 msg["Message-ID"] = make_msgid(domain=str(_DNS_NAME))
305 for name, value in self.extra_headers.items():
306 if name.lower() != "from": # From is already handled
307 msg[name] = value
308 return msg
309
310 def recipients(self) -> list[str]:
311 """
312 Return a list of all recipients of the email (includes direct
313 addressees as well as Cc and Bcc entries).
314 """
315 return [email for email in (self.to + self.cc + self.bcc) if email]
316
317 def send(self) -> int:
318 """Send the email message."""
319 if not self.recipients():
320 # Don't bother creating the network connection if there's nobody to
321 # send to.
322 return 0
323 return self.get_connection().send_messages([self])
324
325 def attach(
326 self,
327 filename: MIMEBase | str | None = None,
328 content: str | bytes | None = None,
329 mimetype: str | None = None,
330 ) -> None:
331 """
332 Attach a file with the given filename and content. The filename can
333 be omitted and the mimetype is guessed, if not provided.
334
335 If the first parameter is a MIMEBase subclass, insert it directly
336 into the resulting message attachments.
337
338 For a text/* mimetype (guessed or specified), when a bytes object is
339 specified as content, decode it as UTF-8. If that fails, set the
340 mimetype to _DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
341 """
342 if isinstance(filename, MIMEBase):
343 if content is not None or mimetype is not None:
344 raise ValueError(
345 "content and mimetype must not be given when a MIMEBase "
346 "instance is provided."
347 )
348 self.attachments.append(filename)
349 elif content is None:
350 raise ValueError("content must be provided.")
351 else:
352 if filename is not None and mimetype is None:
353 mimetype = mimetypes.guess_type(filename)[0]
354 if mimetype is None:
355 mimetype = _DEFAULT_ATTACHMENT_MIME_TYPE
356 basetype, subtype = mimetype.split("/", 1)
357
358 if basetype == "text":
359 if isinstance(content, bytes):
360 try:
361 content = content.decode()
362 except UnicodeDecodeError:
363 # If mimetype suggests the file is text but it's
364 # actually binary, read() raises a UnicodeDecodeError.
365 mimetype = _DEFAULT_ATTACHMENT_MIME_TYPE
366
367 self.attachments.append((filename, content, mimetype))
368
369 def attach_file(
370 self, path: str | PathLike[str], mimetype: str | None = None
371 ) -> None:
372 """
373 Attach a file from the filesystem.
374
375 Set the mimetype to _DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified
376 and cannot be guessed.
377
378 For a text/* mimetype (guessed or specified), decode the file's content
379 as UTF-8. If that fails, set the mimetype to
380 _DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
381 """
382 path = Path(path)
383 with path.open("rb") as file:
384 content = file.read()
385 self.attach(path.name, content, mimetype)
386
387 def _create_message(self, msg: SafeMIMEText) -> SafeMIMEText | SafeMIMEMultipart:
388 return self._create_attachments(msg)
389
390 def _create_attachments(
391 self, msg: SafeMIMEText | SafeMIMEMultipart
392 ) -> SafeMIMEText | SafeMIMEMultipart:
393 if self.attachments:
394 encoding = self.encoding or "utf-8"
395 body_msg = msg
396 msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding)
397 if self.body or body_msg.is_multipart():
398 msg.attach(body_msg)
399 for attachment in self.attachments:
400 if isinstance(attachment, MIMEBase):
401 msg.attach(attachment)
402 else:
403 msg.attach(self._create_attachment(*attachment))
404 return msg
405
406 def _create_mime_attachment(
407 self, content: str | bytes | EmailMessage | Message, mimetype: str
408 ) -> SafeMIMEText | SafeMIMEMessage | MIMEBase:
409 """
410 Convert the content, mimetype pair into a MIME attachment object.
411
412 If the mimetype is message/rfc822, content may be an
413 email.Message or EmailMessage object, as well as a str.
414 """
415 basetype, subtype = mimetype.split("/", 1)
416 if basetype == "text":
417 encoding = self.encoding or "utf-8"
418 if not isinstance(content, str):
419 content = force_str(content)
420 attachment = SafeMIMEText(content, subtype, encoding)
421 elif basetype == "message" and subtype == "rfc822":
422 # Bug #18967: Per RFC 2046 Section 5.2.1, message/rfc822
423 # attachments must not be base64 encoded.
424 if isinstance(content, EmailMessage):
425 # convert content into an email.Message first
426 content = content.message()
427 elif not isinstance(content, Message):
428 # For compatibility with existing code, parse the message
429 # into an email.Message object if it is not one already.
430 content = message_from_string(force_str(content))
431
432 attachment = SafeMIMEMessage(content, subtype)
433 else:
434 # Encode non-text attachments with base64.
435 attachment = MIMEBase(basetype, subtype)
436 assert isinstance(content, bytes), "Non-text attachments must be bytes"
437 attachment.set_payload(content)
438 Encoders.encode_base64(attachment)
439 return attachment
440
441 def _create_attachment(
442 self, filename: str | None, content: str | bytes, mimetype: str
443 ) -> SafeMIMEText | SafeMIMEMessage | MIMEBase:
444 """
445 Convert the filename, content, mimetype triple into a MIME attachment
446 object.
447 """
448 attachment = self._create_mime_attachment(content, mimetype)
449 if filename:
450 try:
451 filename.encode("ascii")
452 encoded_filename: str | tuple[str, str, str] = filename
453 except UnicodeEncodeError:
454 encoded_filename = ("utf-8", "", filename)
455 attachment.add_header(
456 "Content-Disposition", "attachment", filename=encoded_filename
457 )
458 return attachment
459
460 def _set_list_header_if_not_empty(
461 self,
462 msg: SafeMIMEText | SafeMIMEMultipart,
463 header: str,
464 values: list[str],
465 ) -> None:
466 """
467 Set msg's header, either from self.extra_headers, if present, or from
468 the values argument.
469 """
470 if values:
471 try:
472 value = self.extra_headers[header]
473 except KeyError:
474 value = ", ".join(str(v) for v in values)
475 msg[header] = value
476
477
478class EmailMultiAlternatives(EmailMessage):
479 """
480 A version of EmailMessage that makes it easy to send multipart/alternative
481 messages. For example, including text and HTML versions of the text is
482 made easier.
483 """
484
485 alternative_subtype = "alternative"
486
487 def __init__(
488 self,
489 subject: str = "",
490 body: str = "",
491 from_email: str | None = None,
492 to: list[str] | tuple[str, ...] | None = None,
493 bcc: list[str] | tuple[str, ...] | None = None,
494 connection: BaseEmailBackend | None = None,
495 attachments: list[MIMEBase | tuple[str, str, str]] | None = None,
496 headers: dict[str, str] | None = None,
497 alternatives: list[tuple[str, str]] | None = None,
498 cc: list[str] | tuple[str, ...] | None = None,
499 reply_to: list[str] | tuple[str, ...] | None = None,
500 ) -> None:
501 """
502 Initialize a single email message (which can be sent to multiple
503 recipients).
504 """
505 super().__init__(
506 subject,
507 body,
508 from_email,
509 to,
510 bcc,
511 connection,
512 attachments,
513 headers,
514 cc,
515 reply_to,
516 )
517 self.alternatives = alternatives or []
518
519 def attach_alternative(self, content: str, mimetype: str) -> None:
520 """Attach an alternative content representation."""
521 if content is None or mimetype is None:
522 raise ValueError("Both content and mimetype must be provided.")
523 self.alternatives.append((content, mimetype))
524
525 def _create_message(self, msg: SafeMIMEText) -> SafeMIMEText | SafeMIMEMultipart:
526 return self._create_attachments(self._create_alternatives(msg))
527
528 def _create_alternatives(
529 self, msg: SafeMIMEText | SafeMIMEMultipart
530 ) -> SafeMIMEText | SafeMIMEMultipart:
531 encoding = self.encoding or "utf-8"
532 if self.alternatives:
533 body_msg = msg
534 msg = SafeMIMEMultipart(
535 _subtype=self.alternative_subtype, encoding=encoding
536 )
537 if self.body:
538 msg.attach(body_msg)
539 for alternative in self.alternatives:
540 msg.attach(self._create_mime_attachment(*alternative))
541 return msg
542
543
544class TemplateEmail(EmailMultiAlternatives):
545 def __init__(
546 self,
547 *,
548 template: str,
549 context: dict[str, Any] | None = None,
550 subject: str = "",
551 from_email: str | None = None,
552 to: list[str] | tuple[str, ...] | None = None,
553 bcc: list[str] | tuple[str, ...] | None = None,
554 connection: BaseEmailBackend | None = None,
555 attachments: list[MIMEBase | tuple[str, str, str]] | None = None,
556 headers: dict[str, str] | None = None,
557 alternatives: list[tuple[str, str]] | None = None,
558 cc: list[str] | tuple[str, ...] | None = None,
559 reply_to: list[str] | tuple[str, ...] | None = None,
560 ) -> None:
561 self.template = template
562 self.context = context or {}
563
564 # Run this once for all uses of the context
565 render_context = self.get_template_context()
566
567 self.body_html, body = self.render_content(render_context)
568
569 if not subject:
570 subject = self.render_subject(render_context)
571
572 super().__init__(
573 subject=subject,
574 body=body,
575 from_email=from_email,
576 to=to,
577 bcc=bcc,
578 connection=connection,
579 attachments=attachments,
580 headers=headers,
581 alternatives=alternatives,
582 cc=cc,
583 reply_to=reply_to,
584 )
585
586 self.attach_alternative(self.body_html, "text/html")
587
588 def get_template_context(self) -> dict[str, Any]:
589 """Subclasses can override this method to add context data."""
590 return self.context
591
592 def render_content(self, context: dict[str, Any]) -> tuple[str, str]:
593 html_content = self.render_html(context)
594
595 try:
596 plain_content = self.render_plain(context)
597 except TemplateFileMissing:
598 plain_content = strip_tags(html_content)
599
600 return html_content, plain_content
601
602 def render_plain(self, context: dict[str, Any]) -> str:
603 return Template(self.get_plain_template_name()).render(context)
604
605 def render_html(self, context: dict[str, Any]) -> str:
606 return Template(self.get_html_template_name()).render(context)
607
608 def render_subject(self, context: dict[str, Any]) -> str:
609 try:
610 subject = Template(self.get_subject_template_name()).render(context)
611 return subject.strip()
612 except TemplateFileMissing:
613 return ""
614
615 def get_plain_template_name(self) -> str:
616 return f"email/{self.template}.txt"
617
618 def get_html_template_name(self) -> str:
619 return f"email/{self.template}.html"
620
621 def get_subject_template_name(self) -> str:
622 return f"email/{self.template}.subject.txt"