You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
534 lines
18 KiB
534 lines
18 KiB
from __future__ import annotations |
|
|
|
import getpass |
|
import hashlib |
|
import json |
|
import os |
|
import pkgutil |
|
import re |
|
import sys |
|
import time |
|
import typing as t |
|
import uuid |
|
from contextlib import ExitStack |
|
from io import BytesIO |
|
from itertools import chain |
|
from os.path import basename |
|
from os.path import join |
|
from zlib import adler32 |
|
|
|
from .._internal import _log |
|
from ..exceptions import NotFound |
|
from ..http import parse_cookie |
|
from ..security import gen_salt |
|
from ..utils import send_file |
|
from ..wrappers.request import Request |
|
from ..wrappers.response import Response |
|
from .console import Console |
|
from .tbtools import DebugFrameSummary |
|
from .tbtools import DebugTraceback |
|
from .tbtools import render_console_html |
|
|
|
if t.TYPE_CHECKING: |
|
from _typeshed.wsgi import StartResponse |
|
from _typeshed.wsgi import WSGIApplication |
|
from _typeshed.wsgi import WSGIEnvironment |
|
|
|
# Lifetime of a successful PIN authentication: one week, in seconds.
# The timestamp stored in the PIN cookie is compared against this.
PIN_TIME = 60 * 60 * 24 * 7
|
|
|
|
|
def hash_pin(pin: str) -> str:
    """Return a short salted SHA-1 fingerprint of ``pin``.

    The pin is suffixed with a fixed salt string and encoded as UTF-8
    (characters that cannot be encoded are replaced). The first 12 hex
    characters of the digest are returned.
    """
    salted = f"{pin} added salt".encode("utf-8", "replace")
    digest = hashlib.sha1(salted).hexdigest()
    return digest[:12]
|
|
|
|
|
# Process-wide cache for the value computed by get_machine_id().
_machine_id: str | bytes | None = None


def get_machine_id() -> str | bytes | None:
    """Return an identifier for the current machine, or ``None``.

    The value is computed on first use and cached in the module-level
    ``_machine_id``. Sources are tried in order: Linux id files plus
    cgroup information, the serial number reported by ``ioreg`` (OS X),
    and the ``MachineGuid`` registry value (Windows). A ``None`` result
    is not cached, so detection is retried on the next call.
    """
    global _machine_id

    def _first_line(path: str) -> bytes | None:
        # Stripped first line of ``path``; None if the file is unreadable.
        try:
            with open(path, "rb") as fh:
                return fh.readline().strip()
        except OSError:
            return None

    def _detect() -> str | bytes | None:
        linux_id = b""

        # machine-id is stable across boots, boot_id is not.
        for candidate in ("/etc/machine-id", "/proc/sys/kernel/random/boot_id"):
            content = _first_line(candidate)

            if content:
                linux_id = content
                break

        # Containers share the same machine id, add some cgroup
        # information. This is used outside containers too but should be
        # relatively stable across boots.
        cgroup_line = _first_line("/proc/self/cgroup")

        if cgroup_line is not None:
            linux_id += cgroup_line.rpartition(b"/")[2]

        if linux_id:
            return linux_id

        # On OS X, use ioreg to get the computer's serial number.
        try:
            # subprocess may not be available, e.g. Google App Engine
            # https://github.com/pallets/werkzeug/issues/925
            from subprocess import PIPE, Popen

            process = Popen(
                ["ioreg", "-c", "IOPlatformExpertDevice", "-d", "2"], stdout=PIPE
            )
            output = process.communicate()[0]
            found = re.search(b'"serial-number" = <([^>]+)', output)

            if found is not None:
                return found.group(1)
        except (OSError, ImportError):
            pass

        # Only Windows is left; everything else has no usable source.
        if sys.platform != "win32":
            return None

        # On Windows, use winreg to get the machine guid.
        import winreg

        try:
            with winreg.OpenKey(
                winreg.HKEY_LOCAL_MACHINE,
                "SOFTWARE\\Microsoft\\Cryptography",
                0,
                winreg.KEY_READ | winreg.KEY_WOW64_64KEY,
            ) as rk:
                guid: str | bytes
                guid_type: int
                guid, guid_type = winreg.QueryValueEx(rk, "MachineGuid")

                # String values are normalised to bytes; other registry
                # value types are returned unchanged.
                if guid_type == winreg.REG_SZ:
                    return guid.encode("utf-8")

                return guid
        except OSError:
            pass

        return None

    if _machine_id is None:
        _machine_id = _detect()

    return _machine_id
