Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions packages/google-auth/google/auth/_agent_identity_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

from google.auth import environment_vars
from google.auth import exceptions
from typing import Any


_LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -62,7 +63,7 @@ def _is_certificate_file_ready(path):
return path and os.path.exists(path) and os.path.getsize(path) > 0


def get_agent_identity_certificate_path():
def get_agent_identity_certificate_path() -> str | None:
"""Gets the certificate path from the certificate config file.

The path to the certificate config file is read from the
Expand Down Expand Up @@ -127,7 +128,7 @@ def get_agent_identity_certificate_path():
)


def get_and_parse_agent_identity_certificate():
def get_and_parse_agent_identity_certificate() -> Any | None:
"""Gets and parses the agent identity certificate if not opted out.

Checks if the user has opted out of certificate-bound tokens. If not,
Expand Down Expand Up @@ -158,7 +159,7 @@ def get_and_parse_agent_identity_certificate():
return parse_certificate(cert_bytes)


def parse_certificate(cert_bytes):
def parse_certificate(cert_bytes: bytes):
"""Parses a PEM-encoded certificate.

Args:
Expand Down Expand Up @@ -212,7 +213,7 @@ def _is_agent_identity_certificate(cert):
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e


def calculate_certificate_fingerprint(cert):
def calculate_certificate_fingerprint(cert: Any) -> str:
"""Calculates the URL-encoded, unpadded, base64-encoded SHA256 hash of a
DER-encoded certificate.

Expand All @@ -239,7 +240,7 @@ def calculate_certificate_fingerprint(cert):
raise ImportError(CRYPTOGRAPHY_NOT_FOUND_ERROR) from e


def should_request_bound_token(cert):
def should_request_bound_token(cert: Any) -> bool:
"""Determines if a bound token should be requested.

This is based on the GOOGLE_API_PREVENT_AGENT_TOKEN_SHARING_FOR_GCP_SERVICES
Expand All @@ -262,7 +263,7 @@ def should_request_bound_token(cert):
return is_agent_cert and is_opted_in


def get_cached_cert_fingerprint(cached_cert):
def get_cached_cert_fingerprint(cached_cert: bytes | None) -> str:
"""Returns the fingerprint of the cached certificate."""
if cached_cert:
cert_obj = parse_certificate(cached_cert)
Expand Down
21 changes: 13 additions & 8 deletions packages/google-auth/google/auth/_cache.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,37 @@
# limitations under the License.

from collections import OrderedDict
from typing import TypeVar

_Key = TypeVar("_Key", bound=Hashable)
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The type hint Hashable is used but not imported. It should be imported from collections.abc.

Suggested change
_Key = TypeVar("_Key", bound=Hashable)
_Key = TypeVar("_Key", bound=collections.abc.Hashable)

_Value = TypeVar("_Value")
_T = TypeVar("_T")


class LRUCache(dict):
def __init__(self, maxsize):
def __init__(self, maxsize: int) -> None:
super().__init__()
self._order = OrderedDict()
self.maxsize = maxsize

def clear(self):
def clear(self) -> None:
super().clear()
self._order.clear()

def get(self, key, default=None):
def get(self, key: _Key, default: _T=None) -> _Value | _T:
try:
value = super().__getitem__(key)
self._update(key)
return value
except KeyError:
return default

def __getitem__(self, key):
def __getitem__(self, key: _Key) -> _Value:
value = super().__getitem__(key)
self._update(key)
return value

def __setitem__(self, key, value):
def __setitem__(self, key: _Key, value: _Value) -> None:
maxsize = self.maxsize
if maxsize <= 0:
return
Expand All @@ -48,16 +53,16 @@ def __setitem__(self, key, value):
super().__setitem__(key, value)
self._update(key)

def __delitem__(self, key):
def __delitem__(self, key: _Key) -> None:
super().__delitem__(key)
del self._order[key]

def popitem(self):
def popitem(self) -> tuple[_Key, _Value]:
"""Remove and return the least recently used key-value pair."""
key, _ = self._order.popitem(last=False)
return key, super().pop(key)

def _update(self, key):
def _update(self, key: _Key) -> None:
try:
self._order.move_to_end(key)
except KeyError:
Expand Down
8 changes: 4 additions & 4 deletions packages/google-auth/google/auth/_cloud_sdk.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
)


def get_config_path():
def get_config_path() -> str:
"""Returns the absolute path the the Cloud SDK's configuration directory.

