Skip to content

Commit fe2b6b9

Browse files
committed
(WIP) improve pyarrow schema detection
Add more pyarrow types, and convert to pyarrow only the columns whose schema could not be detected.
1 parent e462037 commit fe2b6b9

File tree

1 file changed

+72
-30
lines changed

1 file changed

+72
-30
lines changed

bigquery/google/cloud/bigquery/_pandas_helpers.py

Lines changed: 72 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -110,13 +110,39 @@ def pyarrow_timestamp():
110110
"TIME": pyarrow_time,
111111
"TIMESTAMP": pyarrow_timestamp,
112112
}
113-
ARROW_SCALARS_TO_BQ = {
114-
arrow_type(): bq_type # TODO: explain wht calling arrow_type()
115-
for bq_type, arrow_type in BQ_TO_ARROW_SCALARS.items()
113+
# Maps a pyarrow scalar type id to the BigQuery type its values map to.
# A ``DataType.id`` identifies the Arrow type *class* only, so parametrized
# types (time/timestamp units, decimal precision/scale) collapse to a single
# entry regardless of their parameters.
ARROW_SCALAR_IDS_TO_BQ = {
    # https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
    pyarrow.bool_().id: "BOOLEAN",
    pyarrow.int8().id: "INT64",
    pyarrow.int16().id: "INT64",
    pyarrow.int32().id: "INT64",
    pyarrow.int64().id: "INT64",
    pyarrow.uint8().id: "INT64",
    pyarrow.uint16().id: "INT64",
    pyarrow.uint32().id: "INT64",
    pyarrow.uint64().id: "INT64",
    pyarrow.float16().id: "FLOAT64",
    pyarrow.float32().id: "FLOAT64",
    pyarrow.float64().id: "FLOAT64",
    pyarrow.time32("ms").id: "TIME",
    pyarrow.time64("ns").id: "TIME",
    pyarrow.timestamp("ns").id: "TIMESTAMP",
    pyarrow.date32().id: "DATE",
    pyarrow.date64().id: "DATETIME",  # because of its millisecond resolution
    pyarrow.binary().id: "BYTES",
    pyarrow.string().id: "STRING",  # also alias for pyarrow.utf8()
    # The precision/scale arguments do not affect the ``.id`` (see note
    # above); 38/9 is simply what BigQuery NUMERIC uses.
    pyarrow.decimal128(38, scale=9).id: "NUMERIC",
}
# NOTE: BigQuery's GEOGRAPHY type needs no dedicated entry here — it is
# represented as a string, and the string type id is already mapped to STRING.
# TODO: add additional unit tests covering these types
142+
117143
else: # pragma: NO COVER
118144
BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
119-
ARROW_SCALARS_TO_BQ = {} # pragma: NO_COVER
145+
ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
120146

121147

122148
def bq_to_arrow_struct_data_type(field):
@@ -279,6 +305,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
279305

280306
# Otherwise, try to automatically determine the type based on the
281307
# pandas dtype.
308+
# TODO: make a function for Arrow type + field name -> BQ field
282309
bq_type = _PANDAS_DTYPE_TO_BQ.get(dtype.name)
283310
if not bq_type:
284311
warnings.warn(u"Unable to determine type of column '{}'.".format(column))
@@ -300,39 +327,54 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
300327
if any(field.field_type is None for field in bq_schema_out):
301328
if not pyarrow:
302329
return None # We cannot detect the schema in full.
330+
bq_schema_out = _currate_schema(dataframe, bq_schema_out)
331+
332+
return tuple(bq_schema_out)
303333

304-
arrow_table = dataframe_to_arrow(dataframe, bq_schema_out)
305-
arrow_schema_index = {field.name: field.type for field in arrow_table}
306334

307-
currated_schema = []
308-
for schema_field in bq_schema_out:
309-
if schema_field.field_type is not None:
310-
currated_schema.append(schema_field)
311-
continue
335+
def _currate_schema(dataframe, current_bq_schema):
    """Try to deduce the unknown field types of a partial schema with pyarrow.

    For each field in ``current_bq_schema`` whose ``field_type`` is still
    ``None``, convert the corresponding dataframe column to a pyarrow array
    and map the detected arrow type back to a BigQuery type. Requires the
    ``pyarrow`` library to be importable.

    Args:
        dataframe (pandas.DataFrame):
            The data the schema describes; only the columns with an unknown
            type are converted to pyarrow (converting the whole dataframe
            would be wasted work).
        current_bq_schema (Sequence[google.cloud.bigquery.schema.SchemaField]):
            A partially detected schema — fields whose type could not be
            determined have ``field_type`` set to ``None``.

    Returns:
        Optional[List[google.cloud.bigquery.schema.SchemaField]]:
            The input schema with every unknown field type filled in, or
            ``None`` if pyarrow could not determine the type of at least one
            column (a warning is issued for that column).
    """
    # Arrow type of each column whose BigQuery type is still unknown.
    arrow_types = {
        field.name: pyarrow.array(dataframe[field.name]).type
        for field in current_bq_schema
        if field.field_type is None
    }

    currated_schema = []
    for schema_field in current_bq_schema:
        if schema_field.field_type is not None:
            currated_schema.append(schema_field)
            continue

        # Direct indexing is safe: every field with an unknown type received
        # an entry in ``arrow_types`` above.
        detected_type = ARROW_SCALAR_IDS_TO_BQ.get(
            arrow_types[schema_field.name].id
        )
        if detected_type is None:
            warnings.warn(
                u"Pyarrow could not determine the type of column '{}'.".format(
                    schema_field.name
                )
            )
            return None

        currated_schema.append(
            schema.SchemaField(
                name=schema_field.name,
                field_type=detected_type,
                mode=schema_field.mode,
                description=schema_field.description,
                fields=schema_field.fields,
            )
        )

    return currated_schema
336378

337379

338380
def dataframe_to_arrow(dataframe, bq_schema):

0 commit comments

Comments
 (0)