|
|
|
|
|
class _ConsoleFrame:
    """Frame stand-in used by the standalone console.

    It exposes an ``id`` and an ``eval`` method so the code that drives
    per-frame consoles can be reused for a console that is not attached
    to any traceback frame.
    """

    def __init__(self, namespace: dict[str, t.Any]):
        # The standalone console always lives under frame id 0.
        self.id = 0
        self.console = Console(namespace)

    def eval(self, code: str) -> t.Any:
        """Evaluate ``code`` in the wrapped console and return its result."""
        result = self.console.eval(code)
        return result
|
|
|
|
|
def get_pin_and_cookie_name(
    app: WSGIApplication,
) -> tuple[str, str] | tuple[None, None]:
    """Return the debugger pin and the name of the cookie that remembers it.

    The pin is derived from properties of the application and the
    machine, so it stays the same between restarts and debugging is not
    needlessly frustrating.

    The ``WERKZEUG_DEBUG_PIN`` environment variable overrides this:

    - ``"off"`` disables the pin; ``(None, None)`` is returned.
    - A decimal value containing dashes is used as the pin verbatim.
    - A decimal value without dashes is used as the pin after being
      split into groups for readability.

    :param app: the WSGI application the debugger wraps.
    :return: a ``(pin, cookie_name)`` tuple, or ``(None, None)`` if the
        pin was explicitly disabled.
    """
    pin = os.environ.get("WERKZEUG_DEBUG_PIN")
    rv = None
    num = None

    # Pin was explicitly disabled
    if pin == "off":
        return None, None

    # Pin was provided explicitly
    if pin is not None and pin.replace("-", "").isdecimal():
        # If there are separators in the pin, return it directly
        if "-" in pin:
            rv = pin
        else:
            num = pin

    modname = getattr(app, "__module__", t.cast(object, app).__class__.__module__)
    username: str | None

    try:
        # getuser imports the pwd module, which does not exist in Google
        # App Engine. It may also raise a KeyError if the UID does not
        # have a username, such as in Docker. Python 3.13+ reports every
        # failure as OSError instead, so that has to be caught as well.
        username = getpass.getuser()
    except (ImportError, KeyError, OSError):
        username = None

    mod = sys.modules.get(modname)

    # This information only exists to make the cookie unique on the
    # computer, not as a security feature.
    probably_public_bits = [
        username,
        modname,
        getattr(app, "__name__", type(app).__name__),
        getattr(mod, "__file__", None),
    ]

    # This information is here to make it harder for an attacker to
    # guess the cookie name. They are unlikely to be contained anywhere
    # within the unauthenticated debug page.
    private_bits = [str(uuid.getnode()), get_machine_id()]

    h = hashlib.sha1()

    for bit in chain(probably_public_bits, private_bits):
        # Missing values (None or empty) do not contribute to the hash.
        if not bit:
            continue

        if isinstance(bit, str):
            bit = bit.encode("utf-8")

        h.update(bit)

    h.update(b"cookiesalt")

    cookie_name = f"__wzd{h.hexdigest()[:20]}"

    # If we need to generate a pin we salt it a bit more so that we don't
    # end up with the same value and generate out 9 digits
    if num is None:
        h.update(b"pinsalt")
        num = f"{int(h.hexdigest(), 16):09d}"[:9]

    # Format the pincode in groups of digits for easier remembering if
    # we don't have a result yet.
    if rv is None:
        for group_size in 5, 4, 3:
            if len(num) % group_size == 0:
                rv = "-".join(
                    num[x : x + group_size].rjust(group_size, "0")
                    for x in range(0, len(num), group_size)
                )
                break
        else:
            # No group size divides the length evenly; use the digits as is.
            rv = num

    return rv, cookie_name
