diff --git a/sqlmesh/core/context.py b/sqlmesh/core/context.py index dc51aad2a7..f1a7657704 100644 --- a/sqlmesh/core/context.py +++ b/sqlmesh/core/context.py @@ -1605,9 +1605,11 @@ def plan_builder( backfill_models = None models_override: t.Optional[UniqueKeyDict[str, Model]] = None + selected_fqns: t.Set[str] = set() + selected_deletion_fqns: t.Set[str] = set() if select_models: try: - models_override = model_selector.select_models( + models_override, selected_fqns = model_selector.select_models( select_models, environment, fallback_env_name=create_from or c.PROD, @@ -1622,12 +1624,17 @@ def plan_builder( # Only backfill selected models unless explicitly specified. backfill_models = model_selector.expand_model_selections(select_models) + if not backfill_models: + # The selection matched nothing locally. Check whether it matched models + # in the deployed environment that were deleted locally. + selected_deletion_fqns = selected_fqns - set(self._models) + expanded_restate_models = None if restate_models is not None: expanded_restate_models = model_selector.expand_model_selections(restate_models) if (restate_models is not None and not expanded_restate_models) or ( - backfill_models is not None and not backfill_models + backfill_models is not None and not backfill_models and not selected_deletion_fqns ): raise PlanError( "Selector did not return any models. Please check your model selection and try again." @@ -1636,7 +1643,7 @@ def plan_builder( if always_include_local_changes is None: # default behaviour - if restatements are detected; we operate entirely out of state and ignore local changes force_no_diff = restate_models is not None or ( - backfill_models is not None and not backfill_models + backfill_models is not None and not backfill_models and not selected_deletion_fqns ) else: force_no_diff = not always_include_local_changes diff --git a/sqlmesh/core/selector.py b/sqlmesh/core/selector.py index 9eaf4995c8..54b89d2680 100644 --- a/sqlmesh/core/selector.py +++ b/sqlmesh/core/selector.py @@ -62,7 +62,7 @@ def select_models( target_env_name: str, fallback_env_name: t.Optional[str] = None, ensure_finalized_snapshots: bool = False, - ) -> UniqueKeyDict[str, Model]: + ) -> t.Tuple[UniqueKeyDict[str, Model], t.Set[str]]: """Given a set of selections returns models from the current state with names matching the selection while sourcing the remaining models from the target environment. @@ -76,29 +76,11 @@ def select_models( the environment is not finalized. Returns: - A dictionary of models. + A tuple of (models dict, set of all matched FQNs including env models). """ - target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name)) - if target_env and target_env.expired: - target_env = None - - if not target_env and fallback_env_name: - target_env = self._state_reader.get_environment( - Environment.sanitize_name(fallback_env_name) - ) - - env_models: t.Dict[str, Model] = {} - if target_env: - environment_snapshot_infos = ( - target_env.snapshots - if not ensure_finalized_snapshots - else target_env.finalized_or_current_snapshots - ) - env_models = { - s.name: s.model - for s in self._state_reader.get_snapshots(environment_snapshot_infos).values() - if s.is_model - } + env_models = self._load_env_models( + target_env_name, fallback_env_name, ensure_finalized_snapshots + ) all_selected_models = self.expand_model_selections( model_selections, models={**env_models, **self._models} @@ -166,7 +148,37 @@ def get_model(fqn: str) -> t.Optional[Model]: if needs_update: update_model_schemas(dag, models=models, cache_dir=self._cache_dir) - return models + return models, all_selected_models + + def _load_env_models( + self, + target_env_name: str, + fallback_env_name: t.Optional[str] = None, + ensure_finalized_snapshots: bool = False, + ) -> t.Dict[str, "Model"]: + """Loads models from the target environment, falling back to the fallback environment if needed.""" + target_env = self._state_reader.get_environment(Environment.sanitize_name(target_env_name)) + if target_env and target_env.expired: + target_env = None + + if not target_env and fallback_env_name: + target_env = self._state_reader.get_environment( + Environment.sanitize_name(fallback_env_name) + ) + + if not target_env: + return {} + + environment_snapshot_infos = ( + target_env.snapshots + if not ensure_finalized_snapshots + else target_env.finalized_or_current_snapshots + ) + return { + s.name: s.model + for s in self._state_reader.get_snapshots(environment_snapshot_infos).values() + if s.is_model + } def expand_model_selections( self, model_selections: t.Iterable[str], models: t.Optional[t.Dict[str, Node]] = None diff --git a/tests/core/test_context.py b/tests/core/test_context.py index c3d88e205e..50e5f656a7 100644 --- a/tests/core/test_context.py +++ b/tests/core/test_context.py @@ -2273,6 +2273,44 @@ def test_plan_selector_expression_no_match(sushi_context: Context) -> None: sushi_context.plan("prod", restate_models=["*missing*"]) +def test_plan_select_model_deleted_model(sushi_context: Context) -> None: + """Selecting a model that has been deleted locally but still exists in the deployed + environment should produce a valid plan with the deletion, not raise PlanError.""" + # Pick a leaf model that can be safely deleted without breaking other models' rendering. + model_name = "sushi.top_waiters" + snapshot = sushi_context.get_snapshot(model_name) + assert snapshot is not None + + # Delete the model file from disk. + model = sushi_context.get_model(model_name) + assert model._path.exists() + model._path.unlink() + + # Reload the context so it no longer knows about the deleted model. + sushi_context.load() + assert model_name not in [m for m in sushi_context.models] + + # Planning with select_models for the deleted model should succeed (not raise PlanError). + plan = sushi_context.plan("prod", select_models=[model_name], no_prompts=True) + assert plan is not None + + # The deleted model should appear in removed_snapshots. + removed_names = {s.name for s in plan.context_diff.removed_snapshots.values()} + assert snapshot.name in removed_names + + +def test_plan_select_model_deleted_model_still_rejects_nonexistent( + sushi_context: Context, +) -> None: + """A model that neither exists locally nor in the deployed environment should still + raise PlanError.""" + with pytest.raises( + PlanError, + match="Selector did not return any models. Please check your model selection and try again.", + ): + sushi_context.plan("prod", select_models=["sushi.completely_nonexistent"]) + + def test_plan_on_virtual_update_this_model_in_macro(tmp_path: pathlib.Path): models_dir = pathlib.Path("models") macros_dir = pathlib.Path("macros") diff --git a/tests/core/test_selector_native.py b/tests/core/test_selector_native.py index 5889efadda..0b741c1f51 100644 --- a/tests/core/test_selector_native.py +++ b/tests/core/test_selector_native.py @@ -309,7 +309,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot): selector = NativeSelector(state_reader_mock, local_models) - selected = selector.select_models(["db.parent"], env_name) + selected, _ = selector.select_models(["db.parent"], env_name) assert selected[local_child.fqn].render_query() != child.render_query() _assert_models_equal( @@ -320,7 +320,7 @@ def test_select_change_schema(mocker: MockerFixture, make_snapshot): }, ) - selected = selector.select_models(["db.child"], env_name) + selected, _ = selector.select_models(["db.child"], env_name) assert selected[local_child.fqn].data_hash == child.data_hash _assert_models_equal( @@ -343,12 +343,12 @@ def test_select_models_missing_env(mocker: MockerFixture, make_snapshot): selector = NativeSelector(state_reader_mock, local_models) - assert selector.select_models([model.name], "missing_env").keys() == {model.fqn} - assert not selector.select_models(["missing"], "missing_env") + assert selector.select_models([model.name], "missing_env")[0].keys() == {model.fqn} + assert not selector.select_models(["missing"], "missing_env")[0] assert selector.select_models( [model.name], "missing_env", fallback_env_name="another_missing_env" - ).keys() == {model.fqn} + )[0].keys() == {model.fqn} state_reader_mock.get_environment.assert_has_calls( [ @@ -789,7 +789,7 @@ def test_select_models_local_tags_take_precedence_over_remote( selector = NativeSelector(state_reader_mock, local_models) - selected = selector.select_models(["tag:a"], env_name) + selected, _ = selector.select_models(["tag:a"], env_name) # both should get selected because they both now have the 'a' tag locally, even though one exists in remote state without the 'a' tag _assert_models_equal( @@ -801,7 +801,137 @@ def test_select_models_local_tags_take_precedence_over_remote( ) -def _assert_models_equal(actual: t.Dict[str, Model], expected: t.Dict[str, Model]) -> None: +def test_select_models_returns_selected_fqns(mocker: MockerFixture, make_snapshot): + """select_models should return the set of all matched FQNs (including env-only models) + alongside the model dict.""" + local_model = SqlModel( + name="db.local_model", + query=d.parse_one("SELECT 1 AS a"), + ) + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 2 AS b"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + env_name = "test_env" + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.return_value = Environment( + name=env_name, + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + ) + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + local_models[local_model.fqn] = local_model + + selector = NativeSelector(state_reader_mock, local_models) + + # Selecting a deleted model: selected_fqns includes it even though models dict won't. + _, selected_fqns = selector.select_models(["db.deleted_model"], env_name) + assert deleted_model.fqn in selected_fqns + + # Selecting a local model: selected_fqns includes it. + _, selected_fqns = selector.select_models(["db.local_model"], env_name) + assert local_model.fqn in selected_fqns + + # Mixed selection (active + deleted): both appear in selected_fqns. + _, selected_fqns = selector.select_models( + ["db.deleted_model", "db.local_model"], env_name + ) + assert selected_fqns == {deleted_model.fqn, local_model.fqn} + + # Wildcard should match both local and env models. + _, selected_fqns = selector.select_models(["*_model"], env_name) + assert selected_fqns == {deleted_model.fqn, local_model.fqn} + + # Non-existent model should not appear. + _, selected_fqns = selector.select_models(["db.nonexistent"], env_name) + assert selected_fqns == set() + + +def test_select_models_selected_fqns_fallback(mocker: MockerFixture, make_snapshot): + """select_models selected_fqns should include env models found via fallback environment.""" + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 1 AS a"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + fallback_env = Environment( + name="prod", + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + ) + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.side_effect = ( + lambda name: fallback_env if name == "prod" else None + ) + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + selector = NativeSelector(state_reader_mock, local_models) + + _, selected_fqns = selector.select_models( + ["db.deleted_model"], "missing_env", fallback_env_name="prod" + ) + assert deleted_model.fqn in selected_fqns + + +def test_select_models_selected_fqns_expired(mocker: MockerFixture, make_snapshot): + """select_models should not match env models from expired environments.""" + deleted_model = SqlModel( + name="db.deleted_model", + query=d.parse_one("SELECT 1 AS a"), + ) + + deleted_model_snapshot = make_snapshot(deleted_model) + deleted_model_snapshot.categorize_as(SnapshotChangeCategory.BREAKING) + + expired_env = Environment( + name="test_env", + snapshots=[deleted_model_snapshot.table_info], + start_at="2023-01-01", + end_at="2023-02-01", + plan_id="test_plan_id", + expiration_ts=now_timestamp() - 1, + ) + + state_reader_mock = mocker.Mock() + state_reader_mock.get_environment.return_value = expired_env + state_reader_mock.get_snapshots.return_value = { + deleted_model_snapshot.snapshot_id: deleted_model_snapshot, + } + + local_models: UniqueKeyDict[str, Model] = UniqueKeyDict("models") + selector = NativeSelector(state_reader_mock, local_models) + + _, selected_fqns = selector.select_models(["db.deleted_model"], "test_env") + assert selected_fqns == set() + + +def _assert_models_equal( + actual: t.Union[t.Dict[str, Model], t.Tuple[t.Dict[str, Model], t.Set[str]]], + expected: t.Dict[str, Model], +) -> None: + # select_models returns a tuple; unwrap if needed. + if isinstance(actual, tuple): + actual = actual[0] assert set(actual) == set(expected) for name, model in actual.items(): # Use dict() to make Pydantic V2 happy.