Extend filter and filter tests (#45179)

Franck Nijhof 2021-01-15 10:30:29 +01:00 committed by GitHub
parent a276f2d19e
commit dee0f887de
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 9 deletions


@ -40,12 +40,19 @@ def setup_security_filter(app):
@middleware
async def security_filter_middleware(request, handler):
"""Process request and block commonly known exploit attempts."""
if FILTERS.search(request.raw_path):
if FILTERS.search(request.path):
_LOGGER.warning(
"Filtered a potential harmful request to: %s", request.raw_path
)
raise HTTPBadRequest
if FILTERS.search(request.query_string):
_LOGGER.warning(
"Filtered a request with a potential harmful query string: %s",
request.raw_path,
)
raise HTTPBadRequest
return await handler(request)
app.middlewares.append(security_filter_middleware)
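Review bot: Matching on `request.path` decodes the path once, so a double-encoded payload (`%252e%252e%252f`, which decodes to `%2e%2e%2f`) still passes the check at `if FILTERS.search(request.path):` unless FILTERS itself enumerates encoded forms, and the definition of FILTERS is outside this hunk so I cannot tell. If this filter is meant to stand on its own, a second pass such as `if FILTERS.search(request.path) or FILTERS.search(unquote(request.path)):` (with `from urllib.parse import unquote`) would close that gap; the same applies to the `request.query_string` branch, which as I read aiohttp/yarl is also percent-decoded already. Either way the decode contract deserves a comment where the decision was made, e.g. `# request.path and request.query_string are percent-decoded by aiohttp (raw_path is not), so FILTERS sees single-decoded input only.` Note also that both warnings log `request.raw_path`, which carries the attacker-controlled query string verbatim; that is useful forensically but worth keeping in mind if these logs are shipped elsewhere. setup_security_filter begins outside the hunk.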


@ -35,17 +35,26 @@ async def test_ok_requests(request_path, request_params, aiohttp_client):
@pytest.mark.parametrize(
"request_path,request_params",
"request_path,request_params,fail_on_query_string",
[
("/proc/self/environ", {}),
("/", {"test": "/test/../../api"}),
("/", {"test": "test/../../api"}),
("/", {"sql": ";UNION SELECT (a, b"}),
("/", {"sql": "concat(..."}),
("/", {"xss": "<script >"}),
("/proc/self/environ", {}, False),
("/", {"test": "/test/../../api"}, True),
("/", {"test": "test/../../api"}, True),
("/", {"test": "/test/%2E%2E%2f%2E%2E%2fapi"}, True),
("/", {"test": "test/%2E%2E%2f%2E%2E%2fapi"}, True),
("/test/%2E%2E%2f%2E%2E%2fapi", {}, False),
("/", {"sql": ";UNION SELECT (a, b"}, True),
("/", {"sql": "UNION%20SELECT%20%28a%2C%20b"}, True),
("/UNION%20SELECT%20%28a%2C%20b", {}, False),
("/", {"sql": "concat(..."}, True),
("/", {"xss": "<script >"}, True),
("/<script >", {"xss": ""}, False),
("/%3Cscript%3E", {}, False),
],
)
async def test_bad_requests(request_path, request_params, aiohttp_client):
async def test_bad_requests(
request_path, request_params, fail_on_query_string, aiohttp_client, caplog
):
"""Test request paths that should be filtered."""
app = web.Application()
app.router.add_get("/{all:.*}", mock_handler)
@ -56,3 +65,8 @@ async def test_bad_requests(request_path, request_params, aiohttp_client):
resp = await mock_api_client.get(request_path, params=request_params)
assert resp.status == 400
message = "Filtered a potential harmful request to:"
if fail_on_query_string:
message = "Filtered a request with a potential harmful query string:"
assert message in caplog.text
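Review bot: None of the added cases pins what the filter does with a double-encoded payload, so the suite cannot tell whether the decoded-path matching this commit introduces is intended to stop at one decode; a case like `("/test/%252E%252E%252fapi", {}, False),` belongs in test_bad_requests if it should be blocked, or in test_ok_requests if one decode is the agreed contract. The final assertion only checks that the expected message is present; for the query-string cases adding `assert "Filtered a potential harmful request to:" not in caplog.text` would prove the query branch, not the path branch, did the blocking, which is the behaviour the `fail_on_query_string` flag is meant to distinguish. The body of test_bad_requests is split across two hunks, so the lines between L66 and L68, and the mock_handler it routes to, are not visible here.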