Returns:
Expand Down Expand Up @@ -70,7 +70,7 @@ def get_config_path():
return os.path.join(drive, "\\", _CONFIG_DIRECTORY)


def get_application_default_credentials_path():
def get_application_default_credentials_path() -> str:
"""Gets the path to the application default credentials file.

The path may or may not exist.
Expand All @@ -89,7 +89,7 @@ def _run_subprocess_ignore_stderr(command):
return output


def get_project_id():
def get_project_id() -> str | None:
"""Gets the project ID from the Cloud SDK.

Returns:
Expand All @@ -114,7 +114,7 @@ def get_project_id():
return None


def get_auth_access_token(account=None):
def get_auth_access_token(account: str | None=None) -> str:
"""Load user access token with the ``gcloud auth print-access-token`` command.

Args:
Expand Down
7 changes: 5 additions & 2 deletions packages/google-auth/google/auth/_credentials_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
import inspect

from google.auth import credentials
from google.auth.transport import Request as _Request, Request as _Request
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Duplicate import of Request as _Request. Remove duplicate lines of code to keep the codebase clean and avoid redundancy.

References
  1. Remove duplicate lines of code, especially duplicate assertions in tests, to keep the codebase clean and avoid redundancy.

from collections.abc import Mapping, Sequence
from google.auth.credentials import AnonymousCredentials, Credentials, CredentialsWithQuotaProject, ReadOnlyScoped, Scoped, Signing


class Credentials(credentials.Credentials, metaclass=abc.ABCMeta):
Expand All @@ -41,7 +44,7 @@ class Credentials(credentials.Credentials, metaclass=abc.ABCMeta):
with modifications such as :meth:`ScopedCredentials.with_scopes`.
"""

async def before_request(self, request, method, url, headers):
async def before_request(self, request: _Request, method: str, url: str, headers: Mapping[str, str]) -> None:
"""Performs credential-specific before request logic.

Refreshes the credentials if necessary, then calls :meth:`apply` to
Expand Down Expand Up @@ -141,7 +144,7 @@ class Scoped(credentials.Scoped):
"""


def with_scopes_if_required(credentials, scopes):
def with_scopes_if_required(credentials: Credentials, scopes: Sequence[str]) -> Credentials:
"""Creates a copy of the credentials with scopes if scoping is required.

This helper function is useful when you do not know (or care to know) the
Expand Down
7 changes: 5 additions & 2 deletions packages/google-auth/google/auth/_credentials_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
import abc

from google.auth import _helpers
from google.auth.transport import Request as _TransportRequest
from collections.abc import Coroutine
from typing import Any


class _BaseCredentials(metaclass=abc.ABCMeta):
Expand All @@ -43,11 +46,11 @@ class _BaseCredentials(metaclass=abc.ABCMeta):
authenticated requests.
"""

def __init__(self):
def __init__(self) -> None:
self.token = None

@abc.abstractmethod
def refresh(self, request):
def refresh(self, request: _TransportRequest) -> None | Coroutine[Any, Any, None]:
"""Refreshes the access token.

Args:
Expand Down
14 changes: 8 additions & 6 deletions packages/google-auth/google/auth/_default.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,13 @@
import json
import logging
import os
from typing import Optional, Sequence, TYPE_CHECKING
from typing import Any, Optional, Sequence, TYPE_CHECKING
import warnings

from google.auth import environment_vars
from google.auth import exceptions
import collections.abc
from google.auth.api_key import Credentials as _ApiKeyCredentials

if TYPE_CHECKING: # pragma: NO COVER
import google.auth.credentials.Credentials # type: ignore
Expand Down Expand Up @@ -128,8 +130,8 @@ def _warn_about_generic_load_method(method_name): # pragma: NO COVER


def load_credentials_from_file(
filename, scopes=None, default_scopes=None, quota_project_id=None, request=None
):
filename: str, scopes: collections.abc.Sequence[str] | None=None, default_scopes: collections.abc.Sequence[str] | None=None, quota_project_id: str | None=None, request: Request | None=None
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

The type hint Request is used but not imported in this file. It should be imported from google.auth.transport.

) -> tuple[Credentials, str | None]:
"""Loads Google credentials from a file.

