Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
153 changes: 151 additions & 2 deletions src/google/adk/cli/fast_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,8 +306,132 @@ def _has_parent_reference(path: str) -> bool:
# functions, which is an RCE vector when exposed through the builder UI.
# Block any upload that contains an `args` key anywhere in the document.
_BLOCKED_YAML_KEYS = frozenset({"args"})
_BUILDER_BUILT_IN_AGENT_CLASSES = frozenset({
"LlmAgent",
"LoopAgent",
"ParallelAgent",
"SequentialAgent",
"google.adk.agents.LlmAgent",
"google.adk.agents.LoopAgent",
"google.adk.agents.ParallelAgent",
"google.adk.agents.SequentialAgent",
"google.adk.agents.llm_agent.LlmAgent",
"google.adk.agents.loop_agent.LoopAgent",
"google.adk.agents.parallel_agent.ParallelAgent",
"google.adk.agents.sequential_agent.SequentialAgent",
})
_BUILDER_CODE_CONFIG_KEYS = frozenset({
"after_agent_callbacks",
"after_model_callbacks",
"after_tool_callbacks",
"before_agent_callbacks",
"before_model_callbacks",
"before_tool_callbacks",
"input_schema",
"model_code",
"output_schema",
})
_BUILDER_RESERVED_TOP_LEVEL_MODULES = frozenset({"google"})

def _app_name_conflicts_with_importable_module(app_name: str) -> bool:
"""Return whether app_name would make project references ambiguous."""
stdlib_module_names = getattr(sys, "stdlib_module_names", frozenset())
return (
app_name in sys.builtin_module_names
or app_name in stdlib_module_names
or app_name in _BUILDER_RESERVED_TOP_LEVEL_MODULES
)

def _check_project_code_reference(
reference: str,
*,
app_name: str,
filename: str,
field_name: str,
allow_short_builtin_tool: bool = False,
) -> None:
"""Validate that builder YAML cannot import arbitrary external code."""
if "." not in reference:
if allow_short_builtin_tool:
return
raise ValueError(
f"Invalid code reference {reference!r} in {filename!r}. "
f"The '{field_name}' field must use a project-local dotted path."
)

if not reference.startswith(f"{app_name}."):
raise ValueError(
f"Blocked code reference {reference!r} in {filename!r}. "
f"The '{field_name}' field must reference code under "
f"'{app_name}.*'."
)

if _app_name_conflicts_with_importable_module(app_name):
raise ValueError(
f"Blocked code reference {reference!r} in {filename!r}. "
f"The app name {app_name!r} conflicts with an importable Python "
"module, so project-local code references would be ambiguous."
)

def _check_yaml_for_blocked_keys(content: bytes, filename: str) -> None:
def _check_agent_class_reference(
value: Any, *, app_name: str, filename: str
) -> None:
if not isinstance(value, str):
return
if value in _BUILDER_BUILT_IN_AGENT_CLASSES:
return
_check_project_code_reference(
value,
app_name=app_name,
filename=filename,
field_name="agent_class",
)

def _check_code_config_reference(
value: Any, *, app_name: str, filename: str, field_name: str
) -> None:
if isinstance(value, list):
for item in value:
_check_code_config_reference(
item,
app_name=app_name,
filename=filename,
field_name=field_name,
)
return
if not isinstance(value, dict):
return

name = value.get("name")
if isinstance(name, str):
_check_project_code_reference(
name,
app_name=app_name,
filename=filename,
field_name=field_name,
)

def _check_tool_references(
value: Any, *, app_name: str, filename: str
) -> None:
if not isinstance(value, list):
return
for item in value:
if not isinstance(item, dict):
continue
name = item.get("name")
if isinstance(name, str):
_check_project_code_reference(
name,
app_name=app_name,
filename=filename,
field_name="tools.name",
allow_short_builtin_tool=True,
)

def _check_yaml_for_blocked_keys(
content: bytes, filename: str, app_name: str
) -> None:
"""Raise if the YAML document contains any blocked keys."""
import yaml

