Plain is headed towards 1.0! Subscribe for development updates →

  1from __future__ import annotations
  2
  3#
  4#
  5# This file is part of gunicorn released under the MIT license.
  6# See the LICENSE for more information.
  7#
  8# Vendored and modified for Plain.
  9import errno
 10import os
 11import select
 12import socket
 13import ssl
 14import sys
 15from datetime import datetime
 16from typing import Any
 17
 18from .. import http, sock, util
 19from ..http import wsgi
 20from . import base
 21
 22
 23class StopWaiting(Exception):
 24    """exception raised to stop waiting for a connection"""
 25
 26
 27class SyncWorker(base.Worker):
 28    def accept(self, listener: sock.BaseSocket) -> None:
 29        client, addr = listener.accept()
 30        client.setblocking(True)
 31        util.close_on_exec(client.fileno())
 32        self.handle(listener, client, addr)
 33
 34    def wait(self, timeout: float) -> list[Any] | None:
 35        try:
 36            self.notify()
 37            ret = select.select(self.wait_fds, [], [], timeout)
 38            if ret[0]:
 39                if self.PIPE[0] in ret[0]:
 40                    os.read(self.PIPE[0], 1)
 41                return ret[0]
 42            return None
 43
 44        except OSError as e:
 45            if e.args[0] == errno.EINTR:
 46                return self.sockets
 47            if e.args[0] == errno.EBADF:
 48                if self.nr < 0:
 49                    return self.sockets
 50                else:
 51                    raise StopWaiting
 52            raise
 53
 54    def is_parent_alive(self) -> bool:
 55        # If our parent changed then we shut down.
 56        if self.ppid != os.getppid():
 57            self.log.info("Parent changed, shutting down: %s", self)
 58            return False
 59        return True
 60
 61    def run_for_one(self, timeout: float) -> None:
 62        listener = self.sockets[0]
 63        while self.alive:
 64            self.notify()
 65
 66            # Accept a connection. If we get an error telling us
 67            # that no connection is waiting we fall down to the
 68            # select which is where we'll wait for a bit for new
 69            # workers to come give us some love.
 70            try:
 71                self.accept(listener)
 72                # Keep processing clients until no one is waiting. This
 73                # prevents the need to select() for every client that we
 74                # process.
 75                continue
 76
 77            except OSError as e:
 78                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
 79                    raise
 80
 81            if not self.is_parent_alive():
 82                return None
 83
 84            try:
 85                self.wait(timeout)
 86            except StopWaiting:
 87                return None
 88
 89    def run_for_multiple(self, timeout: float) -> None:
 90        while self.alive:
 91            self.notify()
 92
 93            try:
 94                ready = self.wait(timeout)
 95            except StopWaiting:
 96                return None
 97
 98            if ready is not None:
 99                for listener in ready:
100                    if listener == self.PIPE[0]:
101                        continue
102
103                    try:
104                        self.accept(listener)
105                    except OSError as e:
106                        if e.errno not in (
107                            errno.EAGAIN,
108                            errno.ECONNABORTED,
109                            errno.EWOULDBLOCK,
110                        ):
111                            raise
112
113            if not self.is_parent_alive():
114                return None
115
116    def run(self) -> None:
117        # if no timeout is given the worker will never wait and will
118        # use the CPU for nothing. This minimal timeout prevent it.
119        timeout = self.timeout or 0.5
120
121        # self.socket appears to lose its blocking status after
122        # we fork in the arbiter. Reset it here.
123        for s in self.sockets:
124            s.setblocking(False)
125
126        if len(self.sockets) > 1:
127            self.run_for_multiple(timeout)
128        else:
129            self.run_for_one(timeout)
130
131    def handle(
132        self, listener: sock.BaseSocket, client: socket.socket, addr: Any
133    ) -> None:
134        req = None
135        try:
136            if self.cfg.is_ssl:
137                client = sock.ssl_wrap_socket(client, self.cfg)
138            parser = http.RequestParser(self.cfg, client, addr)
139            req = next(parser)
140            self.handle_request(listener, req, client, addr)
141        except http.errors.NoMoreData as e:
142            self.log.debug("Ignored premature client disconnection. %s", e)
143        except StopIteration as e:
144            self.log.debug("Closing connection. %s", e)
145        except ssl.SSLError as e:
146            if e.args[0] == ssl.SSL_ERROR_EOF:
147                self.log.debug("ssl connection closed")
148                client.close()
149            else:
150                self.log.debug("Error processing SSL request.")
151                self.handle_error(req, client, addr, e)
152        except OSError as e:
153            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
154                self.log.exception("Socket error processing request.")
155            else:
156                if e.errno == errno.ECONNRESET:
157                    self.log.debug("Ignoring connection reset")
158                elif e.errno == errno.ENOTCONN:
159                    self.log.debug("Ignoring socket not connected")
160                else:
161                    self.log.debug("Ignoring EPIPE")
162        except BaseException as e:
163            self.handle_error(req, client, addr, e)
164        finally:
165            util.close(client)
166
167    def handle_request(
168        self, listener: sock.BaseSocket, req: Any, client: socket.socket, addr: Any
169    ) -> None:
170        environ = {}
171        resp = None
172        try:
173            request_start = datetime.now()
174            resp, environ = wsgi.create(
175                req, client, addr, listener.getsockname(), self.cfg
176            )
177            # Force the connection closed until someone shows
178            # a buffering proxy that supports Keep-Alive to
179            # the backend.
180            resp.force_close()
181            self.nr += 1
182            if self.nr >= self.max_requests:
183                self.log.info("Autorestarting worker after current request.")
184                self.alive = False
185            respiter = self.wsgi(environ, resp.start_response)
186            try:
187                if isinstance(respiter, environ["wsgi.file_wrapper"]):
188                    resp.write_file(respiter)
189                else:
190                    for item in respiter:
191                        resp.write(item)
192                resp.close()
193            finally:
194                request_time = datetime.now() - request_start
195                self.log.access(resp, req, environ, request_time)
196                if hasattr(respiter, "close"):
197                    respiter.close()
198        except OSError:
199            # pass to next try-except level
200            util.reraise(*sys.exc_info())
201        except Exception:
202            if resp and resp.headers_sent:
203                # If the requests have already been sent, we should close the
204                # connection to indicate the error.
205                self.log.exception("Error handling request")
206                try:
207                    client.shutdown(socket.SHUT_RDWR)
208                    client.close()
209                except OSError:
210                    pass
211                raise StopIteration()
212            raise