Plain is headed towards 1.0! Subscribe for development updates →

 1"""Email backend that writes messages to a file."""
 2
 3import datetime
 4import os
 5
 6from plain.exceptions import ImproperlyConfigured
 7from plain.runtime import settings
 8
 9from .console import EmailBackend as ConsoleEmailBackend
10
11
12class EmailBackend(ConsoleEmailBackend):
13    def __init__(self, *args, file_path=None, **kwargs):
14        self._fname = None
15        if file_path is not None:
16            self.file_path = file_path
17        else:
18            self.file_path = getattr(settings, "EMAIL_FILE_PATH", None)
19        self.file_path = os.path.abspath(self.file_path)
20        try:
21            os.makedirs(self.file_path, exist_ok=True)
22        except FileExistsError:
23            raise ImproperlyConfigured(
24                f"Path for saving email messages exists, but is not a directory: {self.file_path}"
25            )
26        except OSError as err:
27            raise ImproperlyConfigured(
28                f"Could not create directory for saving email messages: {self.file_path} ({err})"
29            )
30        # Make sure that self.file_path is writable.
31        if not os.access(self.file_path, os.W_OK):
32            raise ImproperlyConfigured(
33                f"Could not write to directory: {self.file_path}"
34            )
35        # Finally, call super().
36        # Since we're using the console-based backend as a base,
37        # force the stream to be None, so we don't default to stdout
38        kwargs["stream"] = None
39        super().__init__(*args, **kwargs)
40
41    def write_message(self, message):
42        self.stream.write(message.message().as_bytes() + b"\n")
43        self.stream.write(b"-" * 79)
44        self.stream.write(b"\n")
45
46    def _get_filename(self):
47        """Return a unique file name."""
48        if self._fname is None:
49            timestamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
50            fname = f"{timestamp}-{abs(id(self))}.log"
51            self._fname = os.path.join(self.file_path, fname)
52        return self._fname
53
54    def open(self):
55        if self.stream is None:
56            self.stream = open(self._get_filename(), "ab")
57            return True
58        return False
59
60    def close(self):
61        try:
62            if self.stream is not None:
63                self.stream.close()
64        finally:
65            self.stream = None