Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,17 @@ async def test_xml_size_limits(self):
async for _ in block.run(XMLParserBlock.Input(input_xml=large_xml)):
pass

async def test_rejects_text_outside_root(self):
"""Ensure parser surfaces readable errors for invalid root text."""
block = XMLParserBlock()
invalid_xml = "<root><child>value</child></root> trailing"

with pytest.raises(
ValueError, match="text outside the root element"
):
async for _ in block.run(XMLParserBlock.Input(input_xml=invalid_xml)):
pass


class TestStoreMediaFileSecurity:
"""Test file storage security limits."""
Expand Down
38 changes: 36 additions & 2 deletions autogpt_platform/backend/backend/blocks/xml_parser.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from gravitasml.parser import Parser
from gravitasml.token import tokenize
from gravitasml.token import Token, tokenize

from backend.data.block import Block, BlockOutput, BlockSchemaInput, BlockSchemaOutput
from backend.data.model import SchemaField
Expand All @@ -25,6 +25,38 @@ def __init__(self):
],
)

@staticmethod
def _validate_tokens(tokens: list[Token]) -> None:
"""Ensure the XML has a single root element and no stray text."""
if not tokens:
raise ValueError("XML input is empty.")

depth = 0
root_seen = False

for token in tokens:
if token.type == "TAG_OPEN":
if depth == 0 and root_seen:
raise ValueError("XML must have a single root element.")
depth += 1
if depth == 1:
root_seen = True
elif token.type == "TAG_CLOSE":
depth -= 1
if depth < 0:
raise SyntaxError("Unexpected closing tag in XML input.")
elif token.type in {"TEXT", "ESCAPE"}:
if depth == 0 and token.value:
raise ValueError(
"XML contains text outside the root element; "
"wrap content in a single root tag."
)

if depth != 0:
raise SyntaxError("Unclosed tag detected in XML input.")
if not root_seen:
raise ValueError("XML must include a root element.")

async def run(self, input_data: Input, **kwargs) -> BlockOutput:
# Security fix: Add size limits to prevent XML bomb attacks
MAX_XML_SIZE = 10 * 1024 * 1024 # 10MB limit for XML input
Expand All @@ -35,7 +67,9 @@ async def run(self, input_data: Input, **kwargs) -> BlockOutput:
)

try:
tokens = tokenize(input_data.input_xml)
tokens = list(tokenize(input_data.input_xml))
self._validate_tokens(tokens)

parser = Parser(tokens)
parsed_result = parser.parse()
yield "parsed_xml", parsed_result
Expand Down
Loading