Skip to content

[Bug]: WriteToBigQuery raises AttributeError: 'ExternalTransform' object has no attribute '_type_hints'. Did you mean: 'get_type_hints'? #37289

@cfowler1woolworths

Description

@cfowler1woolworths

What happened?

Version details

Python 3.13.9
Java OpenJDK Runtime Environment Temurin-17.0.17+10 (build 17.0.17+10)
Beam 2.70.0

poetry run pip show apache-beam
Name: apache-beam
Version: 2.70.0
Summary: Apache Beam SDK for Python
Home-page: https://beam.apache.org
Author: Apache Software Foundation
Author-email: [email protected]
License: Apache License, Version 2.0
Location: C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages
Requires: beartype, cryptography, fastavro, fasteners, grpcio, httplib2, jsonpickle, numpy, objsize, packaging, proto-plus, protobuf, pyarrow, pyarrow-hotfix, pymongo, python-dateutil, pytz, pyyaml, requests, sortedcontainers, typing-extensions, zstandard
Required-by: 
Traceback ``` Traceback (most recent call last): File "", line 198, in _run_module_as_main File "", line 88, in _run_code File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py", line 234, in run_pipeline( ~~~~~~~~~~~~^ subscription="projects/your-project/subscriptions/your-subscription", ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ...<5 lines>... ), ^^ ) ^ File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py", line 229, in run_pipeline _ = all_errors | "Write to Dead Letter" >> ErrorMsgSink.create_sink(dead_letter_table=dead_letter_table) ~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__ return self.pipeline.apply(ptransform, self) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply return self.apply( ~~~~~~~~~~^ transform.transform, pvalueish, label or transform.label) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply return self.apply(transform, pvalueish) ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply return self.apply_PTransform(transform, input, options) ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform return transform.expand(input) ~~~~~~~~~~~~~~~~^^^^^^^ File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\error_msg_sink.py", line 69, in expand error_msgs ~~~~~~~~~~ | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump()) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ | "Write Errors to BigQuery" ^~~~~~~~~~~~~~~~~~~~~~~~~~~~ >> WriteToBigQuery( ~~~~~~~~~~~~~~~~~~~ ...<6 lines>... ) ~ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__ return self.pipeline.apply(ptransform, self) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply return self.apply( ~~~~~~~~~~^ transform.transform, pvalueish, label or transform.label) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply return self.apply(transform, pvalueish) ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply return self.apply_PTransform(transform, input, options) ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform return transform.expand(input) ~~~~~~~~~~~~~~~~^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2384, in expand return pcoll | StorageWriteToBigQuery( ~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~ table=self.table_reference, ~~~~~~~~~~~~~~~~~~~~~~~~~~~ ...<12 lines>... expansion_service=self.expansion_service) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__ return self.pipeline.apply(ptransform, self) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply return self.apply_PTransform(transform, input, options) ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform return transform.expand(input) ~~~~~~~~~~~~~~~~^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2740, in expand input_beam_rows ~~~~~~~~~~~~~~~ | SchemaAwareExternalTransform( ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ identifier=StorageWriteToBigQuery.IDENTIFIER, ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ...<15 lines>... })) ~~ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__ return self.pipeline.apply(ptransform, self) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply return self.apply_PTransform(transform, input, options) ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform return transform.expand(input) ~~~~~~~~~~~~~~~~^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py", line 509, in expand return pcolls | self._payload_builder.identifier() >> ExternalTransform( ~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ common_urns.schematransform_based_expand.urn, ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ payload_builder, ~~~~~~~~~~~~~~~~ expansion_service) ~~~~~~~~~~~~~~~~~~ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__ return self.pipeline.apply(ptransform, self) ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply return self.apply( ~~~~~~~~~~^ transform.transform, pvalueish, label or transform.label) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply return self.apply(transform, pvalueish) ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply pvalueish_result = self.runner.apply(transform, pvalueish, self._options) File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply return self.apply_PTransform(transform, input, options) ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform return transform.expand(input) ~~~~~~~~~~~~~~~~^^^^^^^ File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py", line 808, in expand if self._type_hints.output_types: ^^^^^^^^^^^^^^^^ AttributeError: 'ExternalTransform' object has no attribute '_type_hints'. Did you mean: 'get_type_hints'? ```
Snippets
poetry run python -m click_aggregator_service.click_aggregator.pipeline \
	--runner DirectRunner \
	--streaming \
	--no_pipeline_type_check
if self.dead_letter_table:
            bq_schema: dict[str, Any] = {
                "fields": [
                    {"name": "job_name", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "transform", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "error", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "data", "type": "STRING", "mode": "NULLABLE"},
                    {"name": "timestamp", "type": "TIMESTAMP", "mode": "NULLABLE"},
                    {"name": "id", "type": "STRING", "mode": "NULLABLE"},
                ]
            }

            # Write to BigQuery if table is provided
            _ = (
                error_msgs
                | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
                | "Write Errors to BigQuery"
                >> WriteToBigQuery(
                    table=self.dead_letter_table,
                    write_disposition=BigQueryDisposition.WRITE_APPEND,
                    create_disposition=BigQueryDisposition.CREATE_NEVER,
                    method=WriteToBigQuery.Method.STORAGE_WRITE_API,
                    use_at_least_once=True,
                    schema=cast(Any, bq_schema),
                )
            )

Attempting to use apache_beam.io.gcp.bigquery.WriteToBigQuery fails and raises the above exception.
Bypassing the failing check still causes pipeline failure, but on the Java side.
Also checked Beam v2.69.0, still failed.

Issue Priority

Priority: 2 (default / most bugs should be filed as P2)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions