1from __future__ import annotations
2
3#
4#
5# This file is part of gunicorn released under the MIT license.
6# See the LICENSE for more information.
7#
8# Vendored and modified for Plain.
9import errno
10import os
11import select
12import socket
13import ssl
14import sys
15from datetime import datetime
16from typing import Any
17
18from .. import http, sock, util
19from ..http import wsgi
20from . import base
21
22
class StopWaiting(Exception):
    """Signal that the worker should stop waiting for new connections."""
25
26
class SyncWorker(base.Worker):
    """Worker that accepts and serves one connection at a time.

    Every accepted client is switched to blocking mode and handled to
    completion (one request, after which the connection is closed) before
    the next ``accept()`` is attempted.

    The attributes used here (``sockets``, ``wait_fds``, ``PIPE``, ``nr``,
    ``max_requests``, ``alive``, ``ppid``, ``timeout``, ``cfg``, ``log``,
    ``wsgi``) and the ``notify``/``handle_error`` methods are not defined in
    this class; they are expected to be provided by ``base.Worker``.
    """

    def accept(self, listener: sock.BaseSocket) -> None:
        """Accept one connection from *listener* and handle it synchronously.

        Raises:
            OSError: propagated from ``listener.accept()`` or from
                configuring the new client socket. The run loops treat
                EAGAIN / EWOULDBLOCK / ECONNABORTED as "nothing to accept".
        """
        client, addr = listener.accept()
        try:
            client.setblocking(True)
            util.close_on_exec(client.fileno())
        except BaseException:
            # handle() has not taken ownership of the socket yet, so nothing
            # else would close it if the setup above fails.
            util.close(client)
            raise
        self.handle(listener, client, addr)

    def wait(self, timeout: float) -> list[Any] | None:
        """Block in ``select()`` until one of ``self.wait_fds`` is readable.

        Args:
            timeout: maximum number of seconds to block.

        Returns:
            The list of readable objects reported by ``select()``;
            ``self.sockets`` when the call failed with EINTR, or with EBADF
            while ``self.nr`` is negative; ``None`` when the timeout expired
            with nothing readable.

        Raises:
            StopWaiting: the call failed with EBADF and ``self.nr`` is not
                negative.
            OSError: any other failure.
        """
        try:
            self.notify()
            ret = select.select(self.wait_fds, [], [], timeout)
            if ret[0]:
                if self.PIPE[0] in ret[0]:
                    # Consume one byte so the pipe does not stay readable.
                    os.read(self.PIPE[0], 1)
                return ret[0]
            return None

        except OSError as e:
            # Use ``errno`` (as the other handlers in this class do) rather
            # than ``args[0]``, which fails on an OSError without arguments.
            if e.errno == errno.EINTR:
                return self.sockets
            if e.errno == errno.EBADF:
                if self.nr < 0:
                    return self.sockets
                raise StopWaiting
            raise

    def is_parent_alive(self) -> bool:
        """Return ``False`` (and log it) when the parent process has changed."""
        # If our parent changed then we shut down.
        if self.ppid != os.getppid():
            self.log.info("Parent changed, shutting down: %s", self)
            return False
        return True

    def run_for_one(self, timeout: float) -> None:
        """Serve connections from the single listener until told to stop.

        The loop ends when ``self.alive`` becomes false, the parent process
        changes, or ``wait()`` raises ``StopWaiting``.

        Args:
            timeout: seconds to block in ``wait()`` when no connection is
                pending.
        """
        listener = self.sockets[0]
        while self.alive:
            self.notify()

            # Try to accept a connection first. If the error says that no
            # connection is pending we fall through to the select() below,
            # where we wait a little for new connections to arrive.
            try:
                self.accept(listener)
                # Keep processing clients until no one is waiting. This
                # avoids a select() call for every client we process.
                continue

            except OSError as e:
                if e.errno not in (errno.EAGAIN, errno.ECONNABORTED, errno.EWOULDBLOCK):
                    raise

            if not self.is_parent_alive():
                return None

            try:
                self.wait(timeout)
            except StopWaiting:
                return None

    def run_for_multiple(self, timeout: float) -> None:
        """Serve connections from several listeners until told to stop.

        Each iteration waits for readable descriptors and then accepts one
        connection from every ready listener. The loop ends when
        ``self.alive`` becomes false, the parent process changes, or
        ``wait()`` raises ``StopWaiting``.

        Args:
            timeout: seconds to block in ``wait()`` per iteration.
        """
        while self.alive:
            self.notify()

            try:
                ready = self.wait(timeout)
            except StopWaiting:
                return None

            if ready is not None:
                for listener in ready:
                    # The wake-up pipe is reported alongside the listeners
                    # but is not something we can accept() on.
                    if listener == self.PIPE[0]:
                        continue

                    try:
                        self.accept(listener)
                    except OSError as e:
                        if e.errno not in (
                            errno.EAGAIN,
                            errno.ECONNABORTED,
                            errno.EWOULDBLOCK,
                        ):
                            raise

            if not self.is_parent_alive():
                return None

    def run(self) -> None:
        """Put the listeners in non-blocking mode and enter the serve loop."""
        # Without a timeout the worker would never block in select() and
        # would spin the CPU for nothing. This minimal timeout prevents that.
        timeout = self.timeout or 0.5

        # The listening sockets appear to lose their blocking status after
        # the fork in the arbiter. Reset it here.
        for s in self.sockets:
            s.setblocking(False)

        if len(self.sockets) > 1:
            self.run_for_multiple(timeout)
        else:
            self.run_for_one(timeout)

    def handle(
        self, listener: sock.BaseSocket, client: socket.socket, addr: Any
    ) -> None:
        """Parse one request from *client*, serve it, and close the socket.

        No exception raised while parsing or serving propagates from the
        ``try`` body: expected disconnects are logged at debug level, and
        everything else is logged or passed to ``self.handle_error``. The
        client socket is always closed on the way out.

        Args:
            listener: the listening socket the client was accepted from.
            client: the accepted client socket (wrapped for TLS here when
                ``self.cfg.is_ssl`` is set).
            addr: the peer address returned by ``accept()``.
        """
        req = None
        try:
            if self.cfg.is_ssl:
                client = sock.ssl_wrap_socket(client, self.cfg)
            parser = http.RequestParser(self.cfg, client, addr)
            req = next(parser)
            self.handle_request(listener, req, client, addr)
        except http.errors.NoMoreData as e:
            self.log.debug("Ignored premature client disconnection. %s", e)
        except StopIteration as e:
            # Raised by the parser when exhausted, or by handle_request()
            # after it has torn the connection down itself.
            self.log.debug("Closing connection. %s", e)
        except ssl.SSLError as e:
            # Must precede the OSError clause: SSLError is an OSError subclass.
            if e.args[0] == ssl.SSL_ERROR_EOF:
                self.log.debug("ssl connection closed")
                client.close()
            else:
                self.log.debug("Error processing SSL request.")
                self.handle_error(req, client, addr, e)
        except OSError as e:
            if e.errno not in (errno.EPIPE, errno.ECONNRESET, errno.ENOTCONN):
                self.log.exception("Socket error processing request.")
            else:
                if e.errno == errno.ECONNRESET:
                    self.log.debug("Ignoring connection reset")
                elif e.errno == errno.ENOTCONN:
                    self.log.debug("Ignoring socket not connected")
                else:
                    self.log.debug("Ignoring EPIPE")
        except BaseException as e:
            self.handle_error(req, client, addr, e)
        finally:
            util.close(client)

    def handle_request(
        self, listener: sock.BaseSocket, req: Any, client: socket.socket, addr: Any
    ) -> None:
        """Run the WSGI application for *req* and write the response.

        Increments ``self.nr`` and clears ``self.alive`` once
        ``self.max_requests`` is reached, so the run loop exits after this
        request.

        Args:
            listener: the listening socket; its address is passed to
                ``wsgi.create``.
            req: the parsed request object.
            client: the client socket the response is written to.
            addr: the peer address.

        Raises:
            OSError: re-raised unchanged so ``handle()`` can classify it.
            StopIteration: an application error occurred after the response
                headers were already sent; the socket has been shut down.
            Exception: any other application error when no headers were
                sent yet (left for ``handle()`` to turn into an error
                response).
        """
        environ = {}
        resp = None
        try:
            request_start = datetime.now()
            resp, environ = wsgi.create(
                req, client, addr, listener.getsockname(), self.cfg
            )
            # Force the connection closed until someone shows
            # a buffering proxy that supports Keep-Alive to
            # the backend.
            resp.force_close()
            self.nr += 1
            if self.nr >= self.max_requests:
                self.log.info("Autorestarting worker after current request.")
                self.alive = False
            respiter = self.wsgi(environ, resp.start_response)
            try:
                if isinstance(respiter, environ["wsgi.file_wrapper"]):
                    resp.write_file(respiter)
                else:
                    for item in respiter:
                        resp.write(item)
                resp.close()
            finally:
                try:
                    request_time = datetime.now() - request_start
                    self.log.access(resp, req, environ, request_time)
                finally:
                    # The WSGI spec requires close() to be called on the
                    # response iterable no matter what, so a failure in
                    # access logging must not skip it.
                    if hasattr(respiter, "close"):
                        respiter.close()
        except OSError:
            # Re-raise as-is so the generic ``except Exception`` below does
            # not swallow socket errors; handle() deals with them.
            util.reraise(*sys.exc_info())
        except Exception:
            if resp and resp.headers_sent:
                # If the headers have already been sent we can no longer
                # produce an error response, so close the connection to
                # indicate the error.
                self.log.exception("Error handling request")
                try:
                    client.shutdown(socket.SHUT_RDWR)
                    client.close()
                except OSError:
                    pass
                raise StopIteration()
            raise