Expand All @@ -325,6 +449,29 @@ def _walk(node: Any) -> None:
f"The '{key}' field is not allowed in builder uploads "
"because it can execute arbitrary code."
)
if key == "agent_class":
_check_agent_class_reference(
value, app_name=app_name, filename=filename
)
elif key == "code":
if isinstance(value, str):
_check_project_code_reference(
value,
app_name=app_name,
filename=filename,
field_name="code",
)
elif key == "tools":
_check_tool_references(
value, app_name=app_name, filename=filename
)
elif key in _BUILDER_CODE_CONFIG_KEYS:
_check_code_config_reference(
value,
app_name=app_name,
filename=filename,
field_name=key,
)
_walk(value)
elif isinstance(node, list):
for item in node:
Expand Down Expand Up @@ -490,7 +637,9 @@ async def builder_build(

# Phase 2: validate every file *before* writing anything to disk.
for rel_path, content in uploads:
_check_yaml_for_blocked_keys(content, f"{app_name}/{rel_path}")
_check_yaml_for_blocked_keys(
content, f"{app_name}/{rel_path}", app_name
)

# Phase 3: write validated files to disk.
if tmp:
Expand Down
138 changes: 138 additions & 0 deletions tests/unittests/cli/test_fast_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -2113,6 +2113,144 @@ def test_builder_save_rejects_nested_args_key(builder_test_client, tmp_path):
assert "args" in response.json()["detail"]


def test_builder_save_rejects_external_tool_reference(
builder_test_client, tmp_path
):
"""Uploading YAML with an external dotted tool reference is rejected."""
yaml_with_external_tool = b"""\
agent_class: LlmAgent
name: app
model: gemini-2.5-flash
instruction: test
tools:
- name: os.system
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
(
"app/root_agent.yaml",
yaml_with_external_tool,
"application/x-yaml",
),
)],
)
assert response.status_code == 400
assert "os.system" in response.json()["detail"]
assert not (tmp_path / "app" / "tmp" / "app" / "root_agent.yaml").exists()


def test_builder_save_allows_project_tool_reference(
builder_test_client, tmp_path
):
"""Project-local dotted tool references are allowed."""
yaml_with_project_tool = b"""\
agent_class: LlmAgent
name: app
model: gemini-2.5-flash
instruction: test
tools:
- name: app.tools.safe_tool.run
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
(
"app/root_agent.yaml",
yaml_with_project_tool,
"application/x-yaml",
),
)],
)
assert response.status_code == 200
assert response.json() is True
assert (tmp_path / "app" / "tmp" / "app" / "root_agent.yaml").is_file()


def test_builder_save_rejects_stdlib_named_project_reference(
builder_test_client, tmp_path
):
"""Stdlib app names cannot make stdlib imports look project-local."""
yaml_with_stdlib_tool = b"""\
agent_class: LlmAgent
name: os
model: gemini-2.5-flash
instruction: test
tools:
- name: os.system
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
(
"os/root_agent.yaml",
yaml_with_stdlib_tool,
"application/x-yaml",
),
)],
)
assert response.status_code == 400
assert "os.system" in response.json()["detail"]
assert not (tmp_path / "os" / "tmp" / "os" / "root_agent.yaml").exists()


def test_builder_save_rejects_external_callback_reference(
builder_test_client, tmp_path
):
"""Uploading YAML with an external callback reference is rejected."""
yaml_with_external_callback = b"""\
agent_class: LlmAgent
name: app
model: gemini-2.5-flash
instruction: test
before_agent_callbacks:
- name: os.system
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
(
"app/root_agent.yaml",
yaml_with_external_callback,
"application/x-yaml",
),
)],
)
assert response.status_code == 400
assert "os.system" in response.json()["detail"]
assert not (tmp_path / "app" / "tmp" / "app" / "root_agent.yaml").exists()


def test_builder_save_rejects_external_agent_ref_code(
builder_test_client, tmp_path
):
"""Uploading YAML with an external sub-agent code reference is rejected."""
yaml_with_external_sub_agent = b"""\
agent_class: SequentialAgent
name: app
sub_agents:
- code: os.system
"""
response = builder_test_client.post(
"/builder/save?tmp=true",
files=[(
"files",
(
"app/root_agent.yaml",
yaml_with_external_sub_agent,
"application/x-yaml",
),
)],
)
assert response.status_code == 400
assert "os.system" in response.json()["detail"]
assert not (tmp_path / "app" / "tmp" / "app" / "root_agent.yaml").exists()


def test_builder_get_rejects_non_yaml_file_paths(builder_test_client, tmp_path):
"""GET /builder/app/{app_name}?file_path=... rejects non-YAML extensions."""
app_root = tmp_path / "app"
Expand Down