6 changes: 4 additions & 2 deletions bigframes/core/blocks.py
@@ -2354,7 +2354,7 @@ def is_monotonic_decreasing(
return self._is_monotonic(column_id, increasing=False)

def to_sql_query(
self, include_index: bool
self, include_index: bool, enable_cache: bool = True
) -> typing.Tuple[str, list[str], list[Label]]:
"""
Compiles this DataFrame's expression tree to SQL, optionally
@@ -2388,7 +2388,9 @@ def to_sql_query(
# the BigQuery unicode column name feature?
substitutions[old_id] = new_id

sql = self.session._to_sql(array_value, col_id_overrides=substitutions)
sql = self.session._to_sql(
array_value, col_id_overrides=substitutions, enable_cache=enable_cache
)
return (
sql,
new_ids[: len(idx_labels)],
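The new enable_cache parameter on Block.to_sql_query simply threads through to Session._to_sql (see the bigframes/session/__init__.py hunks below). A minimal sketch of the intended call pattern follows; it is illustrative only, not part of this change, and assumes the current bigframes public API (read_gbq, the private _block attribute):

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

# Compile the underlying expression tree to SQL. With enable_cache=False the
# compiler ignores cached (materialized) executions, so the emitted SQL refers
# to the original source tables rather than temporary cache tables.
sql, index_col_ids, index_labels = df._block.to_sql_query(
    include_index=True,
    enable_cache=False,  # new parameter added in this change; defaults to True
)
print(sql)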
8 changes: 6 additions & 2 deletions bigframes/dataframe.py
@@ -105,6 +105,8 @@ def guarded_meth(df: DataFrame, *args, **kwargs):
@log_adapter.class_logger
class DataFrame(vendored_pandas_frame.DataFrame):
__doc__ = vendored_pandas_frame.DataFrame.__doc__
# Internal flag to disable caching entirely.
_disable_cache_override: bool = False

def __init__(
self,
@@ -367,7 +369,7 @@ def astype(
return self._apply_unary_op(ops.AsTypeOp(to_type=dtype))

def _to_sql_query(
self, include_index: bool
self, include_index: bool, enable_cache: bool = True
) -> Tuple[str, list[str], list[blocks.Label]]:
"""Compiles this DataFrame's expression tree to SQL, optionally
including index columns.
@@ -381,7 +383,7 @@ def _to_sql_query(
If include_index is set to False, index_column_id_list and index_column_label_list
return empty lists.
"""
return self._block.to_sql_query(include_index)
return self._block.to_sql_query(include_index, enable_cache=enable_cache)

@property
def sql(self) -> str:
@@ -3628,6 +3630,8 @@ def _cached(self, *, force: bool = False) -> DataFrame:
No-op if the dataframe represents a trivial transformation of an existing materialization.
Force=True is used for BQML integration, where we need to copy data rather than use a snapshot.
"""
if self._disable_cache_override:
return self
self._block.cached(force=force)
return self

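Taken together, the two changes in this file let a wrapper opt a frame out of caching entirely: with _disable_cache_override set, _cached() becomes a no-op, and _to_sql_query(..., enable_cache=False) compiles against the original tables. A rough sketch of the intended effect; the flag is internal, and setting it directly here is purely for illustration:

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

# Internal flag (not public API): the streaming wrapper is expected to set this
# so that no snapshot/cache table is ever materialized for the frame.
df._disable_cache_override = True

assert df._cached(force=True) is df  # returns immediately; nothing is cached

# SQL can still be produced, bypassing any cached executions.
sql, _, _ = df._to_sql_query(include_index=False, enable_cache=False)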
51 changes: 44 additions & 7 deletions bigframes/session/__init__.py
@@ -107,6 +107,7 @@
import bigframes.core.indexes
import bigframes.dataframe as dataframe
import bigframes.series
import bigframes.streaming.dataframe as streaming_dataframe

_BIGFRAMES_DEFAULT_CONNECTION_ID = "bigframes-default-connection"

@@ -749,6 +750,38 @@ def read_gbq_table(
filters=filters,
)

def read_gbq_table_streaming(
self, table: str
) -> streaming_dataframe.StreamingDataFrame:
"""Turn a BigQuery table into a StreamingDataFrame.

Note: The bigframes.streaming module is a preview feature, and subject to change.

**Examples:**

>>> import bigframes.streaming as bst
>>> import bigframes.pandas as bpd
>>> bpd.options.display.progress_bar = None

>>> sdf = bst.read_gbq_table("bigquery-public-data.ml_datasets.penguins")
"""
warnings.warn(
"The bigframes.streaming module is a preview feature, and subject to change.",
stacklevel=1,
category=bigframes.exceptions.PreviewWarning,
)

import bigframes.streaming.dataframe as streaming_dataframe

df = self._read_gbq_table(
table,
api_name="read_gbq_table_steaming",
enable_snapshot=False,
index_col=bigframes.enums.DefaultIndexKind.NULL,
)

return streaming_dataframe.StreamingDataFrame._from_table_df(df)

def _read_gbq_table(
self,
query: str,
@@ -759,6 +792,7 @@ def _read_gbq_table(
api_name: str,
use_cache: bool = True,
filters: third_party_pandas_gbq.FiltersType = (),
enable_snapshot: bool = True,
) -> dataframe.DataFrame:
import bigframes.dataframe as dataframe

@@ -877,7 +911,7 @@ def _read_gbq_table(
else (*columns, *[col for col in index_cols if col not in columns])
)

supports_snapshot = bf_read_gbq_table.validate_table(
enable_snapshot = enable_snapshot and bf_read_gbq_table.validate_table(
self.bqclient, table_ref, all_columns, time_travel_timestamp, filter_str
)

@@ -905,7 +939,7 @@ def _read_gbq_table(
table,
schema=schema,
predicate=filter_str,
at_time=time_travel_timestamp if supports_snapshot else None,
at_time=time_travel_timestamp if enable_snapshot else None,
primary_key=index_cols if is_index_unique else (),
session=self,
)
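read_gbq_table_streaming builds on the same plumbing: it reads the table with enable_snapshot=False (so no time-travel clause such as FOR SYSTEM_TIME AS OF is applied) and a NULL default index, then wraps the resulting DataFrame in a StreamingDataFrame. A short usage sketch mirroring the docstring above; it assumes the module-level bst.read_gbq_table shown there delegates to Session.read_gbq_table_streaming:

import bigframes.pandas as bpd
import bigframes.streaming as bst

bpd.options.display.progress_bar = None

# Preview feature: returns a StreamingDataFrame backed by the live table,
# read without a time-travel snapshot and with the NULL default index.
sdf = bst.read_gbq_table("bigquery-public-data.ml_datasets.penguins")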
@@ -2056,17 +2090,20 @@ def _to_sql(
offset_column: typing.Optional[str] = None,
col_id_overrides: typing.Mapping[str, str] = {},
ordered: bool = False,
enable_cache: bool = True,
) -> str:
if offset_column:
array_value = array_value.promote_offsets(offset_column)
node_w_cached = self._with_cached_executions(array_value.node)
node = (
self._with_cached_executions(array_value.node)
if enable_cache
else array_value.node
)
if ordered:
return self._compiler.compile_ordered(
node_w_cached, col_id_overrides=col_id_overrides
node, col_id_overrides=col_id_overrides
)
return self._compiler.compile_unordered(
node_w_cached, col_id_overrides=col_id_overrides
)
return self._compiler.compile_unordered(node, col_id_overrides=col_id_overrides)

def _get_table_size(self, destination_table):
table = self.bqclient.get_table(destination_table)
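For reference, the enable_cache switch in _to_sql is what the Block- and DataFrame-level parameters above ultimately control: whether _with_cached_executions may substitute previously materialized results into the compiled plan. A hedged sketch of the two call paths from user code, again illustrative rather than part of this change:

import bigframes.pandas as bpd

df = bpd.read_gbq("bigquery-public-data.ml_datasets.penguins")

# With caching (the default), previously cached executions may be reused, so the
# generated SQL can reference temporary cache tables owned by the session.
sql_cached, _, _ = df._to_sql_query(include_index=False)

# Without caching, the plan is compiled from the original expression tree only,
# which is what the streaming path needs: the SQL must track the live table.
sql_fresh, _, _ = df._to_sql_query(include_index=False, enable_cache=False)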