|
|
|
|
|
class DebuggedApplication:
    """Enables debugging support for a given application::

        from werkzeug.debug import DebuggedApplication
        from myapp import app
        app = DebuggedApplication(app, evalex=True)

    The ``evalex`` argument allows evaluating expressions in any frame
    of a traceback. This works by preserving each frame with its local
    state. Some state, such as context globals, cannot be restored with
    the frame by default. When ``evalex`` is enabled,
    ``environ["werkzeug.debug.preserve_context"]`` will be a callable
    that takes a context manager, and can be called multiple times.
    Each context manager will be entered before evaluating code in the
    frame, then exited again, so they can perform setup and cleanup for
    each call.

    :param app: the WSGI application to run debugged.
    :param evalex: enable exception evaluation feature (interactive
                   debugging).  This requires a non-forking server.
    :param request_key: The key that points to the request object in this
                        environment.  This parameter is ignored in current
                        versions.
    :param console_path: the URL for a general purpose console.
    :param console_init_func: the function that is executed before starting
                              the general purpose console.  The return value
                              is used as initial namespace.
    :param show_hidden_frames: by default hidden traceback frames are skipped.
                               You can show them by setting this parameter
                               to `True`.
    :param pin_security: can be used to disable the pin based security system.
    :param pin_logging: enables the logging of the pin system.

    .. versionchanged:: 2.2
        Added the ``werkzeug.debug.preserve_context`` environ key.
    """

    # Both are filled lazily, see ``_load_pin_and_cookie``.
    _pin: str
    _pin_cookie: str

    def __init__(
        self,
        app: WSGIApplication,
        evalex: bool = False,
        request_key: str = "werkzeug.request",
        console_path: str = "/console",
        console_init_func: t.Callable[[], dict[str, t.Any]] | None = None,
        show_hidden_frames: bool = False,
        pin_security: bool = True,
        pin_logging: bool = True,
    ) -> None:
        # Normalise any falsy value to None so later checks can use `is None`.
        if not console_init_func:
            console_init_func = None
        self.app = app
        self.evalex = evalex
        # Frames kept alive for interactive evaluation, keyed by id(frame).
        # Key 0 is reserved for the standalone console.
        self.frames: dict[int, DebugFrameSummary | _ConsoleFrame] = {}
        # Context managers to enter around evaluation, keyed like `frames`.
        self.frame_contexts: dict[int, list[t.ContextManager[None]]] = {}
        self.request_key = request_key
        self.console_path = console_path
        self.console_init_func = console_init_func
        self.show_hidden_frames = show_hidden_frames
        # Per-instance secret that debugger requests have to present.
        self.secret = gen_salt(20)
        self._failed_pin_auth = 0

        self.pin_logging = pin_logging
        if pin_security:
            # Print out the pin for the debugger on standard out.
            if os.environ.get("WERKZEUG_RUN_MAIN") == "true" and pin_logging:
                _log("warning", " * Debugger is active!")
                if self.pin is None:
                    _log("warning", " * Debugger PIN disabled. DEBUGGER UNSECURED!")
                else:
                    _log("info", " * Debugger PIN: %s", self.pin)
        else:
            self.pin = None

    def _load_pin_and_cookie(self) -> None:
        """Compute the pin and cookie name and store what is still missing.

        A pin that was already assigned through the ``pin`` setter
        (including ``None`` from ``pin_security=False``) is kept, so
        looking up the cookie name can no longer silently replace it.
        If the environment disables the pin altogether (no cookie name
        is produced), the computed values are stored unconditionally.
        """
        pin, cookie_name = get_pin_and_cookie_name(self.app)

        if cookie_name is None or not hasattr(self, "_pin"):
            self._pin = pin  # type: ignore[assignment]

        self._pin_cookie = cookie_name  # type: ignore[assignment]

    @property
    def pin(self) -> str | None:
        """The debugger pin, or ``None`` if pin security is disabled."""
        if not hasattr(self, "_pin"):
            self._load_pin_and_cookie()
        return self._pin

    @pin.setter
    def pin(self, value: str | None) -> None:
        self._pin = value  # type: ignore[assignment]

    @property
    def pin_cookie_name(self) -> str:
        """The name of the pin cookie."""
        if not hasattr(self, "_pin_cookie"):
            self._load_pin_and_cookie()
        return self._pin_cookie

    def debug_application(
        self, environ: WSGIEnvironment, start_response: StartResponse
    ) -> t.Iterator[bytes]:
        """Run the application and conserve the traceback frames."""
        contexts: list[t.ContextManager[t.Any]] = []

        if self.evalex:
            # The application registers context managers through this key.
            environ["werkzeug.debug.preserve_context"] = contexts.append

        app_iter = None
        try:
            app_iter = self.app(environ, start_response)
            yield from app_iter
            if hasattr(app_iter, "close"):
                app_iter.close()
        except Exception as e:
            if hasattr(app_iter, "close"):
                app_iter.close()  # type: ignore

            tb = DebugTraceback(e, skip=1, hide=not self.show_hidden_frames)

            # Keep every frame (and the contexts collected during this
            # request) so commands can be evaluated in them later.
            for frame in tb.all_frames:
                self.frames[id(frame)] = frame
                self.frame_contexts[id(frame)] = contexts

            is_trusted = bool(self.check_pin_trust(environ))
            html = tb.render_debugger_html(
                evalex=self.evalex,
                secret=self.secret,
                evalex_trusted=is_trusted,
            )
            response = Response(html, status=500, mimetype="text/html")

            try:
                yield from response(environ, start_response)
            except Exception:
                # if we end up here there has been output but an error
                # occurred.  in that situation we can do nothing fancy any
                # more, better log something into the error log and fall
                # back gracefully.
                environ["wsgi.errors"].write(
                    "Debugging middleware caught exception in streamed "
                    "response at a point where response headers were already "
                    "sent.\n"
                )

            environ["wsgi.errors"].write("".join(tb.render_traceback_text()))

    def execute_command(  # type: ignore[return]
        self,
        request: Request,
        command: str,
        frame: DebugFrameSummary | _ConsoleFrame,
    ) -> Response:
        """Execute a command in a console."""
        contexts = self.frame_contexts.get(id(frame), [])

        # Enter the preserved contexts for the duration of the evaluation.
        with ExitStack() as exit_stack:
            for cm in contexts:
                exit_stack.enter_context(cm)

            return Response(frame.eval(command), mimetype="text/html")

    def display_console(self, request: Request) -> Response:
        """Display a standalone shell."""
        # The console frame is created once, on first use.
        if 0 not in self.frames:
            if self.console_init_func is None:
                ns = {}
            else:
                ns = dict(self.console_init_func())
            ns.setdefault("app", self.app)
            self.frames[0] = _ConsoleFrame(ns)
        is_trusted = bool(self.check_pin_trust(request.environ))
        return Response(
            render_console_html(secret=self.secret, evalex_trusted=is_trusted),
            mimetype="text/html",
        )

    def get_resource(self, request: Request, filename: str) -> Response:
        """Return a static resource from the shared folder."""
        # basename() confines the lookup to the "shared" directory.
        path = join("shared", basename(filename))

        try:
            data = pkgutil.get_data(__package__, path)
        except OSError:
            return NotFound()  # type: ignore[return-value]
        else:
            if data is None:
                return NotFound()  # type: ignore[return-value]

        etag = str(adler32(data) & 0xFFFFFFFF)
        return send_file(
            BytesIO(data), request.environ, download_name=filename, etag=etag
        )

    def check_pin_trust(self, environ: WSGIEnvironment) -> bool | None:
        """Checks if the request passed the pin test.  This returns `True` if the
        request is trusted on a pin/cookie basis and returns `False` if not.
        Additionally if the cookie's stored pin hash is wrong it will return
        `None` so that appropriate action can be taken.
        """
        if self.pin is None:
            return True
        val = parse_cookie(environ).get(self.pin_cookie_name)
        if not val or "|" not in val:
            return False
        # Cookie value format: "<unix timestamp>|<pin hash>".
        ts_str, pin_hash = val.split("|", 1)

        try:
            ts = int(ts_str)
        except ValueError:
            return False

        if pin_hash != hash_pin(self.pin):
            return None
        return (time.time() - PIN_TIME) < ts

    def _fail_pin_auth(self) -> None:
        """Record a failed attempt, delaying longer after repeated failures."""
        time.sleep(5.0 if self._failed_pin_auth > 5 else 0.5)
        self._failed_pin_auth += 1

    def pin_auth(self, request: Request) -> Response:
        """Authenticates with the pin."""
        exhausted = False
        auth = False
        trust = self.check_pin_trust(request.environ)
        pin = t.cast(str, self.pin)

        # If the trust return value is `None` it means that the cookie is
        # set but the stored pin hash value is bad.  This means that the
        # pin was changed.  In this case we count a bad auth and unset the
        # cookie.  This way it becomes harder to guess the cookie name
        # instead of the pin as we still count up failures.
        bad_cookie = False
        if trust is None:
            self._fail_pin_auth()
            bad_cookie = True

        # If we're trusted, we're authenticated.
        elif trust:
            auth = True

        # If we failed too many times, then we're locked out.
        elif self._failed_pin_auth > 10:
            exhausted = True

        # Otherwise go through pin based authentication
        else:
            # A request without a "pin" argument counts as a failed
            # attempt instead of raising out of the WSGI callable.
            entered_pin = request.args.get("pin", "")

            if entered_pin.strip().replace("-", "") == pin.replace("-", ""):
                self._failed_pin_auth = 0
                auth = True
            else:
                self._fail_pin_auth()

        rv = Response(
            json.dumps({"auth": auth, "exhausted": exhausted}),
            mimetype="application/json",
        )
        if auth:
            rv.set_cookie(
                self.pin_cookie_name,
                f"{int(time.time())}|{hash_pin(pin)}",
                httponly=True,
                samesite="Strict",
                secure=request.is_secure,
            )
        elif bad_cookie:
            rv.delete_cookie(self.pin_cookie_name)
        return rv

    def log_pin_request(self) -> Response:
        """Log the pin if needed."""
        if self.pin_logging and self.pin is not None:
            _log(
                "info", " * To enable the debugger you need to enter the security pin:"
            )
            _log("info", " * Debugger pin code: %s", self.pin)
        return Response("")

    def __call__(
        self, environ: WSGIEnvironment, start_response: StartResponse
    ) -> t.Iterable[bytes]:
        """Dispatch the requests."""
        # important: don't ever access a function here that reads the incoming
        # form data!  Otherwise the application won't have access to that data
        # any more!
        request = Request(environ)
        # Default: run the wrapped application under the debugger.
        response = self.debug_application
        if request.args.get("__debugger__") == "yes":
            cmd = request.args.get("cmd")
            arg = request.args.get("f")
            secret = request.args.get("s")
            frame = self.frames.get(request.args.get("frm", type=int))  # type: ignore
            if cmd == "resource" and arg:
                response = self.get_resource(request, arg)  # type: ignore
            elif cmd == "pinauth" and secret == self.secret:
                response = self.pin_auth(request)  # type: ignore
            elif cmd == "printpin" and secret == self.secret:
                response = self.log_pin_request()  # type: ignore
            elif (
                self.evalex
                and cmd is not None
                and frame is not None
                and self.secret == secret
                and self.check_pin_trust(environ)
            ):
                response = self.execute_command(request, cmd, frame)  # type: ignore
        elif (
            self.evalex
            and self.console_path is not None
            and request.path == self.console_path
        ):
            response = self.display_console(request)  # type: ignore
        return response(environ, start_response)
|
|
|