@@ -110,13 +110,39 @@ def pyarrow_timestamp():
110110 "TIME" : pyarrow_time ,
111111 "TIMESTAMP" : pyarrow_timestamp ,
112112 }
113- ARROW_SCALARS_TO_BQ = {
114- arrow_type (): bq_type # TODO: explain wht calling arrow_type()
115- for bq_type , arrow_type in BQ_TO_ARROW_SCALARS .items ()
113+ ARROW_SCALAR_IDS_TO_BQ = {
114+ # https://arrow.apache.org/docs/python/api/datatypes.html#type-classes
115+ pyarrow .bool_ ().id : "BOOLEAN" ,
116+ pyarrow .int8 ().id : "INT64" ,
117+ pyarrow .int16 ().id : "INT64" ,
118+ pyarrow .int32 ().id : "INT64" ,
119+ pyarrow .int64 ().id : "INT64" ,
120+ pyarrow .uint8 ().id : "INT64" ,
121+ pyarrow .uint16 ().id : "INT64" ,
122+ pyarrow .uint32 ().id : "INT64" ,
123+ pyarrow .uint64 ().id : "INT64" ,
124+ pyarrow .float16 ().id : "FLOAT64" ,
125+ pyarrow .float32 ().id : "FLOAT64" ,
126+ pyarrow .float64 ().id : "FLOAT64" ,
127+ pyarrow .time32 ("ms" ).id : "TIME" ,
128+ pyarrow .time64 ("ns" ).id : "TIME" ,
129+ pyarrow .timestamp ("ns" ).id : "TIMESTAMP" ,
130+ pyarrow .date32 ().id : "DATE" ,
131+ pyarrow .date64 ().id : "DATETIME" , # because millisecond resolution
132+ pyarrow .binary ().id : "BYTES" ,
133+ pyarrow .string ().id : "STRING" , # also alias for pyarrow.utf8()
134+ pyarrow .decimal128 (
135+ 38 , scale = 9
136+ ).id : "NUMERIC" , # precision and scale do not matter here,
# as only the type id is used as the mapping key
116138 }
139+ # TODO: what about geography? represented as string, but that is already mapped
140+ # to the STRING type
141+ # TODO: add additional unit tests covering these types
142+
117143else : # pragma: NO COVER
118144 BQ_TO_ARROW_SCALARS = {} # pragma: NO COVER
119- ARROW_SCALARS_TO_BQ = {} # pragma: NO_COVER
145+ ARROW_SCALAR_IDS_TO_BQ = {} # pragma: NO_COVER
120146
121147
122148def bq_to_arrow_struct_data_type (field ):
@@ -279,6 +305,7 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
279305
280306 # Otherwise, try to automatically determine the type based on the
281307 # pandas dtype.
308+ # TODO: make a function for Arrow type + field name -> BQ field
282309 bq_type = _PANDAS_DTYPE_TO_BQ .get (dtype .name )
283310 if not bq_type :
284311 warnings .warn (u"Unable to determine type of column '{}'." .format (column ))
@@ -300,39 +327,54 @@ def dataframe_to_bq_schema(dataframe, bq_schema):
300327 if any (field .field_type is None for field in bq_schema_out ):
301328 if not pyarrow :
302329 return None # We cannot detect the schema in full.
330+ bq_schema_out = _currate_schema (dataframe , bq_schema_out )
331+
332+ return tuple (bq_schema_out )
303333
304- arrow_table = dataframe_to_arrow (dataframe , bq_schema_out )
305- arrow_schema_index = {field .name : field .type for field in arrow_table }
306334
307- currated_schema = []
308- for schema_field in bq_schema_out :
309- if schema_field .field_type is not None :
310- currated_schema .append (schema_field )
311- continue
335+ def _currate_schema (dataframe , current_bq_schema ):
336+ """Detect BigQuery field types for schema fields that lack one.
312337 
313- detected_type = ARROW_SCALARS_TO_BQ .get (
314- arrow_schema_index .get (schema_field .name )
315- )
316- if detected_type is None :
317- warnings .warn (
318- u"Pyarrow could not determine the type of column '{}'." .format (
319- schema_field .name
320- )
338+ Requires pyarrow; callers must check it is importable before invoking.
339+ """
340+ arrow_tables = {}
341+
342+ for field in current_bq_schema :
343+ if field .field_type is None :
344+ arrow_table = pyarrow .array (dataframe [field .name ])
345+ arrow_tables [field .name ] = arrow_table
346+
347+ arrow_schema_index = {
348+ field_name : field .type for field_name , field in arrow_tables .items ()
349+ }
350+
351+ currated_schema = []
352+ for schema_field in current_bq_schema :
353+ if schema_field .field_type is not None :
354+ currated_schema .append (schema_field )
355+ continue
356+
357+ detected_type = ARROW_SCALAR_IDS_TO_BQ .get (
358+ arrow_schema_index .get (schema_field .name ).id
359+ )
360+ if detected_type is None :
361+ warnings .warn (
362+ u"Pyarrow could not determine the type of column '{}'." .format (
363+ schema_field .name
321364 )
322- return None
323-
324- new_field = schema .SchemaField (
325- name = schema_field .name ,
326- field_type = detected_type ,
327- mode = schema_field .mode ,
328- description = schema_field .description ,
329- fields = schema_field .fields ,
330365 )
331- currated_schema . append ( new_field )
366+ return None
332367
333- bq_schema_out = currated_schema
368+ new_field = schema .SchemaField (
369+ name = schema_field .name ,
370+ field_type = detected_type ,
371+ mode = schema_field .mode ,
372+ description = schema_field .description ,
373+ fields = schema_field .fields ,
374+ )
375+ currated_schema .append (new_field )
334376
335- return tuple ( bq_schema_out )
377+ return currated_schema
336378
337379
338380def dataframe_to_arrow (dataframe , bq_schema ):
0 commit comments