plain.mail
Everything you need to send email.
1import mimetypes
2from email import charset as Charset
3from email import encoders as Encoders
4from email import generator, message_from_string
5from email.errors import HeaderParseError
6from email.header import Header
7from email.headerregistry import Address, parser
8from email.message import Message
9from email.mime.base import MIMEBase
10from email.mime.message import MIMEMessage
11from email.mime.multipart import MIMEMultipart
12from email.mime.text import MIMEText
13from email.utils import formataddr, formatdate, getaddresses, make_msgid
14from io import BytesIO, StringIO
15from pathlib import Path
16
17from plain.runtime import settings
18from plain.utils.encoding import force_str, punycode
19
20from .utils import DNS_NAME
21
22# Don't BASE64-encode UTF-8 messages so that we avoid unwanted attention from
23# some spam filters.
24utf8_charset = Charset.Charset("utf-8")
25utf8_charset.body_encoding = None # Python defaults to BASE64
26utf8_charset_qp = Charset.Charset("utf-8")
27utf8_charset_qp.body_encoding = Charset.QP
28
29# Default MIME type to use on attachments (if it is not explicitly given
30# and cannot be guessed).
31DEFAULT_ATTACHMENT_MIME_TYPE = "application/octet-stream"
32
33RFC5322_EMAIL_LINE_LENGTH_LIMIT = 998
34
35
36class BadHeaderError(ValueError):
37 pass
38
39
40# Header names that contain structured address data (RFC 5322).
41ADDRESS_HEADERS = {
42 "from",
43 "sender",
44 "reply-to",
45 "to",
46 "cc",
47 "bcc",
48 "resent-from",
49 "resent-sender",
50 "resent-to",
51 "resent-cc",
52 "resent-bcc",
53}
54
55
56def forbid_multi_line_headers(name, val, encoding):
57 """Forbid multi-line headers to prevent header injection."""
58 encoding = encoding or settings.DEFAULT_CHARSET
59 val = str(val) # val may be lazy
60 if "\n" in val or "\r" in val:
61 raise BadHeaderError(
62 f"Header values can't contain newlines (got {val!r} for header {name!r})"
63 )
64 try:
65 val.encode("ascii")
66 except UnicodeEncodeError:
67 if name.lower() in ADDRESS_HEADERS:
68 val = ", ".join(
69 sanitize_address(addr, encoding) for addr in getaddresses((val,))
70 )
71 else:
72 val = Header(val, encoding).encode()
73 else:
74 if name.lower() == "subject":
75 val = Header(val).encode()
76 return name, val
77
78
79def sanitize_address(addr, encoding):
80 """
81 Format a pair of (name, address) or an email address string.
82 """
83 address = None
84 if not isinstance(addr, tuple):
85 addr = force_str(addr)
86 try:
87 token, rest = parser.get_mailbox(addr)
88 except (HeaderParseError, ValueError, IndexError):
89 raise ValueError('Invalid address "%s"' % addr)
90 else:
91 if rest:
92 # The entire email address must be parsed.
93 raise ValueError(
94 f'Invalid address; only {token} could be parsed from "{addr}"'
95 )
96 nm = token.display_name or ""
97 localpart = token.local_part
98 domain = token.domain or ""
99 else:
100 nm, address = addr
101 localpart, domain = address.rsplit("@", 1)
102
103 address_parts = nm + localpart + domain
104 if "\n" in address_parts or "\r" in address_parts:
105 raise ValueError("Invalid address; address parts cannot contain newlines.")
106
107 # Avoid UTF-8 encode, if it's possible.
108 try:
109 nm.encode("ascii")
110 nm = Header(nm).encode()
111 except UnicodeEncodeError:
112 nm = Header(nm, encoding).encode()
113 try:
114 localpart.encode("ascii")
115 except UnicodeEncodeError:
116 localpart = Header(localpart, encoding).encode()
117 domain = punycode(domain)
118
119 parsed_address = Address(username=localpart, domain=domain)
120 return formataddr((nm, parsed_address.addr_spec))
121
122
123class MIMEMixin:
124 def as_string(self, unixfrom=False, linesep="\n"):
125 """Return the entire formatted message as a string.
126 Optional `unixfrom' when True, means include the Unix From_ envelope
127 header.
128
129 This overrides the default as_string() implementation to not mangle
130 lines that begin with 'From '. See bug #13433 for details.
131 """
132 fp = StringIO()
133 g = generator.Generator(fp, mangle_from_=False)
134 g.flatten(self, unixfrom=unixfrom, linesep=linesep)
135 return fp.getvalue()
136
137 def as_bytes(self, unixfrom=False, linesep="\n"):
138 """Return the entire formatted message as bytes.
139 Optional `unixfrom' when True, means include the Unix From_ envelope
140 header.
141
142 This overrides the default as_bytes() implementation to not mangle
143 lines that begin with 'From '. See bug #13433 for details.
144 """
145 fp = BytesIO()
146 g = generator.BytesGenerator(fp, mangle_from_=False)
147 g.flatten(self, unixfrom=unixfrom, linesep=linesep)
148 return fp.getvalue()
149
150
151class SafeMIMEMessage(MIMEMixin, MIMEMessage):
152 def __setitem__(self, name, val):
153 # message/rfc822 attachments must be ASCII
154 name, val = forbid_multi_line_headers(name, val, "ascii")
155 MIMEMessage.__setitem__(self, name, val)
156
157
158class SafeMIMEText(MIMEMixin, MIMEText):
159 def __init__(self, _text, _subtype="plain", _charset=None):
160 self.encoding = _charset
161 MIMEText.__init__(self, _text, _subtype=_subtype, _charset=_charset)
162
163 def __setitem__(self, name, val):
164 name, val = forbid_multi_line_headers(name, val, self.encoding)
165 MIMEText.__setitem__(self, name, val)
166
167 def set_payload(self, payload, charset=None):
168 if charset == "utf-8" and not isinstance(charset, Charset.Charset):
169 has_long_lines = any(
170 len(line.encode()) > RFC5322_EMAIL_LINE_LENGTH_LIMIT
171 for line in payload.splitlines()
172 )
173 # Quoted-Printable encoding has the side effect of shortening long
174 # lines, if any (#22561).
175 charset = utf8_charset_qp if has_long_lines else utf8_charset
176 MIMEText.set_payload(self, payload, charset=charset)
177
178
179class SafeMIMEMultipart(MIMEMixin, MIMEMultipart):
180 def __init__(
181 self, _subtype="mixed", boundary=None, _subparts=None, encoding=None, **_params
182 ):
183 self.encoding = encoding
184 MIMEMultipart.__init__(self, _subtype, boundary, _subparts, **_params)
185
186 def __setitem__(self, name, val):
187 name, val = forbid_multi_line_headers(name, val, self.encoding)
188 MIMEMultipart.__setitem__(self, name, val)
189
190
191class EmailMessage:
192 """A container for email information."""
193
194 content_subtype = "plain"
195 mixed_subtype = "mixed"
196 encoding = None # None => use settings default
197
198 def __init__(
199 self,
200 subject="",
201 body="",
202 from_email=None,
203 to=None,
204 bcc=None,
205 connection=None,
206 attachments=None,
207 headers=None,
208 cc=None,
209 reply_to=None,
210 ):
211 """
212 Initialize a single email message (which can be sent to multiple
213 recipients).
214 """
215 if to:
216 if isinstance(to, str):
217 raise TypeError('"to" argument must be a list or tuple')
218 self.to = list(to)
219 else:
220 self.to = []
221 if cc:
222 if isinstance(cc, str):
223 raise TypeError('"cc" argument must be a list or tuple')
224 self.cc = list(cc)
225 else:
226 self.cc = []
227 if bcc:
228 if isinstance(bcc, str):
229 raise TypeError('"bcc" argument must be a list or tuple')
230 self.bcc = list(bcc)
231 else:
232 self.bcc = []
233 if reply_to:
234 if isinstance(reply_to, str):
235 raise TypeError('"reply_to" argument must be a list or tuple')
236 self.reply_to = list(reply_to)
237 else:
238 self.reply_to = []
239 self.from_email = from_email or settings.DEFAULT_FROM_EMAIL
240 self.subject = subject
241 self.body = body or ""
242 self.attachments = []
243 if attachments:
244 for attachment in attachments:
245 if isinstance(attachment, MIMEBase):
246 self.attach(attachment)
247 else:
248 self.attach(*attachment)
249 self.extra_headers = headers or {}
250 self.connection = connection
251
252 def get_connection(self, fail_silently=False):
253 from . import get_connection
254
255 if not self.connection:
256 self.connection = get_connection(fail_silently=fail_silently)
257 return self.connection
258
259 def message(self):
260 encoding = self.encoding or settings.DEFAULT_CHARSET
261 msg = SafeMIMEText(self.body, self.content_subtype, encoding)
262 msg = self._create_message(msg)
263 msg["Subject"] = self.subject
264 msg["From"] = self.extra_headers.get("From", self.from_email)
265 self._set_list_header_if_not_empty(msg, "To", self.to)
266 self._set_list_header_if_not_empty(msg, "Cc", self.cc)
267 self._set_list_header_if_not_empty(msg, "Reply-To", self.reply_to)
268
269 # Email header names are case-insensitive (RFC 2045), so we have to
270 # accommodate that when doing comparisons.
271 header_names = [key.lower() for key in self.extra_headers]
272 if "date" not in header_names:
273 # formatdate() uses stdlib methods to format the date, which use
274 # the stdlib/OS concept of a timezone, however, Plain sets the
275 # TZ environment variable based on the TIME_ZONE setting which
276 # will get picked up by formatdate().
277 msg["Date"] = formatdate(localtime=settings.EMAIL_USE_LOCALTIME)
278 if "message-id" not in header_names:
279 # Use cached DNS_NAME for performance
280 msg["Message-ID"] = make_msgid(domain=DNS_NAME)
281 for name, value in self.extra_headers.items():
282 if name.lower() != "from": # From is already handled
283 msg[name] = value
284 return msg
285
286 def recipients(self):
287 """
288 Return a list of all recipients of the email (includes direct
289 addressees as well as Cc and Bcc entries).
290 """
291 return [email for email in (self.to + self.cc + self.bcc) if email]
292
293 def send(self, fail_silently=False):
294 """Send the email message."""
295 if not self.recipients():
296 # Don't bother creating the network connection if there's nobody to
297 # send to.
298 return 0
299 return self.get_connection(fail_silently).send_messages([self])
300
301 def attach(self, filename=None, content=None, mimetype=None):
302 """
303 Attach a file with the given filename and content. The filename can
304 be omitted and the mimetype is guessed, if not provided.
305
306 If the first parameter is a MIMEBase subclass, insert it directly
307 into the resulting message attachments.
308
309 For a text/* mimetype (guessed or specified), when a bytes object is
310 specified as content, decode it as UTF-8. If that fails, set the
311 mimetype to DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
312 """
313 if isinstance(filename, MIMEBase):
314 if content is not None or mimetype is not None:
315 raise ValueError(
316 "content and mimetype must not be given when a MIMEBase "
317 "instance is provided."
318 )
319 self.attachments.append(filename)
320 elif content is None:
321 raise ValueError("content must be provided.")
322 else:
323 mimetype = (
324 mimetype
325 or mimetypes.guess_type(filename)[0]
326 or DEFAULT_ATTACHMENT_MIME_TYPE
327 )
328 basetype, subtype = mimetype.split("/", 1)
329
330 if basetype == "text":
331 if isinstance(content, bytes):
332 try:
333 content = content.decode()
334 except UnicodeDecodeError:
335 # If mimetype suggests the file is text but it's
336 # actually binary, read() raises a UnicodeDecodeError.
337 mimetype = DEFAULT_ATTACHMENT_MIME_TYPE
338
339 self.attachments.append((filename, content, mimetype))
340
341 def attach_file(self, path, mimetype=None):
342 """
343 Attach a file from the filesystem.
344
345 Set the mimetype to DEFAULT_ATTACHMENT_MIME_TYPE if it isn't specified
346 and cannot be guessed.
347
348 For a text/* mimetype (guessed or specified), decode the file's content
349 as UTF-8. If that fails, set the mimetype to
350 DEFAULT_ATTACHMENT_MIME_TYPE and don't decode the content.
351 """
352 path = Path(path)
353 with path.open("rb") as file:
354 content = file.read()
355 self.attach(path.name, content, mimetype)
356
357 def _create_message(self, msg):
358 return self._create_attachments(msg)
359
360 def _create_attachments(self, msg):
361 if self.attachments:
362 encoding = self.encoding or settings.DEFAULT_CHARSET
363 body_msg = msg
364 msg = SafeMIMEMultipart(_subtype=self.mixed_subtype, encoding=encoding)
365 if self.body or body_msg.is_multipart():
366 msg.attach(body_msg)
367 for attachment in self.attachments:
368 if isinstance(attachment, MIMEBase):
369 msg.attach(attachment)
370 else:
371 msg.attach(self._create_attachment(*attachment))
372 return msg
373
374 def _create_mime_attachment(self, content, mimetype):
375 """
376 Convert the content, mimetype pair into a MIME attachment object.
377
378 If the mimetype is message/rfc822, content may be an
379 email.Message or EmailMessage object, as well as a str.
380 """
381 basetype, subtype = mimetype.split("/", 1)
382 if basetype == "text":
383 encoding = self.encoding or settings.DEFAULT_CHARSET
384 attachment = SafeMIMEText(content, subtype, encoding)
385 elif basetype == "message" and subtype == "rfc822":
386 # Bug #18967: Per RFC 2046 Section 5.2.1, message/rfc822
387 # attachments must not be base64 encoded.
388 if isinstance(content, EmailMessage):
389 # convert content into an email.Message first
390 content = content.message()
391 elif not isinstance(content, Message):
392 # For compatibility with existing code, parse the message
393 # into an email.Message object if it is not one already.
394 content = message_from_string(force_str(content))
395
396 attachment = SafeMIMEMessage(content, subtype)
397 else:
398 # Encode non-text attachments with base64.
399 attachment = MIMEBase(basetype, subtype)
400 attachment.set_payload(content)
401 Encoders.encode_base64(attachment)
402 return attachment
403
404 def _create_attachment(self, filename, content, mimetype=None):
405 """
406 Convert the filename, content, mimetype triple into a MIME attachment
407 object.
408 """
409 attachment = self._create_mime_attachment(content, mimetype)
410 if filename:
411 try:
412 filename.encode("ascii")
413 except UnicodeEncodeError:
414 filename = ("utf-8", "", filename)
415 attachment.add_header(
416 "Content-Disposition", "attachment", filename=filename
417 )
418 return attachment
419
420 def _set_list_header_if_not_empty(self, msg, header, values):
421 """
422 Set msg's header, either from self.extra_headers, if present, or from
423 the values argument.
424 """
425 if values:
426 try:
427 value = self.extra_headers[header]
428 except KeyError:
429 value = ", ".join(str(v) for v in values)
430 msg[header] = value
431
432
433class EmailMultiAlternatives(EmailMessage):
434 """
435 A version of EmailMessage that makes it easy to send multipart/alternative
436 messages. For example, including text and HTML versions of the text is
437 made easier.
438 """
439
440 alternative_subtype = "alternative"
441
442 def __init__(
443 self,
444 subject="",
445 body="",
446 from_email=None,
447 to=None,
448 bcc=None,
449 connection=None,
450 attachments=None,
451 headers=None,
452 alternatives=None,
453 cc=None,
454 reply_to=None,
455 ):
456 """
457 Initialize a single email message (which can be sent to multiple
458 recipients).
459 """
460 super().__init__(
461 subject,
462 body,
463 from_email,
464 to,
465 bcc,
466 connection,
467 attachments,
468 headers,
469 cc,
470 reply_to,
471 )
472 self.alternatives = alternatives or []
473
474 def attach_alternative(self, content, mimetype):
475 """Attach an alternative content representation."""
476 if content is None or mimetype is None:
477 raise ValueError("Both content and mimetype must be provided.")
478 self.alternatives.append((content, mimetype))
479
480 def _create_message(self, msg):
481 return self._create_attachments(self._create_alternatives(msg))
482
483 def _create_alternatives(self, msg):
484 encoding = self.encoding or settings.DEFAULT_CHARSET
485 if self.alternatives:
486 body_msg = msg
487 msg = SafeMIMEMultipart(
488 _subtype=self.alternative_subtype, encoding=encoding
489 )
490 if self.body:
491 msg.attach(body_msg)
492 for alternative in self.alternatives:
493 msg.attach(self._create_mime_attachment(*alternative))
494 return msg