The credentials file must be a service account key, stored authorized
Expand Down Expand Up @@ -193,8 +195,8 @@ def load_credentials_from_file(


def load_credentials_from_dict(
info, scopes=None, default_scopes=None, quota_project_id=None, request=None
):
info: collections.abc.Mapping[str, Any], scopes: collections.abc.Sequence[str] | None=None, default_scopes: collections.abc.Sequence[str] | None=None, quota_project_id: str | None=None, request: Request | None=None
) -> tuple[Credentials, str | None]:
"""Loads Google credentials from a dict.

The credentials file must be a service account key, stored authorized
Expand Down Expand Up @@ -568,7 +570,7 @@ def _get_gdch_service_account_credentials(filename, info):
return credentials, info.get("project")


def get_api_key_credentials(key):
def get_api_key_credentials(key: str) -> _ApiKeyCredentials:
"""Return credentials with the given API key."""
from google.auth import api_key

Expand Down
7 changes: 5 additions & 2 deletions packages/google-auth/google/auth/_default_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
from google.auth import _default
from google.auth import environment_vars
from google.auth import exceptions
from google.auth.transport import Request as _Request
from collections.abc import Sequence
from google.auth.credentials import Credentials


def load_credentials_from_file(filename, scopes=None, quota_project_id=None):
def load_credentials_from_file(filename: str, scopes: Sequence[str] | None=None, quota_project_id: str | None=None) -> tuple[Credentials, str | None]:
"""Loads Google credentials from a file.

The credentials file must be a service account key or stored authorized
Expand Down Expand Up @@ -178,7 +181,7 @@ def _get_gce_credentials(request=None):
return _default._get_gce_credentials(request)


def default_async(scopes=None, request=None, quota_project_id=None):
def default_async(scopes: Sequence[str] | None=None, request: _Request | None=None, quota_project_id: str | None=None) -> tuple[Credentials, str | None]:
"""Gets the default credentials for the current environment.

`Application Default Credentials`_ provides an easy way to obtain
Expand Down
26 changes: 13 additions & 13 deletions packages/google-auth/google/auth/_exponential_backoff.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,11 @@ class _BaseExponentialBackoff:

def __init__(
self,
total_attempts=_DEFAULT_RETRY_TOTAL_ATTEMPTS,
initial_wait_seconds=_DEFAULT_INITIAL_INTERVAL_SECONDS,
randomization_factor=_DEFAULT_RANDOMIZATION_FACTOR,
multiplier=_DEFAULT_MULTIPLIER,
):
total_attempts: int=_DEFAULT_RETRY_TOTAL_ATTEMPTS,
initial_wait_seconds: float=_DEFAULT_INITIAL_INTERVAL_SECONDS,
randomization_factor: float=_DEFAULT_RANDOMIZATION_FACTOR,
multiplier: float=_DEFAULT_MULTIPLIER,
) -> None:
if total_attempts < 1:
raise exceptions.InvalidValue(
f"total_attempts must be greater than or equal to 1 but was {total_attempts}"
Expand All @@ -85,12 +85,12 @@ def __init__(
self._backoff_count = 0

@property
def total_attempts(self):
def total_attempts(self) -> int:
"""The total amount of backoff attempts that will be made."""
return self._total_attempts

@property
def backoff_count(self):
def backoff_count(self) -> int:
"""The current amount of backoff attempts that have been made."""
return self._backoff_count

Expand All @@ -113,14 +113,14 @@ class ExponentialBackoff(_BaseExponentialBackoff):
perform requests with exponential backoff.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super(ExponentialBackoff, self).__init__(*args, **kwargs)

def __iter__(self):
def __iter__(self) -> "ExponentialBackoff":
self._reset()
return self

def __next__(self):
def __next__(self) -> int:
if self._backoff_count >= self._total_attempts:
raise StopIteration
self._backoff_count += 1
Expand All @@ -141,14 +141,14 @@ class AsyncExponentialBackoff(_BaseExponentialBackoff):
perform async requests with exponential backoff.
"""

def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super(AsyncExponentialBackoff, self).__init__(*args, **kwargs)

def __aiter__(self):
def __aiter__(self) -> "AsyncExponentialBackoff":
self._reset()
return self

async def __anext__(self):
async def __anext__(self) -> int:
if self._backoff_count >= self._total_attempts:
raise StopAsyncIteration
self._backoff_count += 1
Expand Down
Loading
Loading