diff --git a/.azdo/ci-pr.yaml b/.azdo/ci-pr.yaml index eeb64f83..80fc4b4d 100644 --- a/.azdo/ci-pr.yaml +++ b/.azdo/ci-pr.yaml @@ -42,7 +42,7 @@ extends: - script: | python -m pip install --upgrade pip - python -m pip install flake8 black build + python -m pip install flake8 black build diff-cover python -m pip install -e .[dev] displayName: 'Install dependencies' @@ -60,18 +60,30 @@ extends: - script: | python -m build displayName: 'Build package' - + - script: | python -m pip install dist/*.whl displayName: 'Install wheel' - + - script: | - pytest + PYTHONPATH=src pytest --junitxml=test-results.xml --cov --cov-report=xml displayName: 'Test with pytest' - + + - script: | + git fetch origin main + diff-cover coverage.xml --compare-branch=origin/main --fail-under=90 + displayName: 'Diff coverage (90% for new changes)' + - task: PublishTestResults@2 condition: succeededOrFailed() inputs: - testResultsFiles: '**/test-*.xml' + testResultsFiles: '**/test-results.xml' testRunTitle: 'Python 3.12' displayName: 'Publish test results' + + - task: PublishCodeCoverageResults@2 + condition: succeededOrFailed() + inputs: + summaryFileLocation: '**/coverage.xml' + pathToSources: '$(Build.SourcesDirectory)/src' + displayName: 'Publish code coverage' diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml index 26209bf7..886bc72b 100644 --- a/.github/workflows/python-package.yml +++ b/.github/workflows/python-package.yml @@ -18,6 +18,8 @@ jobs: steps: - uses: actions/checkout@v4 + with: + fetch-depth: 0 - name: Set up Python 3.12 uses: actions/setup-python@v5 @@ -27,7 +29,7 @@ jobs: - name: Install dependencies run: | python -m pip install --upgrade pip - python -m pip install flake8 black build + python -m pip install flake8 black build diff-cover python -m pip install -e .[dev] - name: Check format with black @@ -44,11 +46,30 @@ jobs: - name: Build package run: | python -m build - + - name: Install wheel run: | python -m pip install dist/*.whl - + - name: Test with pytest run: | - pytest + PYTHONPATH=src pytest --junitxml=test-results.xml --cov --cov-report=xml + + - name: Diff coverage (90% for new changes) + run: | + git fetch origin ${{ github.base_ref }} + diff-cover coverage.xml --compare-branch=origin/${{ github.base_ref }} --fail-under=90 + + - name: Upload test results + if: always() + uses: actions/upload-artifact@v4 + with: + name: test-results + path: test-results.xml + + - name: Upload coverage report + if: always() + uses: actions/upload-artifact@v4 + with: + name: coverage-report + path: coverage.xml diff --git a/pyproject.toml b/pyproject.toml index 3df59c9d..3e26347e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -93,6 +93,14 @@ select = [ [tool.pytest.ini_options] testpaths = ["tests/unit"] + +[tool.coverage.run] +source = ["src/PowerPlatform"] + +[tool.coverage.report] +fail_under = 90 +show_missing = true + markers = [ "e2e: end-to-end tests requiring a live Dataverse environment (DATAVERSE_URL)", ] diff --git a/tests/unit/core/test_auth.py b/tests/unit/core/test_auth.py new file mode 100644 index 00000000..b82bb9bd --- /dev/null +++ b/tests/unit/core/test_auth.py @@ -0,0 +1,35 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import unittest +from unittest.mock import MagicMock + +from azure.core.credentials import TokenCredential + +from PowerPlatform.Dataverse.core._auth import _AuthManager, _TokenPair + + +class TestAuthManager(unittest.TestCase): + """Tests for _AuthManager credential validation and token acquisition.""" + + def test_non_token_credential_raises(self): + """_AuthManager raises TypeError when credential does not implement TokenCredential.""" + with self.assertRaises(TypeError) as ctx: + _AuthManager("not-a-credential") + self.assertEqual( + str(ctx.exception), + "credential must implement azure.core.credentials.TokenCredential.", + ) + + def test_acquire_token_returns_token_pair(self): + """_acquire_token calls get_token and returns a _TokenPair with scope and token.""" + mock_credential = MagicMock(spec=TokenCredential) + mock_credential.get_token.return_value = MagicMock(token="my-access-token") + + manager = _AuthManager(mock_credential) + result = manager._acquire_token("https://org.crm.dynamics.com/.default") + + mock_credential.get_token.assert_called_once_with("https://org.crm.dynamics.com/.default") + self.assertIsInstance(result, _TokenPair) + self.assertEqual(result.resource, "https://org.crm.dynamics.com/.default") + self.assertEqual(result.access_token, "my-access-token") diff --git a/tests/unit/core/test_http_client.py b/tests/unit/core/test_http_client.py new file mode 100644 index 00000000..200b8be9 --- /dev/null +++ b/tests/unit/core/test_http_client.py @@ -0,0 +1,123 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import unittest +from unittest.mock import MagicMock, patch, call + +import requests + +from PowerPlatform.Dataverse.core._http import _HttpClient + + +class TestHttpClientTimeout(unittest.TestCase): + """Tests for automatic timeout selection in _HttpClient._request.""" + + def _make_response(self, status=200): + resp = MagicMock(spec=requests.Response) + resp.status_code = status + return resp + + def test_get_uses_10s_default_timeout(self): + """GET requests use 10s default when no timeout is specified.""" + client = _HttpClient(retries=1) + with patch("requests.request", return_value=self._make_response()) as mock_req: + client._request("get", "https://example.com/data") + _, kwargs = mock_req.call_args + self.assertEqual(kwargs["timeout"], 10) + + def test_post_uses_120s_default_timeout(self): + """POST requests use 120s default when no timeout is specified.""" + client = _HttpClient(retries=1) + with patch("requests.request", return_value=self._make_response()) as mock_req: + client._request("post", "https://example.com/data") + _, kwargs = mock_req.call_args + self.assertEqual(kwargs["timeout"], 120) + + def test_delete_uses_120s_default_timeout(self): + """DELETE requests use 120s default when no timeout is specified.""" + client = _HttpClient(retries=1) + with patch("requests.request", return_value=self._make_response()) as mock_req: + client._request("delete", "https://example.com/data") + _, kwargs = mock_req.call_args + self.assertEqual(kwargs["timeout"], 120) + + def test_default_timeout_overrides_per_method_default(self): + """Explicit default_timeout on the client overrides per-method defaults.""" + client = _HttpClient(retries=1, timeout=30.0) + with patch("requests.request", return_value=self._make_response()) as mock_req: + client._request("get", "https://example.com/data") + _, kwargs = mock_req.call_args + self.assertEqual(kwargs["timeout"], 30.0) + + def test_explicit_timeout_kwarg_takes_precedence(self): + """If timeout is already in kwargs it is passed through unchanged.""" + client = _HttpClient(retries=1, timeout=30.0) + with patch("requests.request", return_value=self._make_response()) as mock_req: + client._request("get", "https://example.com/data", timeout=5) + _, kwargs = mock_req.call_args + self.assertEqual(kwargs["timeout"], 5) + + +class TestHttpClientRequester(unittest.TestCase): + """Tests for session vs direct requests.request routing.""" + + def _make_response(self): + resp = MagicMock(spec=requests.Response) + resp.status_code = 200 + return resp + + def test_uses_direct_request_without_session(self): + """Without a session, _request uses requests.request directly.""" + client = _HttpClient(retries=1) + with patch("requests.request", return_value=self._make_response()) as mock_req: + client._request("get", "https://example.com/data") + mock_req.assert_called_once() + + def test_uses_session_request_when_session_provided(self): + """With a session, _request uses session.request instead of requests.request.""" + mock_session = MagicMock(spec=requests.Session) + mock_session.request.return_value = self._make_response() + client = _HttpClient(retries=1, session=mock_session) + with patch("requests.request") as mock_req: + client._request("get", "https://example.com/data") + mock_session.request.assert_called_once() + mock_req.assert_not_called() + + +class TestHttpClientRetry(unittest.TestCase): + """Tests for retry behavior on RequestException.""" + + def test_retries_on_request_exception_and_succeeds(self): + """Retries after a RequestException and returns response on second attempt.""" + resp = MagicMock(spec=requests.Response) + resp.status_code = 200 + client = _HttpClient(retries=2, backoff=0) + with patch("requests.request", side_effect=[requests.exceptions.ConnectionError(), resp]) as mock_req: + with patch("time.sleep"): + result = client._request("get", "https://example.com/data") + self.assertEqual(mock_req.call_count, 2) + self.assertIs(result, resp) + + def test_raises_after_all_retries_exhausted(self): + """Raises RequestException after all retry attempts fail.""" + client = _HttpClient(retries=3, backoff=0) + with patch("requests.request", side_effect=requests.exceptions.ConnectionError("timeout")): + with patch("time.sleep"): + with self.assertRaises(requests.exceptions.RequestException): + client._request("get", "https://example.com/data") + + def test_backoff_delay_between_retries(self): + """Sleeps with exponential backoff between retry attempts.""" + resp = MagicMock(spec=requests.Response) + resp.status_code = 200 + client = _HttpClient(retries=3, backoff=1.0) + side_effects = [ + requests.exceptions.ConnectionError(), + requests.exceptions.ConnectionError(), + resp, + ] + with patch("requests.request", side_effect=side_effects): + with patch("time.sleep") as mock_sleep: + client._request("get", "https://example.com/data") + # First retry: delay = 1.0 * 2^0 = 1.0, second retry: 1.0 * 2^1 = 2.0 + mock_sleep.assert_has_calls([call(1.0), call(2.0)]) diff --git a/tests/unit/core/test_http_errors.py b/tests/unit/core/test_http_errors.py index 729ebae3..39373e05 100644 --- a/tests/unit/core/test_http_errors.py +++ b/tests/unit/core/test_http_errors.py @@ -180,3 +180,40 @@ def test_correlation_id_shared_inside_call_scope(): h1, h2 = recorder.recorded_headers assert h1["x-ms-client-request-id"] != h2["x-ms-client-request-id"] assert h1["x-ms-correlation-id"] == h2["x-ms-correlation-id"] + + +def test_validation_error_instantiates(): + """ValidationError can be raised and carries the correct code.""" + from PowerPlatform.Dataverse.core.errors import ValidationError + + err = ValidationError("bad input", subcode="missing_field", details={"field": "name"}) + assert err.code == "validation_error" + assert err.subcode == "missing_field" + assert err.details["field"] == "name" + assert err.source == "client" + + +def test_sql_parse_error_instantiates(): + """SQLParseError can be raised and carries the correct code.""" + from PowerPlatform.Dataverse.core.errors import SQLParseError + + err = SQLParseError("unexpected token", subcode="syntax_error") + assert err.code == "sql_parse_error" + assert err.subcode == "syntax_error" + assert err.source == "client" + + +def test_http_error_optional_diagnostic_fields(): + """HttpError stores correlation_id, service_request_id, and traceparent in details.""" + from PowerPlatform.Dataverse.core.errors import HttpError + + err = HttpError( + "Server error", + status_code=500, + correlation_id="corr-123", + service_request_id="svc-456", + traceparent="00-abc-def-01", + ) + assert err.details["correlation_id"] == "corr-123" + assert err.details["service_request_id"] == "svc-456" + assert err.details["traceparent"] == "00-abc-def-01" diff --git a/tests/unit/data/test_batch_serialization.py b/tests/unit/data/test_batch_serialization.py index 561b48b0..b194a341 100644 --- a/tests/unit/data/test_batch_serialization.py +++ b/tests/unit/data/test_batch_serialization.py @@ -16,20 +16,27 @@ _RecordGet, _RecordUpdate, _RecordUpsert, + _TableCreate, + _TableDelete, _TableGet, _TableList, + _TableAddColumns, + _TableRemoveColumns, + _TableCreateOneToMany, + _TableCreateManyToMany, + _TableDeleteRelationship, + _TableGetRelationship, + _TableCreateLookupField, _QuerySql, _extract_boundary, _raise_top_level_batch_error, - _split_multipart, _parse_mime_part, _parse_http_response_part, _CRLF, ) -from PowerPlatform.Dataverse.core.errors import HttpError +from PowerPlatform.Dataverse.core.errors import HttpError, MetadataError, ValidationError from PowerPlatform.Dataverse.models.upsert import UpsertItem from PowerPlatform.Dataverse.data._raw_request import _RawRequest -from PowerPlatform.Dataverse.models.batch import BatchItemResponse def _make_od(): @@ -186,19 +193,19 @@ def test_single_request_body_ends_with_closing_boundary(self): self.assertIn("--batch_bnd--", body) def test_multiple_requests_all_in_body(self): - r1 = _RawRequest(method="GET", url="https://org/api/data/v9.2/accounts") - r2 = _RawRequest( + req1 = _RawRequest(method="GET", url="https://org/api/data/v9.2/accounts") + req2 = _RawRequest( method="DELETE", url="https://org/api/data/v9.2/accounts(guid)", headers={"If-Match": "*"}, ) client = self._client() - body = client._build_batch_body([r1, r2], "bnd") + body = client._build_batch_body([req1, req2], "bnd") self.assertEqual(body.count("--bnd\r\n"), 2) def test_changeset_produces_nested_multipart(self): - r1 = _RawRequest(method="POST", url="https://org/api/data/v9.2/accounts", body="{}") - cs = _ChangeSetBatchItem(requests=[r1]) + req1 = _RawRequest(method="POST", url="https://org/api/data/v9.2/accounts", body="{}") + cs = _ChangeSetBatchItem(requests=[req1]) client = self._client() body = client._build_batch_body([cs], "outer_bnd") self.assertIn("Content-Type: multipart/mixed", body) @@ -389,12 +396,36 @@ def test_exceeds_1000_raises(self): client = _BatchClient(od) items = [_RecordGet(table="account", record_id=f"guid-{i}") for i in range(1001)] - from PowerPlatform.Dataverse.core.errors import ValidationError - with self.assertRaises(ValidationError): client.execute(items) +class TestContinueOnError(unittest.TestCase): + """execute() sends Prefer: odata.continue-on-error when requested.""" + + def setUp(self): + self.od = _make_od() + self.od._build_get.return_value = _RawRequest(method="GET", url="https://x/accounts(g)") + mock_resp = MagicMock() + mock_resp.headers = {"Content-Type": 'multipart/mixed; boundary="batch_x"'} + mock_resp.status_code = 200 + mock_resp.text = "--batch_x\r\n\r\nHTTP/1.1 204 No Content\r\n\r\n\r\n--batch_x--" + self.od._request.return_value = mock_resp + self.client = _BatchClient(self.od) + + def test_continue_on_error_header_sent(self): + """Prefer: odata.continue-on-error header is included when continue_on_error=True.""" + self.client.execute([_RecordGet(table="account", record_id="guid-1")], continue_on_error=True) + _, kwargs = self.od._request.call_args + self.assertEqual(kwargs.get("headers", {}).get("Prefer"), "odata.continue-on-error") + + def test_no_continue_on_error_header_by_default(self): + """Prefer header is absent when continue_on_error is not set.""" + self.client.execute([_RecordGet(table="account", record_id="guid-1")]) + _, kwargs = self.od._request.call_args + self.assertNotIn("Prefer", kwargs.get("headers", {})) + + class TestChangeSetInternal(unittest.TestCase): def test_add_create_returns_dollar_n(self): cs = _ChangeSet() @@ -628,5 +659,279 @@ def test_parse_batch_response_raises_on_missing_boundary(self): client._parse_batch_response(resp) +class TestResolveItemDispatch(unittest.TestCase): + """_resolve_item() routes each intent type to the correct resolver.""" + + def _client_and_od(self): + od = _make_od() + client = _BatchClient(od) + return client, od + + def test_dispatch_record_update(self): + """_resolve_item routes _RecordUpdate to _resolve_record_update.""" + client, od = self._client_and_od() + od._build_update.return_value = MagicMock() + op = _RecordUpdate(table="account", ids="guid-1", changes={"name": "X"}) + result = client._resolve_item(op) + od._build_update.assert_called_once_with("account", "guid-1", {"name": "X"}, content_id=None) + self.assertEqual(len(result), 1) + + def test_dispatch_record_delete(self): + """_resolve_item routes _RecordDelete to _resolve_record_delete.""" + client, od = self._client_and_od() + od._build_delete.return_value = MagicMock() + op = _RecordDelete(table="account", ids="guid-1") + result = client._resolve_item(op) + od._build_delete.assert_called_once_with("account", "guid-1", content_id=None) + self.assertEqual(len(result), 1) + + def test_dispatch_table_create(self): + """_resolve_item routes _TableCreate to _build_create_entity.""" + client, od = self._client_and_od() + od._build_create_entity.return_value = MagicMock() + op = _TableCreate(table="new_Widget", columns={"new_name": str}) + result = client._resolve_item(op) + od._build_create_entity.assert_called_once_with("new_Widget", {"new_name": str}, None, None) + self.assertEqual(len(result), 1) + + def test_dispatch_table_delete(self): + """_resolve_item routes _TableDelete, resolving MetadataId before calling _build_delete_entity.""" + client, od = self._client_and_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-1"} + od._build_delete_entity.return_value = MagicMock() + op = _TableDelete(table="new_Widget") + result = client._resolve_item(op) + od._build_delete_entity.assert_called_once_with("meta-1") + self.assertEqual(len(result), 1) + + def test_dispatch_table_get(self): + """_resolve_item routes _TableGet to _build_get_entity.""" + client, od = self._client_and_od() + od._build_get_entity.return_value = MagicMock() + op = _TableGet(table="account") + result = client._resolve_item(op) + od._build_get_entity.assert_called_once_with("account") + self.assertEqual(len(result), 1) + + def test_dispatch_table_list(self): + """_resolve_item routes _TableList to _build_list_entities, passing filter and select.""" + client, od = self._client_and_od() + od._build_list_entities.return_value = MagicMock() + op = _TableList() + result = client._resolve_item(op) + od._build_list_entities.assert_called_once_with(filter=None, select=None) + self.assertEqual(len(result), 1) + + def test_dispatch_table_add_columns(self): + """_resolve_item routes _TableAddColumns, emitting one request per column.""" + client, od = self._client_and_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-1"} + od._build_create_column.return_value = MagicMock() + op = _TableAddColumns(table="account", columns={"new_col": str}) + result = client._resolve_item(op) + od._build_create_column.assert_called_once_with("meta-1", "new_col", str) + self.assertEqual(len(result), 1) + + def test_dispatch_table_remove_columns(self): + """_resolve_item routes _TableRemoveColumns, fetching attribute metadata before deleting.""" + client, od = self._client_and_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-1"} + od._get_attribute_metadata.return_value = {"MetadataId": "attr-1"} + od._build_delete_column.return_value = MagicMock() + op = _TableRemoveColumns(table="account", columns="new_col") + result = client._resolve_item(op) + od._build_delete_column.assert_called_once_with("meta-1", "attr-1") + self.assertEqual(len(result), 1) + + def test_dispatch_table_create_one_to_many(self): + """_resolve_item routes _TableCreateOneToMany, merging lookup into relationship body.""" + client, od = self._client_and_od() + od._build_create_relationship.return_value = MagicMock() + lookup = MagicMock() + lookup.to_dict.return_value = {"SchemaName": "new_account_contact"} + relationship = MagicMock() + relationship.to_dict.return_value = {"ReferencedEntity": "account"} + op = _TableCreateOneToMany(lookup=lookup, relationship=relationship) + result = client._resolve_item(op) + od._build_create_relationship.assert_called_once_with( + {"ReferencedEntity": "account", "Lookup": {"SchemaName": "new_account_contact"}}, + solution=None, + ) + self.assertEqual(len(result), 1) + + def test_dispatch_table_create_many_to_many(self): + """_resolve_item routes _TableCreateManyToMany to _build_create_relationship.""" + client, od = self._client_and_od() + od._build_create_relationship.return_value = MagicMock() + relationship = MagicMock() + relationship.to_dict.return_value = {"SchemaName": "new_account_contact"} + op = _TableCreateManyToMany(relationship=relationship) + result = client._resolve_item(op) + od._build_create_relationship.assert_called_once_with({"SchemaName": "new_account_contact"}, solution=None) + self.assertEqual(len(result), 1) + + def test_dispatch_table_delete_relationship(self): + """_resolve_item routes _TableDeleteRelationship, passing relationship_id.""" + client, od = self._client_and_od() + od._build_delete_relationship.return_value = MagicMock() + op = _TableDeleteRelationship(relationship_id="rel-guid-1") + result = client._resolve_item(op) + od._build_delete_relationship.assert_called_once_with("rel-guid-1") + self.assertEqual(len(result), 1) + + def test_dispatch_table_get_relationship(self): + """_resolve_item routes _TableGetRelationship, passing schema_name.""" + client, od = self._client_and_od() + od._build_get_relationship.return_value = MagicMock() + op = _TableGetRelationship(schema_name="new_account_contact") + result = client._resolve_item(op) + od._build_get_relationship.assert_called_once_with("new_account_contact") + self.assertEqual(len(result), 1) + + def test_dispatch_table_create_lookup_field(self): + """_resolve_item routes _TableCreateLookupField, building lookup and relationship models.""" + client, od = self._client_and_od() + lookup = MagicMock() + lookup.to_dict.return_value = {"SchemaName": "new_accountid"} + relationship = MagicMock() + relationship.to_dict.return_value = {"ReferencedEntity": "account"} + od._build_lookup_field_models.return_value = (lookup, relationship) + od._build_create_relationship.return_value = MagicMock() + op = _TableCreateLookupField( + referencing_table="new_Widget", + lookup_field_name="new_accountid", + referenced_table="account", + ) + result = client._resolve_item(op) + od._build_lookup_field_models.assert_called_once_with( + referencing_table="new_Widget", + lookup_field_name="new_accountid", + referenced_table="account", + display_name=None, + description=None, + required=False, + cascade_delete="RemoveLink", + language_code=1033, + ) + od._build_create_relationship.assert_called_once_with( + {"ReferencedEntity": "account", "Lookup": {"SchemaName": "new_accountid"}}, + solution=None, + ) + self.assertEqual(len(result), 1) + + def test_dispatch_query_sql(self): + """_resolve_item routes _QuerySql to _build_sql, passing the SQL string.""" + client, od = self._client_and_od() + od._build_sql.return_value = MagicMock() + op = _QuerySql(sql="SELECT name FROM account") + result = client._resolve_item(op) + od._build_sql.assert_called_once_with("SELECT name FROM account") + self.assertEqual(len(result), 1) + + +class TestResolveOneChangeset(unittest.TestCase): + """_resolve_one() raises ValidationError when operation produces != 1 request.""" + + def test_multi_request_op_in_changeset_raises(self): + """use_bulk_delete=False with 2 ids produces 2 requests — not allowed in a changeset.""" + od = _make_od() + client = _BatchClient(od) + od._build_delete.return_value = MagicMock() + op = _RecordDelete(table="account", ids=["guid-1", "guid-2"], use_bulk_delete=False) + with self.assertRaises(ValidationError): + client._resolve_one(op) + + +class TestRequireEntityMetadata(unittest.TestCase): + """_require_entity_metadata raises MetadataError when table not found.""" + + def test_missing_entity_raises_metadata_error(self): + """MetadataError raised when _get_entity_by_table_schema_name returns None.""" + od = _make_od() + od._get_entity_by_table_schema_name.return_value = None + client = _BatchClient(od) + with self.assertRaises(MetadataError): + client._require_entity_metadata("new_Missing") + + def test_entity_without_metadata_id_raises(self): + """MetadataError raised when entity exists but has no MetadataId field.""" + od = _make_od() + od._get_entity_by_table_schema_name.return_value = {"LogicalName": "new_missing"} + client = _BatchClient(od) + with self.assertRaises(MetadataError): + client._require_entity_metadata("new_Missing") + + def test_valid_entity_returns_metadata_id(self): + """Returns MetadataId string when entity is found and has a MetadataId.""" + od = _make_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-abc"} + client = _BatchClient(od) + result = client._require_entity_metadata("account") + self.assertEqual(result, "meta-abc") + + +class TestTableRemoveColumnsResolver(unittest.TestCase): + """_resolve_table_remove_columns covers string input and missing column error.""" + + def _client_and_od(self): + od = _make_od() + client = _BatchClient(od) + return client, od + + def test_single_string_column_resolved(self): + """A single string column name is accepted and resolved to one delete request.""" + client, od = self._client_and_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-1"} + od._get_attribute_metadata.return_value = {"MetadataId": "attr-1"} + od._build_delete_column.return_value = MagicMock() + op = _TableRemoveColumns(table="account", columns="new_col") + result = client._resolve_table_remove_columns(op) + od._build_delete_column.assert_called_once_with("meta-1", "attr-1") + self.assertEqual(len(result), 1) + + def test_missing_column_raises_metadata_error(self): + """MetadataError raised when attribute metadata is not found for the column.""" + client, od = self._client_and_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-1"} + od._get_attribute_metadata.return_value = None + op = _TableRemoveColumns(table="account", columns="new_missing") + with self.assertRaises(MetadataError): + client._resolve_table_remove_columns(op) + + def test_column_without_metadata_id_raises(self): + """MetadataError raised when attribute metadata exists but has no MetadataId.""" + client, od = self._client_and_od() + od._get_entity_by_table_schema_name.return_value = {"MetadataId": "meta-1"} + od._get_attribute_metadata.return_value = {"AttributeType": "String"} + op = _TableRemoveColumns(table="account", columns="new_col") + with self.assertRaises(MetadataError): + client._resolve_table_remove_columns(op) + + +class TestParseMimePartNoSeparator(unittest.TestCase): + """_parse_mime_part handles raw string with no blank-line separator.""" + + def test_no_double_newline_returns_empty_body(self): + """When raw part has no blank-line separator, headers are parsed and body is empty.""" + raw = "Content-Type: application/http" + headers, body = _parse_mime_part(raw) + self.assertEqual(headers.get("content-type"), "application/http") + self.assertEqual(body, "") + + +class TestParseHttpResponsePartMalformed(unittest.TestCase): + """_parse_http_response_part returns None for malformed status lines.""" + + def test_status_line_too_short_returns_none(self): + """Returns None when status line has fewer than 2 tokens (no status code).""" + result = _parse_http_response_part("HTTP/1.1\r\n\r\n", content_id=None) + self.assertIsNone(result) + + def test_non_integer_status_code_returns_none(self): + """Returns None when status code token is not a valid integer.""" + result = _parse_http_response_part("HTTP/1.1 XYZ OK\r\n\r\n", content_id=None) + self.assertIsNone(result) + + if __name__ == "__main__": unittest.main() diff --git a/tests/unit/data/test_odata_internal.py b/tests/unit/data/test_odata_internal.py index 0f8873f4..1aa50c0a 100644 --- a/tests/unit/data/test_odata_internal.py +++ b/tests/unit/data/test_odata_internal.py @@ -2,10 +2,12 @@ # Licensed under the MIT license. import json +import time import unittest +from enum import Enum from unittest.mock import MagicMock, patch -from PowerPlatform.Dataverse.core.errors import ValidationError +from PowerPlatform.Dataverse.core.errors import HttpError, MetadataError, ValidationError from PowerPlatform.Dataverse.data._odata import _ODataClient @@ -18,6 +20,33 @@ def _make_odata_client() -> _ODataClient: return client +def _mock_response(json_data=None, text="", status_code=200, headers=None): + """Create a mock HTTP response.""" + response = MagicMock() + response.status_code = status_code + response.text = text or (str(json_data) if json_data else "") + response.json.return_value = json_data or {} + response.headers = headers or {} + return response + + +def _entity_def_response(entity_set_name="accounts", primary_id="accountid", metadata_id="meta-001"): + """Simulate a successful EntityDefinitions response.""" + return _mock_response( + json_data={ + "value": [ + { + "LogicalName": "account", + "EntitySetName": entity_set_name, + "PrimaryIdAttribute": primary_id, + "MetadataId": metadata_id, + "SchemaName": "Account", + } + ] + } + ) + + class TestUpsertMultipleValidation(unittest.TestCase): """Unit tests for _ODataClient._upsert_multiple internal validation.""" @@ -58,6 +87,10 @@ def test_equal_lengths_does_not_raise(self): post_calls = [c for c in self.od._request.call_args_list if c.args[0] == "post"] self.assertEqual(len(post_calls), 1) self.assertIn("UpsertMultiple", post_calls[0].args[1]) + payload = post_calls[0].kwargs.get("json", {}) + self.assertEqual(len(payload["Targets"]), 2) + self.assertIn("@odata.type", payload["Targets"][0]) + self.assertIn("@odata.id", payload["Targets"][0]) def test_payload_excludes_alternate_key_fields(self): """Alternate key fields must NOT appear in the request body (only in @odata.id).""" @@ -301,7 +334,7 @@ def test_record_keys_lowercased(self): """Regular record field names are lowercased before sending.""" self.od._create("accounts", "account", {"Name": "Contoso", "AccountNumber": "ACC-001"}) call = self._post_call() - payload = json.loads(call.kwargs["data"]) if "data" in call.kwargs else call.kwargs["json"] + payload = json.loads(call.kwargs["data"]) self.assertIn("name", payload) self.assertIn("accountnumber", payload) self.assertNotIn("Name", payload) @@ -319,7 +352,7 @@ def test_odata_bind_keys_preserve_case(self): }, ) call = self._post_call() - payload = json.loads(call.kwargs["data"]) if "data" in call.kwargs else call.kwargs["json"] + payload = json.loads(call.kwargs["data"]) self.assertIn("new_name", payload) self.assertIn("new_CustomerId@odata.bind", payload) self.assertIn("new_AgentId@odata.bind", payload) @@ -385,7 +418,7 @@ def test_record_keys_lowercased(self): """Regular field names are lowercased in _update.""" self.od._update("new_ticket", "00000000-0000-0000-0000-000000000001", {"New_Status": 100000001}) call = self._patch_call() - payload = json.loads(call.kwargs["data"]) if "data" in call.kwargs else call.kwargs["json"] + payload = json.loads(call.kwargs["data"]) self.assertIn("new_status", payload) self.assertNotIn("New_Status", payload) @@ -400,7 +433,7 @@ def test_odata_bind_keys_preserve_case(self): }, ) call = self._patch_call() - payload = json.loads(call.kwargs["data"]) if "data" in call.kwargs else call.kwargs["json"] + payload = json.loads(call.kwargs["data"]) self.assertIn("new_status", payload) self.assertIn("new_CustomerId@odata.bind", payload) self.assertNotIn("new_customerid@odata.bind", payload) @@ -469,7 +502,7 @@ def test_record_keys_lowercased(self): """Record field names are lowercased before sending.""" self.od._upsert("accounts", "account", {"accountnumber": "ACC-001"}, {"Name": "Contoso"}) call = self._patch_call() - payload = json.loads(call.kwargs["data"]) if "data" in call.kwargs else call.kwargs["json"] + payload = call.kwargs["json"] self.assertIn("name", payload) self.assertNotIn("Name", payload) @@ -485,7 +518,7 @@ def test_odata_bind_keys_preserve_case(self): }, ) call = self._patch_call() - payload = json.loads(call.kwargs["data"]) if "data" in call.kwargs else call.kwargs["json"] + payload = call.kwargs["json"] # Regular field is lowercased self.assertIn("name", payload) # @odata.bind key preserves original casing @@ -519,14 +552,1029 @@ def test_returns_none(self): self.assertIsNone(result) +class TestStaticHelpers(unittest.TestCase): + """Unit tests for _ODataClient static helper methods.""" + + def test_normalize_cache_key_non_string_returns_empty(self): + """_normalize_cache_key with non-string returns empty string.""" + self.assertEqual(_ODataClient._normalize_cache_key(None), "") + self.assertEqual(_ODataClient._normalize_cache_key(42), "") + + def test_lowercase_list_none_returns_none(self): + """_lowercase_list(None) returns None.""" + self.assertIsNone(_ODataClient._lowercase_list(None)) + + def test_lowercase_list_empty_returns_empty(self): + """_lowercase_list([]) returns [].""" + self.assertFalse(_ODataClient._lowercase_list([])) + + def test_lowercase_keys_non_dict_returned_as_is(self): + """_lowercase_keys with non-dict input returns it unchanged.""" + self.assertEqual(_ODataClient._lowercase_keys("a string"), "a string") + self.assertIsNone(_ODataClient._lowercase_keys(None)) + + def test_lowercase_keys_preserves_odata_bind_casing(self): + """_lowercase_keys lowercases regular keys but preserves @odata.bind key casing.""" + result = _ODataClient._lowercase_keys( + { + "Name": "Contoso", + "new_CustomerId@odata.bind": "/contacts(id-1)", + "@odata.type": "Microsoft.Dynamics.CRM.account", + } + ) + self.assertIn("name", result) + self.assertNotIn("Name", result) + self.assertIn("new_CustomerId@odata.bind", result) + self.assertNotIn("new_customerid@odata.bind", result) + self.assertIn("@odata.type", result) + + def test_to_pascal_basic(self): + """_to_pascal converts snake_case to PascalCase.""" + client = _make_odata_client() + self.assertEqual(client._to_pascal("hello_world"), "HelloWorld") + self.assertEqual(client._to_pascal("my_table_name"), "MyTableName") + self.assertEqual(client._to_pascal("single"), "Single") + + +class TestRequestErrorParsing(unittest.TestCase): + """Unit tests for _ODataClient._request error response handling.""" + + def setUp(self): + mock_auth = MagicMock() + mock_auth._acquire_token.return_value = MagicMock(access_token="token") + self.client = _ODataClient(mock_auth, "https://example.crm.dynamics.com") + + def _make_raw_response(self, status_code, json_data=None, headers=None): + response = MagicMock() + response.status_code = status_code + response.text = "body" + response.json.return_value = json_data or {} + response.headers = headers or {} + return response + + def test_message_key_fallback_used_when_no_error_key(self): + """_request uses 'message' key when 'error' key is absent.""" + response = self._make_raw_response(400, json_data={"message": "Bad input received"}) + self.client._raw_request = MagicMock(return_value=response) + with self.assertRaises(HttpError) as ctx: + self.client._request("get", "http://example.com/test") + self.assertIn("Bad input received", str(ctx.exception)) + + def test_retry_after_non_int_not_stored_in_details(self): + """Retry-After header that is non-numeric results in retry_after absent from details.""" + response = self._make_raw_response(429, headers={"Retry-After": "not-a-number"}) + self.client._raw_request = MagicMock(return_value=response) + with self.assertRaises(HttpError) as ctx: + self.client._request("get", "http://example.com/test") + self.assertIsNone(ctx.exception.details.get("retry_after")) + + def test_retry_after_int_stored_in_details(self): + """Retry-After header that is numeric is stored in exception details.""" + response = self._make_raw_response(429, headers={"Retry-After": "30"}) + self.client._raw_request = MagicMock(return_value=response) + with self.assertRaises(HttpError) as ctx: + self.client._request("get", "http://example.com/test") + self.assertEqual(ctx.exception.details.get("retry_after"), 30) + + +class TestCreateMultiple(unittest.TestCase): + """Unit tests for _ODataClient._create_multiple.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_non_dict_items_raise_type_error(self): + """_create_multiple raises TypeError for non-dict items.""" + with self.assertRaises(TypeError): + self.od._create_multiple("accounts", "account", ["not a dict"]) + + def test_odata_type_already_present_not_duplicated(self): + """If @odata.type already in record, it is preserved as-is.""" + self.od._request.return_value = _mock_response( + json_data={"Ids": ["id-1"]}, + text='{"Ids": ["id-1"]}', + ) + self.od._create_multiple( + "accounts", + "account", + [{"@odata.type": "Microsoft.Dynamics.CRM.account", "name": "Test"}], + ) + post_calls = [c for c in self.od._request.call_args_list if c.args[0] == "post"] + target = json.loads(post_calls[0].kwargs["data"])["Targets"][0] + self.assertEqual(target["@odata.type"], "Microsoft.Dynamics.CRM.account") + + def test_body_not_dict_returns_empty_list(self): + """When response body is not a dict, returns empty list.""" + response = _mock_response(text='["id1", "id2"]') + response.json.return_value = ["id1", "id2"] + self.od._request.return_value = response + result = self.od._create_multiple("accounts", "account", [{"name": "A"}]) + self.assertEqual(result, []) + + def test_value_key_path_extracts_ids(self): + """Falls back to 'value' key to extract IDs via heuristic.""" + long_guid = "a" * 32 + response = _mock_response( + json_data={"value": [{"accountid": long_guid, "name": "Test"}]}, + text="...", + ) + self.od._request.return_value = response + result = self.od._create_multiple("accounts", "account", [{"name": "Test"}]) + self.assertEqual(result, [long_guid]) + + def test_value_key_with_non_dict_items_returns_empty(self): + """'value' list with non-dict items returns empty list.""" + response = _mock_response(json_data={"value": ["not-a-dict"]}, text="...") + self.od._request.return_value = response + self.od._convert_labels_to_ints = MagicMock(side_effect=lambda _, rec: rec) + result = self.od._create_multiple("accounts", "account", [{"name": "Test"}]) + self.assertEqual(result, []) + + def test_no_ids_or_value_key_returns_empty_list(self): + """When body has neither 'Ids' nor 'value' keys, returns empty list.""" + response = _mock_response(json_data={"something_else": "data"}, text="...") + self.od._request.return_value = response + self.od._convert_labels_to_ints = MagicMock(side_effect=lambda _, rec: rec) + result = self.od._create_multiple("accounts", "account", [{"name": "Test"}]) + self.assertEqual(result, []) + + def test_value_parse_error_returns_empty_list(self): + """ValueError in body.json() returns empty list.""" + response = MagicMock() + response.text = "invalid json" + response.json.side_effect = ValueError("bad json") + self.od._request.return_value = response + self.od._convert_labels_to_ints = MagicMock(side_effect=lambda _, rec: rec) + result = self.od._create_multiple("accounts", "account", [{"name": "Test"}]) + self.assertEqual(result, []) + + def test_multiple_records_returns_all_ids(self): + """All IDs from the Ids response key are returned for multiple input records.""" + self.od._request.return_value = _mock_response( + json_data={"Ids": ["id-1", "id-2", "id-3"]}, + text='{"Ids": ["id-1", "id-2", "id-3"]}', + ) + result = self.od._create_multiple( + "accounts", + "account", + [{"name": "A"}, {"name": "B"}, {"name": "C"}], + ) + self.assertEqual(result, ["id-1", "id-2", "id-3"]) + + +class TestPrimaryIdAttr(unittest.TestCase): + """Unit tests for _ODataClient._primary_id_attr cache-miss behavior.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_cache_miss_resolves_via_entity_set_lookup(self): + """Cache miss triggers entity set lookup and populates primary ID cache.""" + + def mock_entity_set(table_schema_name): + cache_key = table_schema_name.lower() + self.od._logical_to_entityset_cache[cache_key] = "accounts" + self.od._logical_primaryid_cache[cache_key] = "accountid" + return "accounts" + + self.od._entity_set_from_schema_name = MagicMock(side_effect=mock_entity_set) + result = self.od._primary_id_attr("account") + self.assertEqual(result, "accountid") + + def test_cache_miss_no_primary_id_raises_runtime_error(self): + """Cache miss with no PrimaryIdAttribute in metadata raises RuntimeError.""" + + def mock_entity_set_no_pid(table_schema_name): + cache_key = table_schema_name.lower() + self.od._logical_to_entityset_cache[cache_key] = "accounts" + return "accounts" + + self.od._entity_set_from_schema_name = MagicMock(side_effect=mock_entity_set_no_pid) + with self.assertRaises(RuntimeError) as ctx: + self.od._primary_id_attr("account") + self.assertIn("PrimaryIdAttribute not resolved", str(ctx.exception)) + + def test_cache_hit_returns_without_lookup(self): + """Cache hit returns primary ID immediately without issuing any request.""" + self.od._logical_primaryid_cache["account"] = "accountid" + result = self.od._primary_id_attr("account") + self.assertEqual(result, "accountid") + self.od._request.assert_not_called() + + +class TestUpdateByIds(unittest.TestCase): + """Unit tests for _ODataClient._update_by_ids.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_non_list_ids_raises_type_error(self): + """_update_by_ids raises TypeError when ids is not a list.""" + with self.assertRaises(TypeError): + self.od._update_by_ids("account", "not-a-list", {"name": "X"}) + + def test_empty_ids_returns_none(self): + """_update_by_ids returns None immediately for empty ids list.""" + result = self.od._update_by_ids("account", [], {"name": "X"}) + self.assertIsNone(result) + self.od._request.assert_not_called() + + def test_non_list_non_dict_changes_raises_type_error(self): + """_update_by_ids raises TypeError for changes that is not dict or list.""" + self.od._primary_id_attr = MagicMock(return_value="accountid") + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + with self.assertRaises(TypeError) as ctx: + self.od._update_by_ids("account", ["id-1"], "bad-changes") + self.assertIn("changes must be dict or list[dict]", str(ctx.exception)) + + def test_list_changes_length_mismatch_raises_value_error(self): + """_update_by_ids raises ValueError when changes list length != ids length.""" + self.od._primary_id_attr = MagicMock(return_value="accountid") + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + with self.assertRaises(ValueError) as ctx: + self.od._update_by_ids("account", ["id-1", "id-2"], [{"name": "A"}]) + self.assertIn("Length of changes list must match", str(ctx.exception)) + + def test_non_dict_patch_in_list_raises_type_error(self): + """_update_by_ids raises TypeError when a patch in the list is not a dict.""" + self.od._primary_id_attr = MagicMock(return_value="accountid") + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + self.od._update_multiple = MagicMock() + with self.assertRaises(TypeError) as ctx: + self.od._update_by_ids("account", ["id-1"], ["not-a-dict"]) + self.assertIn("Each patch must be a dict", str(ctx.exception)) + + def test_dict_changes_broadcasts_to_all_ids(self): + """_update_by_ids with dict changes builds one batch record per ID.""" + self.od._primary_id_attr = MagicMock(return_value="accountid") + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + self.od._update_multiple = MagicMock() + self.od._update_by_ids("account", ["id-1", "id-2"], {"name": "X"}) + self.od._update_multiple.assert_called_once() + _, _, batch = self.od._update_multiple.call_args.args + self.assertEqual(len(batch), 2) + self.assertEqual(batch[0]["accountid"], "id-1") + self.assertEqual(batch[1]["accountid"], "id-2") + self.assertEqual(batch[0]["name"], "X") + self.assertEqual(batch[1]["name"], "X") + + def test_list_changes_merges_per_record(self): + """_update_by_ids with list changes merges each patch with its corresponding ID.""" + self.od._primary_id_attr = MagicMock(return_value="accountid") + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + self.od._update_multiple = MagicMock() + self.od._update_by_ids("account", ["id-1", "id-2"], [{"name": "A"}, {"name": "B"}]) + _, _, batch = self.od._update_multiple.call_args.args + self.assertEqual(batch[0], {"accountid": "id-1", "name": "A"}) + self.assertEqual(batch[1], {"accountid": "id-2", "name": "B"}) + + +class TestUpdateMultiple(unittest.TestCase): + """Unit tests for _ODataClient._update_multiple.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_non_list_records_raises_type_error(self): + """_update_multiple raises TypeError for non-list records.""" + with self.assertRaises(TypeError): + self.od._update_multiple("accounts", "account", "not-a-list") + + def test_empty_list_raises_type_error(self): + """_update_multiple raises TypeError for empty list.""" + with self.assertRaises(TypeError): + self.od._update_multiple("accounts", "account", []) + + def test_odata_type_already_present_not_overridden(self): + """If all records have @odata.type, it is preserved.""" + self.od._request.return_value = _mock_response() + records = [{"@odata.type": "Microsoft.Dynamics.CRM.CustomType", "accountid": "id-1", "name": "A"}] + self.od._update_multiple("accounts", "account", records) + payload = json.loads(self.od._request.call_args.kwargs["data"]) + self.assertEqual(payload["Targets"][0]["@odata.type"], "Microsoft.Dynamics.CRM.CustomType") + + def test_posts_to_update_multiple_endpoint(self): + """_update_multiple POSTs to {entity_set}/Microsoft.Dynamics.CRM.UpdateMultiple.""" + self.od._request.return_value = _mock_response() + self.od._update_multiple("accounts", "account", [{"accountid": "id-1", "name": "X"}]) + method, url = self.od._request.call_args.args + self.assertEqual(method, "post") + self.assertIn("accounts/Microsoft.Dynamics.CRM.UpdateMultiple", url) + + def test_payload_contains_targets_array(self): + """_update_multiple sends {"Targets": [...]} with @odata.type injected per record.""" + self.od._request.return_value = _mock_response() + self.od._update_multiple("accounts", "account", [{"accountid": "id-1", "name": "X"}]) + payload = json.loads(self.od._request.call_args.kwargs["data"]) + self.assertIn("Targets", payload) + self.assertEqual(len(payload["Targets"]), 1) + self.assertIn("@odata.type", payload["Targets"][0]) + + def test_multiple_records_all_in_targets(self): + """All records are included in the Targets payload for multiple inputs.""" + self.od._request.return_value = _mock_response() + records = [ + {"accountid": "id-1", "name": "A"}, + {"accountid": "id-2", "name": "B"}, + {"accountid": "id-3", "name": "C"}, + ] + self.od._update_multiple("accounts", "account", records) + payload = json.loads(self.od._request.call_args.kwargs["data"]) + self.assertEqual(len(payload["Targets"]), 3) + self.assertEqual(payload["Targets"][0]["accountid"], "id-1") + self.assertEqual(payload["Targets"][2]["accountid"], "id-3") + + +class TestDeleteMultiple(unittest.TestCase): + """Unit tests for _ODataClient._delete_multiple.""" + + def setUp(self): + self.od = _make_odata_client() + self.od._primary_id_attr = MagicMock(return_value="accountid") + + def test_empty_ids_returns_none(self): + """_delete_multiple returns None for empty IDs.""" + result = self.od._delete_multiple("account", []) + self.assertIsNone(result) + self.od._request.assert_not_called() + + def test_filters_out_falsy_ids(self): + """_delete_multiple filters None/empty strings from ids.""" + result = self.od._delete_multiple("account", [None, "", None]) + self.assertIsNone(result) + self.od._request.assert_not_called() + + def test_posts_bulk_delete_payload(self): + """_delete_multiple issues POST to BulkDelete with correct payload.""" + self.od._request.return_value = _mock_response(json_data={"JobId": "job-001"}, text='{"JobId": "job-001"}') + result = self.od._delete_multiple("account", ["id-1", "id-2"]) + self.assertEqual(result, "job-001") + call_args = self.od._request.call_args + self.assertEqual(call_args.args[0], "post") + self.assertIn("BulkDelete", call_args.args[1]) + payload = json.loads(call_args.kwargs["data"]) + self.assertIn("QuerySet", payload) + self.assertIn("JobName", payload) + query = payload["QuerySet"][0] + self.assertEqual(query["EntityName"], "account") + conditions = query["Criteria"]["Conditions"] + self.assertEqual(len(conditions), 1) + self.assertEqual(conditions[0]["AttributeName"], "accountid") + values = conditions[0]["Values"] + self.assertEqual(len(values), 2) + self.assertEqual({v["Value"] for v in values}, {"id-1", "id-2"}) + + def test_returns_none_when_no_job_id_in_body(self): + """_delete_multiple returns None when response body has no JobId.""" + self.od._request.return_value = _mock_response(json_data={}, text="{}") + result = self.od._delete_multiple("account", ["id-1"]) + self.assertIsNone(result) + + def test_handles_value_error_in_json_parsing(self): + """_delete_multiple handles ValueError in response JSON parsing gracefully.""" + response = MagicMock() + response.text = "invalid" + response.json.side_effect = ValueError + self.od._request.return_value = response + result = self.od._delete_multiple("account", ["id-1"]) + self.assertIsNone(result) + + +class TestFormatKey(unittest.TestCase): + """Unit tests for _ODataClient._format_key.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_guid_wrapped_in_parens(self): + """_format_key wraps 36-char GUID in parentheses.""" + guid = "11111111-2222-3333-4444-555555555555" + self.assertEqual(self.od._format_key(guid), f"({guid})") + + def test_already_wrapped_key_returned_as_is(self): + """_format_key returns already-parenthesized key unchanged.""" + key = "(some-key)" + self.assertEqual(self.od._format_key(key), key) + + def test_alternate_key_with_quotes_is_escaped(self): + """_format_key wraps alternate key with single-quoted value in parentheses.""" + result = self.od._format_key("mykey='it''s value'") + self.assertEqual(result, "(mykey='it''s value')") + + +class TestGetMultiple(unittest.TestCase): + """Unit tests for _ODataClient._get_multiple query parameter handling.""" + + def setUp(self): + self.od = _make_odata_client() + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + + def _single_page_response(self, items=None): + data = {"value": items or [{"accountid": "id-1"}]} + response = _mock_response(json_data=data, text=str(data)) + self.od._request.return_value = response + + def test_filter_param_passed(self): + """_get_multiple passes $filter to params.""" + self._single_page_response() + list(self.od._get_multiple("account", filter="statecode eq 0")) + params = self.od._request.call_args.kwargs["params"] + self.assertEqual(params["$filter"], "statecode eq 0") + + def test_orderby_param_passed(self): + """_get_multiple passes $orderby to params.""" + self._single_page_response() + list(self.od._get_multiple("account", orderby=["name asc", "createdon desc"])) + params = self.od._request.call_args.kwargs["params"] + self.assertEqual(params["$orderby"], "name asc,createdon desc") + + def test_expand_param_passed(self): + """_get_multiple passes $expand to params.""" + self._single_page_response() + list(self.od._get_multiple("account", expand=["contact_customer_accounts"])) + params = self.od._request.call_args.kwargs["params"] + self.assertEqual(params["$expand"], "contact_customer_accounts") + + def test_top_param_passed(self): + """_get_multiple passes $top to params.""" + self._single_page_response() + list(self.od._get_multiple("account", top=5)) + params = self.od._request.call_args.kwargs["params"] + self.assertEqual(params["$top"], 5) + + def test_count_param_passed(self): + """_get_multiple passes $count=true when count=True.""" + self._single_page_response() + list(self.od._get_multiple("account", count=True)) + params = self.od._request.call_args.kwargs["params"] + self.assertEqual(params["$count"], "true") + + def test_include_annotations_sets_prefer_header(self): + """_get_multiple sets Prefer header with include-annotations.""" + self._single_page_response() + list(self.od._get_multiple("account", include_annotations="*")) + headers = self.od._request.call_args.kwargs.get("headers") or {} + self.assertIn("Prefer", headers) + self.assertIn("include-annotations", headers["Prefer"]) + + def test_page_size_sets_prefer_header(self): + """_get_multiple sets Prefer odata.maxpagesize when page_size > 0.""" + self._single_page_response() + list(self.od._get_multiple("account", page_size=50)) + headers = self.od._request.call_args.kwargs.get("headers") or {} + self.assertIn("odata.maxpagesize=50", headers.get("Prefer", "")) + + def test_value_error_in_json_returns_empty(self): + """ValueError in page JSON parsing yields nothing.""" + response = MagicMock() + response.text = "bad json" + response.json.side_effect = ValueError + self.od._request.return_value = response + pages = list(self.od._get_multiple("account")) + self.assertEqual(pages, []) + + def test_yields_value_items_as_page(self): + """_get_multiple yields the 'value' list as a page of dicts.""" + items = [{"accountid": "id-1", "name": "A"}, {"accountid": "id-2", "name": "B"}] + self._single_page_response(items) + pages = list(self.od._get_multiple("account")) + self.assertEqual(len(pages), 1) + self.assertEqual(pages[0], items) + + def test_follows_nextlink_pagination(self): + """_get_multiple follows @odata.nextLink across multiple pages.""" + page1 = _mock_response( + json_data={ + "value": [{"accountid": "id-1"}], + "@odata.nextLink": "https://example.crm.dynamics.com/next-page", + }, + text="...", + ) + page2 = _mock_response( + json_data={"value": [{"accountid": "id-2"}]}, + text="...", + ) + self.od._request.side_effect = [page1, page2] + pages = list(self.od._get_multiple("account")) + self.assertEqual(len(pages), 2) + self.assertEqual(pages[0][0]["accountid"], "id-1") + self.assertEqual(pages[1][0]["accountid"], "id-2") + + def test_stops_when_no_nextlink(self): + """_get_multiple stops after a page without nextLink.""" + self._single_page_response([{"accountid": "id-1"}]) + pages = list(self.od._get_multiple("account")) + self.assertEqual(len(pages), 1) + self.od._request.assert_called_once() + + def test_filters_non_dict_items_from_page(self): + """_get_multiple filters out non-dict items from each page.""" + data = {"value": [{"accountid": "id-1"}, "not-a-dict", 42]} + response = _mock_response(json_data=data, text=str(data)) + self.od._request.return_value = response + pages = list(self.od._get_multiple("account")) + self.assertEqual(len(pages), 1) + self.assertEqual(len(pages[0]), 1) + self.assertEqual(pages[0][0]["accountid"], "id-1") + + def test_empty_value_list_yields_nothing(self): + """_get_multiple yields nothing when value list is empty.""" + data = {"value": []} + response = _mock_response(json_data=data, text=str(data)) + self.od._request.return_value = response + pages = list(self.od._get_multiple("account")) + self.assertEqual(pages, []) + + +class TestQuerySql(unittest.TestCase): + """Unit tests for _ODataClient._query_sql.""" + + def setUp(self): + self.od = _make_odata_client() + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + + def test_non_string_sql_raises_validation_error(self): + """_query_sql raises ValidationError for non-string sql.""" + with self.assertRaises(ValidationError): + self.od._query_sql(123) + + def test_empty_sql_raises_validation_error(self): + """_query_sql raises ValidationError for empty sql.""" + with self.assertRaises(ValidationError): + self.od._query_sql(" ") + + def test_returns_value_list(self): + """_query_sql returns rows from response 'value' key.""" + self.od._request.return_value = _mock_response( + json_data={"value": [{"accountid": "id-1", "name": "Contoso"}]}, + text="...", + ) + result = self.od._query_sql("SELECT name FROM account") + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["name"], "Contoso") + + def test_filters_non_dict_rows(self): + """_query_sql filters out non-dict rows from 'value' list.""" + self.od._request.return_value = _mock_response( + json_data={"value": [{"name": "A"}, "not-a-dict", 42]}, text="..." + ) + result = self.od._query_sql("SELECT name FROM account") + self.assertEqual(len(result), 1) + + def test_body_as_list_fallback(self): + """_query_sql handles body being a list directly.""" + response = _mock_response(text="...") + response.json.return_value = [{"name": "A"}, {"name": "B"}] + self.od._request.return_value = response + result = self.od._query_sql("SELECT name FROM account") + self.assertEqual(len(result), 2) + + def test_value_error_in_json_returns_empty(self): + """_query_sql returns empty list when JSON parsing fails.""" + response = MagicMock() + response.text = "bad json" + response.json.side_effect = ValueError + self.od._request.return_value = response + result = self.od._query_sql("SELECT name FROM account") + self.assertEqual(result, []) + + def test_unexpected_body_returns_empty(self): + """_query_sql returns empty list for non-dict, non-list body.""" + response = _mock_response(text="...") + response.json.return_value = "unexpected" + self.od._request.return_value = response + result = self.od._query_sql("SELECT name FROM account") + self.assertEqual(result, []) + + def test_extract_non_string_raises_value_error(self): + """_extract_logical_table with non-string raises ValueError.""" + with self.assertRaises(ValueError): + _ODataClient._extract_logical_table(123) + + def test_extract_no_from_clause_raises_value_error(self): + """_extract_logical_table without FROM raises ValueError.""" + with self.assertRaises(ValueError) as ctx: + _ODataClient._extract_logical_table("SELECT name, surname") + self.assertIn("FROM", str(ctx.exception)) + + +class TestEntitySetFromSchemaName(unittest.TestCase): + """Unit tests for _ODataClient._entity_set_from_schema_name.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_empty_table_schema_name_raises_value_error(self): + """_entity_set_from_schema_name raises ValueError for empty input.""" + with self.assertRaises(ValueError) as ctx: + self.od._entity_set_from_schema_name("") + self.assertIn("table schema name required", str(ctx.exception)) + + def test_json_value_error_in_response_treated_as_empty(self): + """_entity_set_from_schema_name handles ValueError in JSON parsing.""" + response = MagicMock() + response.text = "invalid json" + response.json.side_effect = ValueError + self.od._request.return_value = response + with self.assertRaises(MetadataError): + self.od._entity_set_from_schema_name("account") + + def test_plural_hint_when_name_ends_with_s(self): + """Error message includes plural hint when name ends with 's' (not 'ss').""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + with self.assertRaises(MetadataError) as ctx: + self.od._entity_set_from_schema_name("accounts") + self.assertIn("plural", str(ctx.exception).lower()) + + def test_no_plural_hint_when_name_ends_with_ss(self): + """No plural hint when name ends with 'ss'.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + with self.assertRaises(MetadataError) as ctx: + self.od._entity_set_from_schema_name("address") + self.assertNotIn("plural", str(ctx.exception).lower()) + + def test_missing_entity_set_name_raises_metadata_error(self): + """MetadataError raised when EntitySetName is absent from metadata.""" + self.od._request.return_value = _mock_response( + json_data={"value": [{"LogicalName": "account", "EntitySetName": None, "PrimaryIdAttribute": "accountid"}]}, + text="...", + ) + with self.assertRaises(MetadataError) as ctx: + self.od._entity_set_from_schema_name("account") + self.assertIn("EntitySetName", str(ctx.exception)) + + def test_cache_hit_returns_without_request(self): + """Cache hit returns entity set name immediately without issuing any request.""" + self.od._logical_to_entityset_cache["account"] = "accounts" + result = self.od._entity_set_from_schema_name("account") + self.assertEqual(result, "accounts") + self.od._request.assert_not_called() + + def test_success_populates_entityset_cache(self): + """Successful API response populates _logical_to_entityset_cache.""" + self.od._request.return_value = _entity_def_response(entity_set_name="accounts", primary_id="accountid") + result = self.od._entity_set_from_schema_name("account") + self.assertEqual(result, "accounts") + self.assertEqual(self.od._logical_to_entityset_cache["account"], "accounts") + + def test_success_populates_primaryid_cache(self): + """Successful API response populates _logical_primaryid_cache.""" + self.od._request.return_value = _entity_def_response(entity_set_name="accounts", primary_id="accountid") + self.od._entity_set_from_schema_name("account") + self.assertEqual(self.od._logical_primaryid_cache["account"], "accountid") + + def test_success_without_primary_id_does_not_populate_primaryid_cache(self): + """When PrimaryIdAttribute is missing, _logical_primaryid_cache is not populated.""" + self.od._request.return_value = _mock_response( + json_data={"value": [{"LogicalName": "account", "EntitySetName": "accounts"}]}, + text="...", + ) + self.od._entity_set_from_schema_name("account") + self.assertNotIn("account", self.od._logical_primaryid_cache) + + +class TestGetEntityByTableSchemaName(unittest.TestCase): + """Unit tests for _ODataClient._get_entity_by_table_schema_name.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_returns_first_match(self): + """_get_entity_by_table_schema_name returns first entity when found.""" + self.od._request.return_value = _entity_def_response() + result = self.od._get_entity_by_table_schema_name("account") + self.assertIsNotNone(result) + self.assertEqual(result["EntitySetName"], "accounts") + + def test_returns_none_when_not_found(self): + """_get_entity_by_table_schema_name returns None when no match.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + result = self.od._get_entity_by_table_schema_name("nonexistent") + self.assertIsNone(result) + + +class TestCreateEntity(unittest.TestCase): + """Unit tests for _ODataClient._create_entity.""" + + def setUp(self): + self.od = _make_odata_client() + + def _setup_entity_creation(self, get_response=None): + """Mock _request: POST returns 201, GET returns entity definition.""" + + def side_effect(method, url, **kwargs): + if method == "post": + return _mock_response(status_code=201) + else: + return get_response or _entity_def_response() + + self.od._request.side_effect = side_effect + + def test_successful_entity_creation(self): + """_create_entity returns metadata on success.""" + self._setup_entity_creation() + result = self.od._create_entity("new_TestTable", "Test Table", [], solution_unique_name=None) + self.assertEqual(result["EntitySetName"], "accounts") + + def test_entity_set_name_missing_raises_runtime_error(self): + """_create_entity raises RuntimeError when EntitySetName not available after create.""" + get_response = _mock_response(json_data={"value": []}, text="{}") + + def side_effect(method, url, **kwargs): + return _mock_response(status_code=201) if method == "post" else get_response + + self.od._request.side_effect = side_effect + with self.assertRaises(RuntimeError) as ctx: + self.od._create_entity("new_TestTable", "Test Table", []) + self.assertIn("EntitySetName not available", str(ctx.exception)) + + def test_metadata_id_missing_raises_runtime_error(self): + """_create_entity raises RuntimeError when MetadataId missing after create.""" + get_response = _mock_response( + json_data={"value": [{"EntitySetName": "new_testtables", "SchemaName": "new_TestTable"}]}, + text="...", + ) + + def side_effect(method, url, **kwargs): + return _mock_response(status_code=201) if method == "post" else get_response + + self.od._request.side_effect = side_effect + with self.assertRaises(RuntimeError) as ctx: + self.od._create_entity("new_TestTable", "Test Table", []) + self.assertIn("MetadataId missing", str(ctx.exception)) + + def test_solution_unique_name_passed_as_param(self): + """_create_entity passes SolutionUniqueName as query param when provided.""" + self._setup_entity_creation() + self.od._create_entity("new_TestTable", "Test Table", [], solution_unique_name="MySolution") + post_call = next(c for c in self.od._request.call_args_list if c.args[0] == "post") + self.assertEqual(post_call.kwargs.get("params"), {"SolutionUniqueName": "MySolution"}) + + +class TestGetAttributeMetadata(unittest.TestCase): + """Unit tests for _ODataClient._get_attribute_metadata.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_returns_attribute_when_found(self): + """_get_attribute_metadata returns attribute dict when found.""" + self.od._request.return_value = _mock_response( + json_data={"value": [{"MetadataId": "attr-001", "LogicalName": "name", "SchemaName": "Name"}]}, + text="...", + ) + result = self.od._get_attribute_metadata("meta-001", "name") + self.assertIsNotNone(result) + self.assertEqual(result["MetadataId"], "attr-001") + + def test_returns_none_when_not_found(self): + """_get_attribute_metadata returns None when attribute not in response.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + result = self.od._get_attribute_metadata("meta-001", "name") + self.assertIsNone(result) + + def test_extra_select_fields_included(self): + """_get_attribute_metadata appends extra_select fields to $select.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + self.od._get_attribute_metadata("meta-001", "name", extra_select="AttributeType,MaxLength") + params = self.od._request.call_args.kwargs["params"] + self.assertIn("AttributeType", params["$select"]) + self.assertIn("MaxLength", params["$select"]) + + def test_extra_select_skips_empty_pieces(self): + """_get_attribute_metadata skips empty pieces in extra_select.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + self.od._get_attribute_metadata("meta-001", "name", extra_select=",AttributeType,") + params = self.od._request.call_args.kwargs["params"] + self.assertIn("AttributeType", params["$select"]) + + def test_extra_select_skips_odata_annotation_pieces(self): + """_get_attribute_metadata skips pieces starting with @.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + self.od._get_attribute_metadata("meta-001", "name", extra_select="@odata.type,MaxLength") + params = self.od._request.call_args.kwargs["params"] + self.assertNotIn("@odata.type", params["$select"]) + self.assertIn("MaxLength", params["$select"]) + + def test_value_error_in_json_returns_none(self): + """_get_attribute_metadata returns None on JSON parse failure.""" + response = MagicMock() + response.text = "bad json" + response.json.side_effect = ValueError + self.od._request.return_value = response + result = self.od._get_attribute_metadata("meta-001", "name") + self.assertIsNone(result) + + +class TestWaitForAttributeVisibility(unittest.TestCase): + """Unit tests for _ODataClient._wait_for_attribute_visibility.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_returns_immediately_on_success(self): + """_wait_for_attribute_visibility returns immediately when first probe succeeds.""" + self.od._request.return_value = _mock_response() + self.od._wait_for_attribute_visibility("accounts", "name", delays=(0,)) + self.od._request.assert_called_once() + + def test_retries_on_failure_then_succeeds(self): + """_wait_for_attribute_visibility retries after initial failure.""" + self.od._request.side_effect = [RuntimeError("not ready"), _mock_response()] + self.od._wait_for_attribute_visibility("accounts", "name", delays=(0, 0)) + self.assertEqual(self.od._request.call_count, 2) + + def test_sleep_is_called_for_nonzero_delays(self): + """_wait_for_attribute_visibility calls time.sleep for non-zero delays.""" + self.od._request.side_effect = [RuntimeError("not ready"), _mock_response()] + with patch("PowerPlatform.Dataverse.data._odata.time.sleep") as mock_sleep: + self.od._wait_for_attribute_visibility("accounts", "name", delays=(0, 5)) + mock_sleep.assert_called_once_with(5) + + def test_raises_runtime_error_after_all_retries_exhausted(self): + """_wait_for_attribute_visibility raises RuntimeError when all retries fail.""" + self.od._request.side_effect = RuntimeError("not ready") + with self.assertRaises(RuntimeError) as ctx: + self.od._wait_for_attribute_visibility("accounts", "name", delays=(0, 0)) + self.assertIn("did not become visible", str(ctx.exception)) + + +class TestLocalizedLabelsPayload(unittest.TestCase): + """Unit tests for _ODataClient._build_localizedlabels_payload.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_non_int_lang_raises_value_error(self): + """_build_localizedlabels_payload raises ValueError for non-int language code.""" + with self.assertRaises(ValueError) as ctx: + self.od._build_localizedlabels_payload({"1033": "English"}) + self.assertIn("must be int", str(ctx.exception)) + + def test_non_string_label_raises_value_error(self): + """_build_localizedlabels_payload raises ValueError for non-string label.""" + with self.assertRaises(ValueError) as ctx: + self.od._build_localizedlabels_payload({1033: 42}) + self.assertIn("non-empty string", str(ctx.exception)) + + def test_empty_translations_raises_value_error(self): + """_build_localizedlabels_payload raises ValueError for empty translations.""" + with self.assertRaises(ValueError) as ctx: + self.od._build_localizedlabels_payload({}) + self.assertIn("At least one translation", str(ctx.exception)) + + def test_empty_string_label_raises_value_error(self): + """_build_localizedlabels_payload raises ValueError for empty string label.""" + with self.assertRaises(ValueError): + self.od._build_localizedlabels_payload({1033: " "}) + + +class TestEnumOptionSetPayload(unittest.TestCase): + """Unit tests for _ODataClient._enum_optionset_payload.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_empty_enum_raises_value_error(self): + """_enum_optionset_payload raises ValueError for enum with no members.""" + + class EmptyEnum(Enum): + pass + + with self.assertRaises(ValueError) as ctx: + self.od._enum_optionset_payload("new_Status", EmptyEnum) + self.assertIn("no members", str(ctx.exception)) + + def test_int_key_in_labels_resolved_to_member_name(self): + """__labels__ with int keys (matching enum values) are resolved to member names.""" + + class Status(Enum): + Active = 1 + Inactive = 2 + + Status.__labels__ = {1033: {1: "Active", 2: "Inactive"}} + result = self.od._enum_optionset_payload("new_Status", Status) + self.assertEqual(len(result["OptionSet"]["Options"]), 2) + + def test_enum_member_object_as_labels_key(self): + """__labels__ with enum member objects as keys resolves member name.""" + + class Status(Enum): + Active = 1 + Inactive = 2 + + Status.__labels__ = {1033: {Status.Active: "Active Label", Status.Inactive: "Inactive Label"}} + result = self.od._enum_optionset_payload("new_Status", Status) + options = result["OptionSet"]["Options"] + self.assertEqual(len(options), 2) + active_opt = next(o for o in options if o["Value"] == 1) + active_label = next( + loc["Label"] for loc in active_opt["Label"]["LocalizedLabels"] if loc["LanguageCode"] == 1033 + ) + self.assertEqual(active_label, "Active Label") + + def test_int_key_not_matching_any_member_raises_value_error(self): + """__labels__ with int key not matching any member raises ValueError.""" + + class Status(Enum): + Active = 1 + + Status.__labels__ = {1033: {99: "Unknown"}} + with self.assertRaises(ValueError) as ctx: + self.od._enum_optionset_payload("new_Status", Status) + self.assertIn("int key", str(ctx.exception)) + + def test_duplicate_enum_values_raises_value_error(self): + """_enum_optionset_payload raises ValueError when two members share the same int value.""" + + # Python treats second definition as an alias; __members__ exposes both names + class Status(Enum): + Active = 1 + DuplicateActive = 1 # alias for Active in Python Enum + + with self.assertRaises(ValueError) as ctx: + self.od._enum_optionset_payload("new_Status", Status) + self.assertIn("Duplicate", str(ctx.exception)) + + def test_non_int_enum_value_raises_value_error(self): + """_enum_optionset_payload raises ValueError for enum member with a non-int value.""" + + class Status(Enum): + Active = "active" + + with self.assertRaises(ValueError) as ctx: + self.od._enum_optionset_payload("new_Status", Status) + self.assertIn("non-int", str(ctx.exception)) + + class TestAttributePayload(unittest.TestCase): """Unit tests for _ODataClient._attribute_payload.""" def setUp(self): self.od = _make_odata_client() + def test_int_dtype(self): + """'int' produces IntegerAttributeMetadata.""" + result = self.od._attribute_payload("new_Count", "int") + self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.IntegerAttributeMetadata") + + def test_integer_dtype_alias(self): + """'integer' is an alias for 'int'.""" + result = self.od._attribute_payload("new_Count", "integer") + self.assertIn("Integer", result["@odata.type"]) + + def test_decimal_dtype(self): + """'decimal' produces DecimalAttributeMetadata.""" + result = self.od._attribute_payload("new_Price", "decimal") + self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.DecimalAttributeMetadata") + + def test_money_dtype_alias(self): + """'money' is an alias for 'decimal'.""" + result = self.od._attribute_payload("new_Revenue", "money") + self.assertIn("Decimal", result["@odata.type"]) + + def test_float_dtype(self): + """'float' produces DoubleAttributeMetadata.""" + result = self.od._attribute_payload("new_Score", "float") + self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.DoubleAttributeMetadata") + + def test_double_dtype_alias(self): + """'double' is an alias for 'float'.""" + result = self.od._attribute_payload("new_Score", "double") + self.assertIn("Double", result["@odata.type"]) + + def test_datetime_dtype(self): + """'datetime' produces DateTimeAttributeMetadata.""" + result = self.od._attribute_payload("new_CreatedDate", "datetime") + self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.DateTimeAttributeMetadata") + + def test_date_dtype_alias(self): + """'date' is an alias for 'datetime'.""" + result = self.od._attribute_payload("new_BirthDate", "date") + self.assertIn("DateTime", result["@odata.type"]) + + def test_bool_dtype(self): + """'bool' produces BooleanAttributeMetadata.""" + result = self.od._attribute_payload("new_IsActive", "bool") + self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.BooleanAttributeMetadata") + + def test_boolean_dtype_alias(self): + """'boolean' is an alias for 'bool'.""" + result = self.od._attribute_payload("new_IsActive", "boolean") + self.assertIn("Boolean", result["@odata.type"]) + + def test_file_dtype(self): + """'file' produces FileAttributeMetadata.""" + result = self.od._attribute_payload("new_Attachment", "file") + self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.FileAttributeMetadata") + + def test_non_string_dtype_raises_value_error(self): + """Non-string dtype raises ValueError.""" + with self.assertRaises(ValueError): + self.od._attribute_payload("new_Field", 42) + def test_memo_type(self): - """'memo' should produce MemoAttributeMetadata with MaxLength 4000.""" + """'memo' produces MemoAttributeMetadata with MaxLength 4000.""" result = self.od._attribute_payload("new_Notes", "memo") self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.MemoAttributeMetadata") self.assertEqual(result["SchemaName"], "new_Notes") @@ -535,13 +1583,13 @@ def test_memo_type(self): self.assertNotIn("IsPrimaryName", result) def test_multiline_alias(self): - """'multiline' should produce identical payload to 'memo'.""" + """'multiline' produces identical payload to 'memo'.""" memo_result = self.od._attribute_payload("new_Description", "memo") multiline_result = self.od._attribute_payload("new_Description", "multiline") self.assertEqual(multiline_result, memo_result) - def test_string_type(self): - """'string' should produce StringAttributeMetadata with MaxLength 200.""" + def test_string_type_max_length(self): + """'string' produces StringAttributeMetadata with MaxLength 200.""" result = self.od._attribute_payload("new_Title", "string") self.assertEqual(result["@odata.type"], "Microsoft.Dynamics.CRM.StringAttributeMetadata") self.assertEqual(result["MaxLength"], 200) @@ -553,6 +1601,388 @@ def test_unsupported_type_returns_none(self): self.assertIsNone(result) +class TestGetTableInfo(unittest.TestCase): + """Unit tests for _ODataClient._get_table_info.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_returns_none_when_entity_not_found(self): + """_get_table_info returns None when entity does not exist.""" + self.od._request.return_value = _mock_response(json_data={"value": []}, text="{}") + self.assertIsNone(self.od._get_table_info("new_NonExistent")) + + def test_returns_metadata_when_found(self): + """_get_table_info returns metadata dict when entity exists.""" + self.od._request.return_value = _entity_def_response() + result = self.od._get_table_info("account") + self.assertIsNotNone(result) + self.assertIn("entity_set_name", result) + + def test_returns_full_dict_shape(self): + """_get_table_info returns all expected keys from metadata.""" + self.od._request.return_value = _entity_def_response( + entity_set_name="accounts", primary_id="accountid", metadata_id="meta-001" + ) + result = self.od._get_table_info("account") + self.assertEqual(result["table_schema_name"], "Account") + self.assertEqual(result["table_logical_name"], "account") + self.assertEqual(result["entity_set_name"], "accounts") + self.assertEqual(result["metadata_id"], "meta-001") + self.assertEqual(result["primary_id_attribute"], "accountid") + self.assertIsInstance(result["columns_created"], list) + self.assertEqual(result["columns_created"], []) + + +class TestDeleteTable(unittest.TestCase): + """Unit tests for _ODataClient._delete_table.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_deletes_table_by_metadata_id(self): + """_delete_table issues DELETE to EntityDefinitions({MetadataId}).""" + self.od._request.return_value = _mock_response() + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "SchemaName": "new_Test"} + ) + self.od._delete_table("new_Test") + delete_call = next(c for c in self.od._request.call_args_list if c.args[0] == "delete") + self.assertIn("meta-001", delete_call.args[1]) + + def test_raises_metadata_error_when_not_found(self): + """_delete_table raises MetadataError when entity does not exist.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + with self.assertRaises(MetadataError): + self.od._delete_table("new_NonExistent") + + def test_raises_metadata_error_when_metadata_id_missing(self): + """_delete_table raises MetadataError when MetadataId is absent from entity.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value={"SchemaName": "new_Test"}) + with self.assertRaises(MetadataError): + self.od._delete_table("new_Test") + + +class TestCreateAlternateKey(unittest.TestCase): + """Unit tests for _ODataClient._create_alternate_key.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_creates_alternate_key(self): + """_create_alternate_key posts to Keys endpoint and returns metadata.""" + post_response = MagicMock() + post_response.headers = {"OData-EntityId": "https://example.com/Keys(key-meta-001)"} + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "LogicalName": "account", "SchemaName": "Account"} + ) + self.od._request.return_value = post_response + result = self.od._create_alternate_key("account", "new_AccountNumKey", ["accountnumber"]) + self.assertEqual(result["schema_name"], "new_AccountNumKey") + self.assertEqual(result["key_attributes"], ["accountnumber"]) + + def test_display_name_label_passed_to_payload(self): + """_create_alternate_key includes DisplayName when display_name_label is provided.""" + post_response = MagicMock() + post_response.headers = {"OData-EntityId": "https://example.com/Keys(key-id)"} + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "LogicalName": "account"} + ) + self.od._request.return_value = post_response + mock_label = MagicMock() + mock_label.to_dict.return_value = {"LocalizedLabels": [{"Label": "Account Number Key", "LanguageCode": 1033}]} + self.od._create_alternate_key("account", "new_AccNumKey", ["accountnumber"], display_name_label=mock_label) + payload = self.od._request.call_args.kwargs["json"] + self.assertIn("DisplayName", payload) + mock_label.to_dict.assert_called_once() + + def test_raises_metadata_error_when_table_not_found(self): + """_create_alternate_key raises MetadataError when table not found.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + with self.assertRaises(MetadataError): + self.od._create_alternate_key("nonexistent", "key", ["col"]) + + +class TestGetAlternateKeys(unittest.TestCase): + """Unit tests for _ODataClient._get_alternate_keys.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_returns_keys_list(self): + """_get_alternate_keys returns list of alternate keys.""" + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "LogicalName": "account"} + ) + self.od._request.return_value = _mock_response( + json_data={"value": [{"SchemaName": "new_AccountNumKey"}]}, + text="...", + ) + result = self.od._get_alternate_keys("account") + self.assertEqual(len(result), 1) + self.assertEqual(result[0]["SchemaName"], "new_AccountNumKey") + + def test_raises_metadata_error_when_table_not_found(self): + """_get_alternate_keys raises MetadataError when table not found.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + with self.assertRaises(MetadataError): + self.od._get_alternate_keys("nonexistent") + + +class TestDeleteAlternateKey(unittest.TestCase): + """Unit tests for _ODataClient._delete_alternate_key.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_deletes_alternate_key(self): + """_delete_alternate_key issues DELETE to Keys({key_id}).""" + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "LogicalName": "account"} + ) + self.od._request.return_value = _mock_response() + self.od._delete_alternate_key("account", "key-meta-001") + delete_call = self.od._request.call_args + self.assertEqual(delete_call.args[0], "delete") + self.assertIn("key-meta-001", delete_call.args[1]) + + def test_raises_metadata_error_when_table_not_found(self): + """_delete_alternate_key raises MetadataError when table not found.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + with self.assertRaises(MetadataError): + self.od._delete_alternate_key("nonexistent", "key-id") + + +class TestCreateTable(unittest.TestCase): + """Unit tests for _ODataClient._create_table.""" + + def setUp(self): + self.od = _make_odata_client() + + def _setup_for_create(self, entity_exists=False): + """Mock helpers for _create_table.""" + existing = {"MetadataId": "meta-001", "EntitySetName": "accounts"} if entity_exists else None + created = { + "MetadataId": "meta-001", + "EntitySetName": "new_testtables", + "LogicalName": "new_testtable", + "SchemaName": "new_TestTable", + "PrimaryNameAttribute": "new_name", + "PrimaryIdAttribute": "new_testtableid", + } + call_count = [0] + + def mock_get_entity(table_schema_name, headers=None): + call_count[0] += 1 + if entity_exists: + return existing + return None if call_count[0] == 1 else created + + self.od._get_entity_by_table_schema_name = MagicMock(side_effect=mock_get_entity) + self.od._request.return_value = _mock_response(status_code=201) + + def test_creates_table_successfully(self): + """_create_table returns metadata dict on success.""" + self._setup_for_create() + result = self.od._create_table("new_TestTable", {"new_Name": "string", "new_Age": "int"}) + self.assertEqual(result["table_schema_name"], "new_TestTable") + self.assertIn("new_Name", result["columns_created"]) + self.assertIn("new_Age", result["columns_created"]) + + def test_raises_metadata_error_when_table_already_exists(self): + """_create_table raises MetadataError when table already exists.""" + self._setup_for_create(entity_exists=True) + with self.assertRaises(MetadataError): + self.od._create_table("new_TestTable", {"new_Name": "string"}) + + def test_raises_value_error_for_unsupported_column_type(self): + """_create_table raises ValueError for unsupported column type.""" + self._setup_for_create() + with self.assertRaises(ValueError) as ctx: + self.od._create_table("new_TestTable", {"new_Col": "unsupported_type"}) + self.assertIn("Unsupported column type", str(ctx.exception)) + + def test_raises_type_error_for_non_string_solution_name(self): + """_create_table raises TypeError when solution_unique_name is not str.""" + self._setup_for_create() + with self.assertRaises(TypeError): + self.od._create_table("new_TestTable", {}, solution_unique_name=123) + + def test_raises_value_error_for_empty_solution_name(self): + """_create_table raises ValueError when solution_unique_name is empty string.""" + self._setup_for_create() + with self.assertRaises(ValueError): + self.od._create_table("new_TestTable", {}, solution_unique_name="") + + def test_primary_column_schema_name_used_when_provided(self): + """_create_table uses provided primary_column_schema_name in the POST payload.""" + self._setup_for_create() + self.od._create_table("new_TestTable", {}, primary_column_schema_name="new_CustomName") + post_json = self.od._request.call_args.kwargs["json"] + attrs = post_json["Attributes"] + primary_attr = next((a for a in attrs if a.get("IsPrimaryName")), None) + self.assertIsNotNone(primary_attr) + self.assertEqual(primary_attr["SchemaName"], "new_CustomName") + + +class TestCreateColumns(unittest.TestCase): + """Unit tests for _ODataClient._create_columns.""" + + def setUp(self): + self.od = _make_odata_client() + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "SchemaName": "new_Test"} + ) + self.od._request.return_value = _mock_response(status_code=201) + + def test_creates_columns_successfully(self): + """_create_columns returns list of created column names.""" + result = self.od._create_columns("new_Test", {"new_Name": "string", "new_Age": "int"}) + self.assertIn("new_Name", result) + self.assertIn("new_Age", result) + + def test_empty_columns_raises_type_error(self): + """_create_columns raises TypeError for empty columns dict.""" + with self.assertRaises(TypeError): + self.od._create_columns("new_Test", {}) + + def test_non_dict_columns_raises_type_error(self): + """_create_columns raises TypeError for non-dict columns.""" + with self.assertRaises(TypeError): + self.od._create_columns("new_Test", None) + + def test_table_not_found_raises_metadata_error(self): + """_create_columns raises MetadataError when table does not exist.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + with self.assertRaises(MetadataError): + self.od._create_columns("new_NonExistent", {"new_Col": "string"}) + + def test_unsupported_column_type_raises_validation_error(self): + """Raises ValidationError for unsupported column type.""" + from PowerPlatform.Dataverse.core.errors import ValidationError + + with self.assertRaises(ValidationError): + self.od._create_columns("new_Test", {"new_Col": "unsupported"}) + + def test_picklist_column_flushes_cache(self): + """_create_columns calls _flush_cache when a picklist column is created.""" + self.od._flush_cache = MagicMock(return_value=0) + + class Status(Enum): + Active = 1 + + result = self.od._create_columns("new_Test", {"new_Status": Status}) + self.assertIn("new_Status", result) + self.od._flush_cache.assert_called_once_with("picklist") + + def test_posts_to_correct_endpoint(self): + """_create_columns POSTs each column to EntityDefinitions({metadata_id})/Attributes.""" + self.od._create_columns("new_Test", {"new_Name": "string"}) + call_args = self.od._request.call_args + self.assertEqual(call_args.args[0], "post") + self.assertIn("EntityDefinitions(meta-001)/Attributes", call_args.args[1]) + + +class TestDeleteColumns(unittest.TestCase): + """Unit tests for _ODataClient._delete_columns.""" + + def setUp(self): + self.od = _make_odata_client() + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-001", "SchemaName": "new_Test"} + ) + self.od._get_attribute_metadata = MagicMock( + return_value={"MetadataId": "attr-001", "LogicalName": "new_name", "@odata.type": "StringAttributeMetadata"} + ) + self.od._request.return_value = _mock_response(status_code=204) + + def test_deletes_single_column(self): + """_delete_columns accepts a string column name and issues DELETE.""" + result = self.od._delete_columns("new_Test", "new_Name") + self.assertIn("new_Name", result) + delete_calls = [c for c in self.od._request.call_args_list if c.args[0] == "delete"] + self.assertEqual(len(delete_calls), 1) + self.assertIn("attr-001", delete_calls[0].args[1]) + + def test_deletes_list_of_columns(self): + """_delete_columns accepts a list of column names and issues DELETE for each.""" + result = self.od._delete_columns("new_Test", ["new_Name1", "new_Name2"]) + self.assertEqual(len(result), 2) + delete_calls = [c for c in self.od._request.call_args_list if c.args[0] == "delete"] + self.assertEqual(len(delete_calls), 2) + + def test_non_string_non_list_raises_type_error(self): + """_delete_columns raises TypeError for invalid columns type.""" + with self.assertRaises(TypeError): + self.od._delete_columns("new_Test", 42) + + def test_empty_column_name_raises_value_error(self): + """_delete_columns raises ValueError for empty column name.""" + with self.assertRaises(ValueError): + self.od._delete_columns("new_Test", "") + + def test_table_not_found_raises_metadata_error(self): + """_delete_columns raises MetadataError when table not found.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + with self.assertRaises(MetadataError): + self.od._delete_columns("new_NonExistent", "new_Col") + + def test_column_not_found_raises_metadata_error(self): + """_delete_columns raises MetadataError when column not found.""" + self.od._get_attribute_metadata = MagicMock(return_value=None) + with self.assertRaises(MetadataError) as ctx: + self.od._delete_columns("new_Test", "new_Missing") + self.assertIn("not found", str(ctx.exception)) + + def test_missing_metadata_id_raises_runtime_error(self): + """_delete_columns raises RuntimeError when column MetadataId is missing.""" + self.od._get_attribute_metadata = MagicMock(return_value={"LogicalName": "new_name"}) + with self.assertRaises(RuntimeError) as ctx: + self.od._delete_columns("new_Test", "new_Name") + self.assertIn("MetadataId", str(ctx.exception)) + + def test_picklist_column_deletion_flushes_cache(self): + """_delete_columns flushes picklist cache when a picklist column is deleted.""" + self.od._get_attribute_metadata = MagicMock( + return_value={ + "MetadataId": "attr-001", + "LogicalName": "new_status", + "@odata.type": "PicklistAttributeMetadata", + } + ) + self.od._flush_cache = MagicMock(return_value=0) + self.od._delete_columns("new_Test", "new_Status") + self.od._flush_cache.assert_called_once_with("picklist") + + +class TestFlushCache(unittest.TestCase): + """Unit tests for _ODataClient._flush_cache.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_flush_picklist_clears_cache(self): + """_flush_cache('picklist') clears _picklist_label_cache.""" + self.od._picklist_label_cache = {("account", "statuscode"): {"map": {}, "ts": 0.0}} + removed = self.od._flush_cache("picklist") + self.assertEqual(removed, 1) + self.assertEqual(len(self.od._picklist_label_cache), 0) + + def test_flush_empty_cache_returns_zero(self): + """_flush_cache returns 0 when cache is already empty.""" + self.assertEqual(self.od._flush_cache("picklist"), 0) + + def test_unsupported_cache_kind_raises_validation_error(self): + """_flush_cache raises ValidationError for unsupported kind.""" + with self.assertRaises(ValidationError): + self.od._flush_cache("entityset") + + def test_none_kind_raises_validation_error(self): + """_flush_cache raises ValidationError for None kind.""" + with self.assertRaises(ValidationError): + self.od._flush_cache(None) + + class TestPicklistLabelResolution(unittest.TestCase): """Tests for picklist label-to-integer resolution. @@ -994,11 +2424,11 @@ def test_convert_different_tables_separate_fetches(self): resp2 = self._bulk_response(("new_status", [(100, "Open")])) self.od._request.side_effect = [resp1, resp2] - r1 = self.od._convert_labels_to_ints("account", {"industrycode": "Tech"}) - r2 = self.od._convert_labels_to_ints("new_ticket", {"new_status": "Open"}) + result1 = self.od._convert_labels_to_ints("account", {"industrycode": "Tech"}) + result2 = self.od._convert_labels_to_ints("new_ticket", {"new_status": "Open"}) - self.assertEqual(r1["industrycode"], 6) - self.assertEqual(r2["new_status"], 100) + self.assertEqual(result1["industrycode"], 6) + self.assertEqual(result2["new_status"], 100) self.assertEqual(self.od._request.call_count, 2) def test_convert_only_odata_and_non_strings_skips_fetch(self): @@ -1312,8 +2742,6 @@ def setUp(self): self.od = _make_odata_client() def _targets(self, alt_keys, records): - import json - req = self.od._build_upsert_multiple("accounts", "account", alt_keys, records) return json.loads(req.body)["Targets"] diff --git a/tests/unit/data/test_relationships.py b/tests/unit/data/test_relationships.py index c67b3f1e..6636b184 100644 --- a/tests/unit/data/test_relationships.py +++ b/tests/unit/data/test_relationships.py @@ -196,6 +196,19 @@ def test_create_m2m_relationship_returns_result(self): self.assertEqual(result["entity1_logical_name"], "account") self.assertEqual(result["entity2_logical_name"], "contact") + def test_create_m2m_relationship_with_solution(self): + """Solution name is added as MSCRM.SolutionUniqueName header.""" + mock_response = Mock() + mock_response.headers = { + "OData-EntityId": "https://example.crm.dynamics.com/api/data/v9.2/RelationshipDefinitions(abcd1234-abcd-1234-abcd-1234abcd5678)" + } + self.client._mock_request.return_value = mock_response + + self.client._create_many_to_many_relationship(self.relationship, solution="MySolution") + + headers = self.client._mock_request.call_args.kwargs["headers"] + self.assertEqual(headers["MSCRM.SolutionUniqueName"], "MySolution") + class TestDeleteRelationship(unittest.TestCase): """Tests for _delete_relationship method.""" diff --git a/tests/unit/data/test_upload.py b/tests/unit/data/test_upload.py new file mode 100644 index 00000000..2cf3b751 --- /dev/null +++ b/tests/unit/data/test_upload.py @@ -0,0 +1,430 @@ +# Copyright (c) Microsoft Corporation. +# Licensed under the MIT license. + +import os +import tempfile +import unittest +from unittest.mock import MagicMock, patch + +from PowerPlatform.Dataverse.data._odata import _ODataClient + + +def _make_odata_client() -> _ODataClient: + """Return an _ODataClient with HTTP calls mocked out.""" + mock_auth = MagicMock() + mock_auth._acquire_token.return_value = MagicMock(access_token="token") + client = _ODataClient(mock_auth, "https://example.crm.dynamics.com") + client._request = MagicMock() + return client + + +def _make_temp_file(content: bytes = b"test content", suffix: str = ".bin") -> str: + """Create a temporary file and return its path. Caller must delete.""" + with tempfile.NamedTemporaryFile(suffix=suffix, delete=False) as f: + f.write(content) + return f.name + + +class TestUploadFile(unittest.TestCase): + """Tests for _upload_file() mode selection, column auto-creation, and argument forwarding.""" + + def setUp(self): + self.od = _make_odata_client() + self.od._entity_set_from_schema_name = MagicMock(return_value="accounts") + self.od._get_entity_by_table_schema_name = MagicMock( + return_value={"MetadataId": "meta-1", "LogicalName": "account"} + ) + self.od._get_attribute_metadata = MagicMock(return_value={"LogicalName": "new_document"}) + + def test_auto_mode_small_file(self): + """Auto mode routes files <128MB to _upload_file_small.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small = MagicMock() + self.od._upload_file("account", "guid-1", "new_Document", path, mode="auto") + self.od._upload_file_small.assert_called_once() + + def test_auto_mode_large_file_routes_to_chunk(self): + """Auto mode routes files >=128MB to _upload_file_chunk.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk = MagicMock() + with patch("os.path.getsize", return_value=128 * 1024 * 1024): + self.od._upload_file("account", "guid-1", "new_Document", path, mode="auto") + self.od._upload_file_chunk.assert_called_once() + + def test_default_mode_is_auto(self): + """mode=None is treated as 'auto'.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small = MagicMock() + self.od._upload_file("account", "guid-1", "new_Document", path) + self.od._upload_file_small.assert_called_once() + + def test_auto_mode_file_not_found(self): + """Auto mode raises FileNotFoundError for missing file.""" + with self.assertRaises(FileNotFoundError): + self.od._upload_file("account", "guid-1", "new_Document", "/nonexistent/file.pdf") + + def test_explicit_small_mode(self): + """Explicit 'small' mode calls _upload_file_small.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small = MagicMock() + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small") + self.od._upload_file_small.assert_called_once() + + def test_explicit_chunk_mode(self): + """Explicit 'chunk' mode calls _upload_file_chunk.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk = MagicMock() + self.od._upload_file("account", "guid-1", "new_Document", path, mode="chunk") + self.od._upload_file_chunk.assert_called_once() + + def test_invalid_mode_raises(self): + """Invalid mode raises ValueError.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + with self.assertRaises(ValueError) as ctx: + self.od._upload_file("account", "guid-1", "new_Document", path, mode="invalid") + self.assertIn("invalid", str(ctx.exception).lower()) + + def test_column_auto_creation_when_missing(self): + """Creates file column when attribute metadata not found.""" + self.od._get_attribute_metadata = MagicMock(return_value=None) + self.od._create_columns = MagicMock() + self.od._wait_for_attribute_visibility = MagicMock() + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small") + self.od._create_columns.assert_called_once_with("account", {"new_Document": "file"}) + self.od._wait_for_attribute_visibility.assert_called_once_with("accounts", "new_Document") + + def test_column_exists_skips_creation(self): + """Does not create column when attribute already exists.""" + self.od._create_columns = MagicMock() + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small") + self.od._create_columns.assert_not_called() + + def test_no_entity_metadata_skips_column_check(self): + """Skips column check entirely when entity metadata is None.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value=None) + self.od._get_attribute_metadata = MagicMock() + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small") + self.od._get_attribute_metadata.assert_not_called() + + def test_entity_metadata_without_metadata_id_skips_column_check(self): + """Skips attribute check when entity metadata has no MetadataId.""" + self.od._get_entity_by_table_schema_name = MagicMock(return_value={"LogicalName": "account"}) + self.od._get_attribute_metadata = MagicMock() + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small") + self.od._get_attribute_metadata.assert_not_called() + + def test_lowercases_attribute_name(self): + """File name attribute is lowercased for URL usage.""" + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small") + # Third positional arg to _upload_file_small is the logical_name (lowercased) + self.assertEqual(self.od._upload_file_small.call_args.args[2], "new_document") + + def test_passes_mime_type_to_small(self): + """mime_type is forwarded as content_type to _upload_file_small.""" + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small", mime_type="text/csv") + self.assertEqual(self.od._upload_file_small.call_args.kwargs["content_type"], "text/csv") + + def test_passes_if_none_match_to_small(self): + """if_none_match is forwarded to _upload_file_small.""" + self.od._upload_file_small = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="small", if_none_match=False) + self.assertFalse(self.od._upload_file_small.call_args.kwargs["if_none_match"]) + + def test_passes_if_none_match_to_chunk(self): + """if_none_match is forwarded to _upload_file_chunk.""" + self.od._upload_file_chunk = MagicMock() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file("account", "guid-1", "new_Document", path, mode="chunk", if_none_match=False) + self.assertFalse(self.od._upload_file_chunk.call_args.kwargs["if_none_match"]) + + +class TestUploadFileSmall(unittest.TestCase): + """Tests for _upload_file_small() single PATCH upload.""" + + def setUp(self): + self.od = _make_odata_client() + + def test_successful_upload(self): + """Sends PATCH with correct URL, headers and file data.""" + path = _make_temp_file(b"PDF file content here", suffix=".pdf") + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "new_document", path) + self.od._request.assert_called_once() + call = self.od._request.call_args + self.assertEqual(call.args[0], "patch") + self.assertIn("new_document", call.args[1]) + self.assertEqual(call.kwargs["data"], b"PDF file content here") + + def test_url_contains_entity_set_and_record_id(self): + """URL is constructed from entity_set, record_id, and attribute.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "new_document", path) + url = self.od._request.call_args.args[1] + self.assertIn("accounts", url) + self.assertIn("guid-1", url) + self.assertIn("new_document", url) + + def test_if_none_match_header(self): + """if_none_match=True sends If-None-Match: null.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "col", path, if_none_match=True) + headers = self.od._request.call_args.kwargs["headers"] + self.assertEqual(headers["If-None-Match"], "null") + self.assertNotIn("If-Match", headers) + + def test_if_match_overwrite_header(self): + """if_none_match=False sends If-Match: *.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "col", path, if_none_match=False) + headers = self.od._request.call_args.kwargs["headers"] + self.assertEqual(headers["If-Match"], "*") + self.assertNotIn("If-None-Match", headers) + + def test_custom_mime_type(self): + """Custom content_type is used in Content-Type header.""" + path = _make_temp_file(b"{}", suffix=".json") + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "col", path, content_type="application/json") + headers = self.od._request.call_args.kwargs["headers"] + self.assertEqual(headers["Content-Type"], "application/json") + + def test_default_mime_type(self): + """Default Content-Type is application/octet-stream.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "col", path) + headers = self.od._request.call_args.kwargs["headers"] + self.assertEqual(headers["Content-Type"], "application/octet-stream") + + def test_file_not_found_raises(self): + """Raises FileNotFoundError for missing file.""" + with self.assertRaises(FileNotFoundError): + self.od._upload_file_small("accounts", "guid-1", "col", "/no/such/file.txt") + + def test_empty_record_id_raises(self): + """Raises ValueError for empty record_id.""" + with self.assertRaises(ValueError): + self.od._upload_file_small("accounts", "", "col", "/any/path") + + def test_file_name_in_header(self): + """x-ms-file-name header contains the basename of the file.""" + path = _make_temp_file(b"a,b,c", suffix=".csv") + self.addCleanup(os.unlink, path) + self.od._upload_file_small("accounts", "guid-1", "col", path) + headers = self.od._request.call_args.kwargs["headers"] + self.assertEqual(headers["x-ms-file-name"], os.path.basename(path)) + + def test_file_exceeds_small_upload_limit_raises(self): + """Raises ValueError when file exceeds 128MB single-upload limit.""" + path = _make_temp_file() + self.addCleanup(os.unlink, path) + with patch("os.path.getsize", return_value=128 * 1024 * 1024 + 1): + with self.assertRaises(ValueError) as ctx: + self.od._upload_file_small("accounts", "guid-1", "col", path) + self.assertIn("chunk", str(ctx.exception).lower()) + + +class TestUploadFileChunk(unittest.TestCase): + """Tests for _upload_file_chunk() streaming chunked upload.""" + + def setUp(self): + self.od = _make_odata_client() + + @staticmethod + def _mock_init_response(location="https://example.com/session?token=abc", chunk_size=None): + """Create a mock init PATCH response with Location and optional chunk-size headers.""" + resp = MagicMock() + headers = {"Location": location} + if chunk_size is not None: + headers["x-ms-chunk-size"] = str(chunk_size) + resp.headers = headers + return resp + + def test_init_patch_sends_chunked_header(self): + """Initial PATCH sends x-ms-transfer-mode: chunked.""" + self.od._request.return_value = self._mock_init_response() + path = _make_temp_file(b"x" * 100) + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + init_call = self.od._request.call_args_list[0] + self.assertEqual(init_call.kwargs["headers"]["x-ms-transfer-mode"], "chunked") + + def test_init_url_contains_file_name(self): + """Init PATCH URL includes x-ms-file-name query parameter.""" + self.od._request.return_value = self._mock_init_response() + path = _make_temp_file(b"data", suffix=".pdf") + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + init_url = self.od._request.call_args_list[0].args[1] + self.assertIn("x-ms-file-name=", init_url) + + def test_missing_location_header_raises(self): + """Raises RuntimeError when init response lacks Location header.""" + resp = MagicMock() + resp.headers = {} + self.od._request.return_value = resp + path = _make_temp_file() + self.addCleanup(os.unlink, path) + with self.assertRaises(RuntimeError) as ctx: + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + self.assertIn("Location", str(ctx.exception)) + + def test_lowercase_location_header_accepted(self): + """Accepts lowercase 'location' header as fallback.""" + resp = MagicMock() + resp.headers = {"location": "https://example.com/session?token=abc"} + self.od._request.return_value = resp + path = _make_temp_file(b"data") + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + # 1 init + 1 chunk = 2 total calls + self.assertEqual(self.od._request.call_count, 2) + + def test_uses_chunk_size_from_response(self): + """Uses x-ms-chunk-size from init response to determine chunk size.""" + self.od._request.return_value = self._mock_init_response(chunk_size=50) + path = _make_temp_file(b"x" * 120) # 120 bytes / 50-byte chunks = 3 chunks + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + # 1 init + 3 chunk calls = 4 total + self.assertEqual(self.od._request.call_count, 4) + + def test_default_chunk_size_when_header_missing(self): + """Falls back to 4MB chunk size when x-ms-chunk-size header missing.""" + self.od._request.return_value = self._mock_init_response() # no chunk_size + path = _make_temp_file(b"x" * 100) # 100 bytes < 4MB = single chunk + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + # 1 init + 1 chunk = 2 total + self.assertEqual(self.od._request.call_count, 2) + + def test_malformed_chunk_size_header_falls_back_to_default(self): + """Non-integer x-ms-chunk-size falls back to 4MB default.""" + resp = MagicMock() + resp.headers = {"Location": "https://example.com/session", "x-ms-chunk-size": "not-a-number"} + self.od._request.return_value = resp + path = _make_temp_file(b"x" * 100) # 100 bytes < 4MB = single chunk + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + # Falls back to 4MB default → 100 bytes = 1 chunk → 2 total calls + self.assertEqual(self.od._request.call_count, 2) + + def test_negative_chunk_size_raises(self): + """Negative x-ms-chunk-size raises ValueError (zero falls back to 4MB default).""" + resp = MagicMock() + resp.headers = {"Location": "https://example.com/session", "x-ms-chunk-size": "-1"} + self.od._request.return_value = resp + path = _make_temp_file(b"data") + self.addCleanup(os.unlink, path) + with self.assertRaises(ValueError): + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + + def test_empty_file_completes_without_chunk_requests(self): + """Zero-byte file sends only the init PATCH, no chunk PATCHes.""" + self.od._request.return_value = self._mock_init_response() + path = _make_temp_file(b"") + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + # Only the init PATCH is sent + self.assertEqual(self.od._request.call_count, 1) + + def test_content_range_headers(self): + """Each chunk has correct Content-Range header.""" + self.od._request.return_value = self._mock_init_response(chunk_size=10) + path = _make_temp_file(b"A" * 10 + b"B" * 10 + b"C" * 5) # 25 bytes -> 3 chunks + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + chunk_calls = self.od._request.call_args_list[1:] # skip init + self.assertEqual(len(chunk_calls), 3) + self.assertEqual(chunk_calls[0].kwargs["headers"]["Content-Range"], "bytes 0-9/25") + self.assertEqual(chunk_calls[1].kwargs["headers"]["Content-Range"], "bytes 10-19/25") + self.assertEqual(chunk_calls[2].kwargs["headers"]["Content-Range"], "bytes 20-24/25") + + def test_chunk_content_length_header(self): + """Each chunk includes correct Content-Length header.""" + self.od._request.return_value = self._mock_init_response(chunk_size=10) + path = _make_temp_file(b"A" * 10 + b"B" * 5) # 15 bytes -> 2 chunks (10 + 5) + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + chunk_calls = self.od._request.call_args_list[1:] + self.assertEqual(chunk_calls[0].kwargs["headers"]["Content-Length"], "10") + self.assertEqual(chunk_calls[1].kwargs["headers"]["Content-Length"], "5") + + def test_chunk_sends_to_location_url(self): + """Chunk PATCHes go to the Location URL, not the original URL.""" + session_url = "https://example.com/upload?session=xyz" + self.od._request.return_value = self._mock_init_response(location=session_url) + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + chunk_call = self.od._request.call_args_list[1] + self.assertEqual(chunk_call.args[1], session_url) + + def test_if_none_match_on_init(self): + """if_none_match=True sends If-None-Match on init PATCH.""" + self.od._request.return_value = self._mock_init_response() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path, if_none_match=True) + init_headers = self.od._request.call_args_list[0].kwargs["headers"] + self.assertEqual(init_headers["If-None-Match"], "null") + self.assertNotIn("If-Match", init_headers) + + def test_if_match_overwrite_on_init(self): + """if_none_match=False sends If-Match on init PATCH.""" + self.od._request.return_value = self._mock_init_response() + path = _make_temp_file() + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path, if_none_match=False) + init_headers = self.od._request.call_args_list[0].kwargs["headers"] + self.assertEqual(init_headers["If-Match"], "*") + self.assertNotIn("If-None-Match", init_headers) + + def test_empty_record_id_raises(self): + """Raises ValueError for empty record_id.""" + with self.assertRaises(ValueError): + self.od._upload_file_chunk("accounts", "", "col", "/any/path") + + def test_file_not_found_raises(self): + """Raises FileNotFoundError for missing file.""" + with self.assertRaises(FileNotFoundError): + self.od._upload_file_chunk("accounts", "guid-1", "col", "/no/such/file.bin") + + def test_chunk_requests_accept_206_and_204(self): + """Chunk requests use expected=(206, 204).""" + self.od._request.return_value = self._mock_init_response(chunk_size=50) + path = _make_temp_file(b"x" * 100) + self.addCleanup(os.unlink, path) + self.od._upload_file_chunk("accounts", "guid-1", "col", path) + for chunk_call in self.od._request.call_args_list[1:]: + self.assertEqual(chunk_call.kwargs["expected"], (206, 204)) diff --git a/tests/unit/models/test_query_builder.py b/tests/unit/models/test_query_builder.py index 47bb28fc..9f094912 100644 --- a/tests/unit/models/test_query_builder.py +++ b/tests/unit/models/test_query_builder.py @@ -344,6 +344,12 @@ def test_filter_raw_returns_self(self): qb = QueryBuilder("account") self.assertIs(qb.filter_raw("a eq 1"), qb) + def test_build_with_plain_string_filter_part(self): + """build() handles plain string entries in _filter_parts (internal path).""" + qb = QueryBuilder("account") + qb._filter_parts.append("name eq 'Contoso'") + self.assertEqual(qb.build()["filter"], "name eq 'Contoso'") + class TestWhere(unittest.TestCase): """Tests for the where() method with composable expressions.""" diff --git a/tests/unit/models/test_table_info.py b/tests/unit/models/test_table_info.py index b3da07fa..7842c266 100644 --- a/tests/unit/models/test_table_info.py +++ b/tests/unit/models/test_table_info.py @@ -65,9 +65,15 @@ def test_len(self): def test_keys_values_items(self): self.assertEqual(list(self.info.keys()), list(self.info._LEGACY_KEY_MAP.keys())) + self.assertEqual(self.info.values()[0], "new_Product") items = dict(self.info.items()) self.assertEqual(items["table_schema_name"], "new_Product") + def test_contains_non_string_key_returns_false(self): + """__contains__ returns False for non-string keys.""" + self.assertNotIn(42, self.info) + self.assertNotIn(None, self.info) + def test_to_dict(self): d = self.info.to_dict() self.assertIsInstance(d, dict) diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index cfad101e..440643a3 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -134,6 +134,11 @@ def test_get_multiple(self): self.assertEqual(results[0][0]["name"], "A") self.assertEqual(results[0][1]["name"], "B") + def test_empty_base_url_raises(self): + """DataverseClient raises ValueError when base_url is empty.""" + with self.assertRaises(ValueError): + DataverseClient("", self.mock_credential) + class TestCreateLookupField(unittest.TestCase): """Tests for client.tables.create_lookup_field convenience method.""" diff --git a/tests/unit/test_context_manager.py b/tests/unit/test_context_manager.py index 2f1aab9c..054f0395 100644 --- a/tests/unit/test_context_manager.py +++ b/tests/unit/test_context_manager.py @@ -268,6 +268,17 @@ def test_close_available_without_context_manager(self): with self.assertRaises(RuntimeError): client.records.create("account", {"name": "test"}) + def test_flush_cache_delegates_to_odata(self): + """flush_cache() calls _flush_cache on the OData client and returns its result.""" + client = DataverseClient(self.base_url, self.mock_credential) + client._odata = MagicMock() + client._odata._flush_cache.return_value = 3 + + result = client.flush_cache("picklist") + + client._odata._flush_cache.assert_called_once_with("picklist") + self.assertEqual(result, 3) + class TestExceptionHandling(unittest.TestCase): """Tests for exception handling during context manager usage.""" diff --git a/tests/unit/test_records_operations.py b/tests/unit/test_records_operations.py index 97da5a40..09df6869 100644 --- a/tests/unit/test_records_operations.py +++ b/tests/unit/test_records_operations.py @@ -64,6 +64,27 @@ def test_create_single_returns_string(self): self.assertNotIsInstance(result, list) self.assertEqual(result, "single-guid") + def test_create_single_non_string_return_raises(self): + """create() raises TypeError if _create returns a non-string.""" + self.client._odata._entity_set_from_schema_name.return_value = "accounts" + self.client._odata._create.return_value = 12345 + + with self.assertRaises(TypeError): + self.client.records.create("account", {"name": "Contoso"}) + + def test_create_bulk_non_list_return_raises(self): + """create() raises TypeError if _create_multiple returns a non-list.""" + self.client._odata._entity_set_from_schema_name.return_value = "accounts" + self.client._odata._create_multiple.return_value = "not-a-list" + + with self.assertRaises(TypeError): + self.client.records.create("account", [{"name": "Contoso"}]) + + def test_create_invalid_data_type_raises(self): + """create() raises TypeError if data is neither dict nor list.""" + with self.assertRaises(TypeError): + self.client.records.create("account", "invalid") + # ------------------------------------------------------------------ update def test_update_single(self): @@ -96,6 +117,16 @@ def test_update_paired(self): self.client._odata._update_by_ids.assert_called_once_with("account", ids, changes) + def test_update_single_non_dict_changes_raises(self): + """update() raises TypeError if ids is str but changes is not a dict.""" + with self.assertRaises(TypeError): + self.client.records.update("account", "guid-1", ["not", "a", "dict"]) + + def test_update_invalid_ids_type_raises(self): + """update() raises TypeError if ids is neither str nor list.""" + with self.assertRaises(TypeError): + self.client.records.update("account", 12345, {"name": "X"}) + # ------------------------------------------------------------------ delete def test_delete_single(self): @@ -137,6 +168,16 @@ def test_delete_empty_list(self): self.client._odata._delete_multiple.assert_not_called() self.assertIsNone(result) + def test_delete_invalid_ids_type_raises(self): + """delete() raises TypeError if ids is neither str nor list.""" + with self.assertRaises(TypeError): + self.client.records.delete("account", 12345) + + def test_delete_list_with_non_string_guids_raises(self): + """delete() raises TypeError if the ids list contains non-string entries.""" + with self.assertRaises(TypeError): + self.client.records.delete("account", ["valid-guid", 42]) + # --------------------------------------------------------------------- get def test_get_single(self): @@ -158,6 +199,11 @@ def test_get_single_with_query_params_raises(self): with self.assertRaises(ValueError): self.client.records.get("account", "guid-1", filter="statecode eq 0") + def test_get_non_string_record_id_raises(self): + """get() raises TypeError if record_id is not a string.""" + with self.assertRaises(TypeError): + self.client.records.get("account", 12345) + def test_get_paginated(self): """get() without record_id should yield pages of Record objects.""" page_1 = [{"accountid": "1", "name": "A"}]