plain.mail
Everything you need to send email.
1import mimetypes
2from email import charset as Charset
3from email import encoders as Encoders
4from email import generator, message_from_string
5from email.errors import HeaderParseError
6from email.header import Header
7from email.headerregistry import Address, parser
8from email.message import Message
9from email.mime.base import MIMEBase
10from email.mime.message import MIMEMessage
11from email.mime.multipart import MIMEMultipart
12from email.mime.text import MIMEText
13from email.utils import formataddr, formatdate, getaddresses, make_msgid
14from io import BytesIO, StringIO
15from pathlib import Path
16
17from plain.runtime import settings
18from plain.templates import Template, TemplateFileMissing
19from plain.utils.encoding import force_str, punycode
20from plain.utils.html import strip_tags
21
22from .utils import DNS_NAME
23
24# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
25# some spam filters.
26utf8_charset = Charset.Charset("utf-8")
27utf8_charset.body_encoding = None # Python defaults to BASE64
28utf8_charset_qp = Charset.Charset("utf-8")
29utf8_charset_qp.body_encoding = Charset.QP
30
31# Default MIME type to use on attachments (if it is not explicitly given
32# and cannot be guessed).
33DEFAULT_ATTACHMENT_MIME_TYPE = "application/octet-stream"
34
35RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998
36
37
38class BadHeaderError(ValueError):
39 pass
40
41
42# Header names that contain structured address data (RFC 5322).
43ADDRESS_HEADERS = {
44 "from",
45 "sender",
46 "reply-to",
47 "to",
48 "cc",
49 "bcc",
50 "resent-from",
51 "resent-sender",
52 "resent-to",
53 "resent-cc",
54 "resent-bcc",
55}
56
57
58def forbid_multi_line_headers(name, val, encoding):
59 """Forbid multi-line headers to prevent header injection."""
60 encoding = encoding or settings.DEFAULT_CHARSET
61 val = str(val) # val may be lazy
62 if "\n" in val or "\r" in val:
63 raise BadHeaderError(
64 f"Header values can't contain newlines (got {val!r} for header {name!r})"
65 )
66 try:
67 val.encode("ascii")
68 except UnicodeEncodeError:
69 if name.lower() in ADDRESS_HEADERS:
70 val = ", ".join(
71 sanitize_address(addr, encoding) for addr in getaddresses((val,))
72 )
73 else:
74 val = Header(val, encoding).encode()
75 else:
76 if name.lower() == "subject":
77 val = Header(val).encode()
78 return name, val
79
80
81def sanitize_address(addr, encoding):
82 """
83 Format a pair of (name, address) or an email address string.
84 """
85 address = None
86 if not isinstance(addr, tuple):
87 addr = force_str(addr)
88 try:
89 token, rest = parser.get_mailbox(addr)
90 except (HeaderParseError, ValueError, IndexError):
91 raise ValueError('Invalid address "%s"' % addr)
92 else:
93 if rest:
94 # The entire email address must be parsed.
95 raise ValueError(
96 f'Invalid address; only {token} could be parsed from "{addr}"'
97 )
98 nm = token.display_name or ""
99 localpart = token.local_part
100 domain = token.domain or ""
101 else:
102 nm, address = addr
103 localpart, domain = address.rsplit("@", 1)
104
105 address_parts = nm + localpart + domain
106 if "\n" in address_parts or "\r" in address_parts:
107 raise ValueError("Invalid address; address parts cannot contain newlines.")
108
109 # Avoid UTF-8 encode, if it's possible.
110 try:
111 nm.encode("ascii")
112 nm = Header(nm).encode()
113 except UnicodeEncodeError:
114 nm = Header(nm, encoding).encode()
115 try:
116 localpart.encode("ascii")
117 except UnicodeEncodeError:
118 localpart = Header(localpart, encoding).encode()
119 domain = punycode(domain)
120
121 parsed_address = Address(username=localpart, domain=domain)
122 return formataddr((nm, parsed_address.addr_spec))
123
124
125class MIMEMixin:
126 def as_string(self, unixfrom=False, linesep="\n"):
127 """Return the entire formatted message as a string.
128 Optional `unixfrom' when True, means include the Unix From_ envelope
129 header.
130
131 This overrides the default as_string() implementation to not mangle
132 lines that begin with 'From '. See bug #13433 for details.
133 """
134 fp = StringIO()
135 g = generator.Generator(fp, mangle_from_=False)
136 g.flatten(self, unixfrom=unixfrom, linesep=linesep)
137 return fp.getvalue()
138
139 def as_bytes(self, unixfrom=False, linesep="\n"):
140 """Return the entire formatted message as bytes.
141 Optional `unixfrom' when True, means include the Unix From_ envelope
142 header.
143
144 This overrides the default as_bytes() implementation to not mangle
145 lines that begin with 'From '. See bug #13433 for details.
146 """
147 fp = BytesIO()
148 g = generator.BytesGenerator(fp, mangle_from_=False)
149 g.flatten(self, unixfrom=unixfrom, linesep=linesep)
150 return fp.getvalue()
151
152
153class SafeMIMEMessage(MIMEMixin, MIMEMessage):
154 def __setitem__(self, name, val):
155 # message/rfc822 attachments must be ASCII
156 name, val = forbid_multi_line_headers(name, val, "ascii")
157 MIMEMessage.__setitem__(self, name, val)
158
159
160class SafeMIMEText(MIMEMixin, MIMEText):
161 def __init__(self, _text, _subtype="plain", _charset=None):
162 self.encoding = _charset
163 MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset)
164
165 def __setitem__(self, name, val):
166 name, val = forbid_multi_line_headers(name, val, self.encoding)
167 MIMEText.__setitem__(self, name, val)
168
169 def set_payload(self, payload, charset=None):
170 if charset == "utf-8" and not isinstance(charset, Charset.Charset):
171 has_long_lines = any(
172 len(line.encode()) > RFC5322_EMAIL_LINE_LENGTH_LIMIT
173 for line in payload.splitlines()
174 )
175 # Quoted-Printable encoding has the side effect of shortening long
176 # lines, if any (#22561).
177 charset = utf8_charset_qp if has_long_lines else utf8_charset
178 MIMEText.set_payload(self, payload, charset=charset)
179
180
181class SafeMIMEMultipart(MIMEMixin, MIMEMultipart):
182 def __init__(
183 self, _subtype="mixed", boundary=None, _subparts=None, encoding=None, **_params
184 ):
185 self.encoding = encoding
186 MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)
187
188 def __setitem__(self, name, val):
189 name, val = forbid_multi_line_headers(name, val, self.encoding)
190 MIMEMultipart.__setitem__(self, name, val)
191
192
193class EmailMessage:
194 """A container for email information."""
195
196 content_subtype = "plain"
197 mixed_subtype = "mixed"
198 encoding = None # None => use settings default
199
200 def __init__(
201 self,
202 subject="",
203 body="",
204 from_email=None,
205 to=None,
206 bcc=None,
207 connection=None,
208 attachments=None,
209 headers=None,
210 cc=None,
211 reply_to=None,
212 ):
213 """
214 Initialize a single email message (which can be sent to multiple
215 recipients).
216 """
217 if to:
218 if isinstance(to, str):
219 raise TypeError('"to" argument must be a list or tuple')
220 self.to = list(to)
221 else:
222 self.to = []
223 if cc:
224 if isinstance(cc, str):
225 raise TypeError('"cc" argument must be a list or tuple')
226 self.cc = list(cc)
227 else:
228 self.cc = []
229 if bcc:
230 if isinstance(bcc, str):
231 raise TypeError('"bcc" argument must be a list or tuple')
232 self.bcc = list(bcc)
233 else:
234 self.bcc = []
235 if reply_to:
236 if isinstance(reply_to, str):
237 raise TypeError('"reply_to" argument must be a list or tuple')
238 self.reply_to = list(reply_to)
239 else:
240 self.reply_to = []
241 self.from_email = from_email or settings.DEFAULT_FROM_EMAIL
242 self.subject = subject
243 self.body = body or ""
244 self.attachments = []
245 if attachments:
246 for attachment in attachments:
247 if isinstance(attachment, MIMEBase):
248 self.attach(attachment)
249 else:
250 self.attach(*attachment)
251 self.extra_headers = headers or {}
252 self.connection = connection
253
254 def get_connection(self, fail_silently=False):
255 from . import get_connection
256
257 if not self.connection:
258 self.connection = get_connection(fail_silently=fail_silently)
259 return self.connection
260
261 def message(self):
262 encoding = self.encoding or settings.DEFAULT_CHARSET
263 msg = SafeMIMEText(self.body, self.content_subtype, encoding)
264 msg = self._create_message(msg)
265 msg["Subject"] = self.subject
266 msg["From"] = self.extra_headers.get("From", self.from_email)
267 self._set_list_header_if_not_empty(msg, "To", self.to)
268 self._set_list_header_if_not_empty(msg, "Cc", self.cc)
269 self._set_list_header_if_not_empty(msg, "Reply-To", self.reply_to)
270
271 # Email header names are case-insensitive (RFC 2045), so we have to
272 # accommodate that when doing comparisons.
273 header_names = [key.lower() for key in self.extra_headers]
274 if "date" not in header_names:
275 # formatdate() uses stdlib methods to format the date, which use
276 # the stdlib/OS concept of a timezone, however, Plain sets the
277 # TZ environment variable based on the TIME_ZONE setting which
278 # will get picked up by formatdate().
279 msg["Date"] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME)
280 if "message-id" not in header_names:
281 # Use cached DNS_NAME for performance
282 msg["Message-ID"] = make_msgid(domain=DNS_NAME)
283 for name, value in self.extra_headers.items():
284 if name.lower() != "from": # From is already handled
285 msg[name] = value
286 return msg
287
288 def recipients(self):
289 """
290 Return a list of all recipients of the email (includes direct
291 addressees as well as Cc and Bcc entries).
292 """
293 return [email for email in (self.to + self.cc + self.bcc) if email]
294
295 def send(self, fail_silently=False):
296 """Send the email message."""
297 if not self.recipients():
298 # Don't bother creating the network connection if there's nobody to
299 # send to.
300 return 0
301 return self.get_connection(fail_silently).send_messages([self])
302
303 def attach(self, filename=None, content=None, mimetype=None):
304 """
305 Attach a file with the given filename and content. The filename can
306 be omitted and the mimetype is guessed, if not provided.
307
308 If the first parameter is a MIMEBase subclass, insert it directly
309 into the resulting message attachments.
310
311 For a text/* mimetype (guessed or specified), when a bytes object is
312 specified as content, decode it as UTF-8. If that fails, set the
313 mimetype to DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
314 """
315 if isinstance(filename, MIMEBase):
316 if content is not None or mimetype is not None:
317 raise ValueError(
318 "content and mimetype must not be given when a MIMEBase "
319 "instance is provided."
320 )
321 self.attachments.append(filename)
322 elif content is None:
323 raise ValueError("content must be provided.")
324 else:
325 mimetype = (
326 mimetype
327 or mimetypes.guess_type(filename)[0]
328 or DEFAULT_ATTACHMENT_MIME_TYPE
329 )
330 basetype, subtype = mimetype.split("/", 1)
331
332 if basetype == "text":
333 if isinstance(content, bytes):
334 try:
335 content = content.decode()
336 except UnicodeDecodeError:
337 # If mimetype suggests the file is text but it's
338 # actually binary, read() raises a UnicodeDecodeError.
339 mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
340
341 self.attachments.append((filename, content, mimetype))
342
343 def attach_file(self, path, mimetype=None):
344 """
345 Attach a file from the filesystem.
346
347 Set the mimetype to DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified
348 and cannot be guessed.
349
350 For a text/* mimetype (guessed or specified), decode the file's content
351 as UTF-8. If that fails, set the mimetype to
352 DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
353 """
354 path = Path(path)
355 with path.open("rb") as file:
356 content = file.read()
357 self.attach(path.name, content, mimetype)
358
359 def _create_message(self, msg):
360 return self._create_attachments(msg)
361
362 def _create_attachments(self, msg):
363 if self.attachments:
364 encoding = self.encoding or settings.DEFAULT_CHARSET
365 body_msg = msg
366 msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding)
367 if self.body or body_msg.is_multipart():
368 msg.attach(body_msg)
369 for attachment in self.attachments:
370 if isinstance(attachment, MIMEBase):
371 msg.attach(attachment)
372 else:
373 msg.attach(self._create_attachment(*attachment))
374 return msg
375
376 def _create_mime_attachment(self, content, mimetype):
377 """
378 Convert the content, mimetype pair into a MIME attachment object.
379
380 If the mimetype is message/rfc822, content may be an
381 email.Message or EmailMessage object, as well as a str.
382 """
383 basetype, subtype = mimetype.split("/", 1)
384 if basetype == "text":
385 encoding = self.encoding or settings.DEFAULT_CHARSET
386 attachment = SafeMIMEText(content, subtype, encoding)
387 elif basetype == "message" and subtype == "rfc822":
388 # Bug #18967: Per RFC 2046 Section 5.2.1, message/rfc822
389 # attachments must not be base64 encoded.
390 if isinstance(content, EmailMessage):
391 # convert content into an email.Message first
392 content = content.message()
393 elif not isinstance(content, Message):
394 # For compatibility with existing code, parse the message
395 # into an email.Message object if it is not one already.
396 content = message_from_string(force_str(content))
397
398 attachment = SafeMIMEMessage(content, subtype)
399 else:
400 # Encode non-text attachments with base64.
401 attachment = MIMEBase(basetype, subtype)
402 attachment.set_payload(content)
403 Encoders.encode_base64(attachment)
404 return attachment
405
406 def _create_attachment(self, filename, content, mimetype=None):
407 """
408 Convert the filename, content, mimetype triple into a MIME attachment
409 object.
410 """
411 attachment = self._create_mime_attachment(content, mimetype)
412 if filename:
413 try:
414 filename.encode("ascii")
415 except UnicodeEncodeError:
416 filename = ("utf-8", "", filename)
417 attachment.add_header(
418 "Content-Disposition", "attachment", filename=filename
419 )
420 return attachment
421
422 def _set_list_header_if_not_empty(self, msg, header, values):
423 """
424 Set msg's header, either from self.extra_headers, if present, or from
425 the values argument.
426 """
427 if values:
428 try:
429 value = self.extra_headers[header]
430 except KeyError:
431 value = ", ".join(str(v) for v in values)
432 msg[header] = value
433
434
435class EmailMultiAlternatives(EmailMessage):
436 """
437 A version of EmailMessage that makes it easy to send multipart/alternative
438 messages. For example, including text and HTML versions of the text is
439 made easier.
440 """
441
442 alternative_subtype = "alternative"
443
444 def __init__(
445 self,
446 subject="",
447 body="",
448 from_email=None,
449 to=None,
450 bcc=None,
451 connection=None,
452 attachments=None,
453 headers=None,
454 alternatives=None,
455 cc=None,
456 reply_to=None,
457 ):
458 """
459 Initialize a single email message (which can be sent to multiple
460 recipients).
461 """
462 super().__init__(
463 subject,
464 body,
465 from_email,
466 to,
467 bcc,
468 connection,
469 attachments,
470 headers,
471 cc,
472 reply_to,
473 )
474 self.alternatives = alternatives or []
475
476 def attach_alternative(self, content, mimetype):
477 """Attach an alternative content representation."""
478 if content is None or mimetype is None:
479 raise ValueError("Both content and mimetype must be provided.")
480 self.alternatives.append((content, mimetype))
481
482 def _create_message(self, msg):
483 return self._create_attachments(self._create_alternatives(msg))
484
485 def _create_alternatives(self, msg):
486 encoding = self.encoding or settings.DEFAULT_CHARSET
487 if self.alternatives:
488 body_msg = msg
489 msg = SafeMIMEMultipart(
490 _subtype=self.alternative_subtype, encoding=encoding
491 )
492 if self.body:
493 msg.attach(body_msg)
494 for alternative in self.alternatives:
495 msg.attach(self._create_mime_attachment(*alternative))
496 return msg
497
498
499class TemplateEmail(EmailMultiAlternatives):
500 def __init__(
501 self,
502 *,
503 template,
504 context=None,
505 subject="",
506 from_email=None,
507 to=None,
508 bcc=None,
509 connection=None,
510 attachments=None,
511 headers=None,
512 alternatives=None,
513 cc=None,
514 reply_to=None,
515 ):
516 self.template = template
517 self.context = context or {}
518
519 self.body_html, body = self.render_content()
520
521 super().__init__(
522 subject=subject,
523 body=body,
524 from_email=from_email,
525 to=to,
526 bcc=bcc,
527 connection=connection,
528 attachments=attachments,
529 headers=headers,
530 alternatives=alternatives,
531 cc=cc,
532 reply_to=reply_to,
533 )
534
535 self.attach_alternative(self.body_html, "text/html")
536
537 def get_template_context(self):
538 return self.context
539
540 def render_content(self):
541 context = self.get_template_context()
542 html_content = self.render_html(context)
543
544 try:
545 plain_content = self.render_plain(context)
546 except TemplateFileMissing:
547 plain_content = strip_tags(html_content)
548
549 return html_content, plain_content
550
551 def render_plain(self, context):
552 return Template(self.get_plain_template_name()).render(context)
553
554 def render_html(self, context):
555 return Template(self.get_html_template_name()).render(context)
556
557 def get_plain_template_name(self):
558 return f"mail/{self.template}.txt"
559
560 def get_html_template_name(self):
561 return f"mail/{self.template}.html"