Description
What happened?
Version details
Python 3.13.9
Java OpenJDK Runtime Environment Temurin-17.0.17+10 (build 17.0.17+10)
Beam 2.70.0
Output of `poetry run pip show apache-beam`:

```
Name: apache-beam
Version: 2.70.0
Summary: Apache Beam SDK for Python
Home-page: https://beam.apache.org
Author: Apache Software Foundation
Author-email: [email protected]
License: Apache License, Version 2.0
Location: C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages
Requires: beartype, cryptography, fastavro, fasteners, grpcio, httplib2, jsonpickle, numpy, objsize, packaging, proto-plus, protobuf, pyarrow, pyarrow-hotfix, pymongo, python-dateutil, pytz, pyyaml, requests, sortedcontainers, typing-extensions, zstandard
Required-by:
```
Traceback
```
Traceback (most recent call last):
  File "", line 198, in _run_module_as_main
  File "", line 88, in _run_code
  File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py", line 234, in
    run_pipeline(
    ~~~~~~~~~~~~^
        subscription="projects/your-project/subscriptions/your-subscription",
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
    ...<5 lines>...
        ),
        ^^
    )
    ^
  File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\pipeline.py", line 229, in run_pipeline
    _ = all_errors | "Write to Dead Letter" >> ErrorMsgSink.create_sink(dead_letter_table=dead_letter_table)
        ~~~~~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
    return self.pipeline.apply(ptransform, self)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply
    return self.apply(
           ~~~~~~~~~~^
        transform.transform, pvalueish, label or transform.label)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply
    return self.apply(transform, pvalueish)
           ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ~~~~~~~~~~~~~~~~^^^^^^^
  File "C:\Users\1375986\Documents\Projects\rerankers\cf-realtime-reranking\click_aggregator_service\click_aggregator\error_msg_sink.py", line 69, in expand
    error_msgs
    ~~~~~~~~~~
    | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    | "Write Errors to BigQuery"
    ^~~~~~~~~~~~~~~~~~~~~~~~~~~
    >> WriteToBigQuery(
    ~~~~~~~~~~~~~~~~~~~
    ...<6 lines>...
    )
    ~
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
    return self.pipeline.apply(ptransform, self)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply
    return self.apply(
           ~~~~~~~~~~^
        transform.transform, pvalueish, label or transform.label)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply
    return self.apply(transform, pvalueish)
           ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ~~~~~~~~~~~~~~~~^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2384, in expand
    return pcoll | StorageWriteToBigQuery(
           ~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~
        table=self.table_reference,
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~
    ...<12 lines>...
        expansion_service=self.expansion_service)
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
    return self.pipeline.apply(ptransform, self)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ~~~~~~~~~~~~~~~~^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\io\gcp\bigquery.py", line 2740, in expand
    input_beam_rows
    ~~~~~~~~~~~~~~~
    | SchemaAwareExternalTransform(
    ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        identifier=StorageWriteToBigQuery.IDENTIFIER,
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    ...<15 lines>...
        }))
        ~~
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
    return self.pipeline.apply(ptransform, self)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ~~~~~~~~~~~~~~~~^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py", line 509, in expand
    return pcolls | self._payload_builder.identifier() >> ExternalTransform(
           ~~~~~~~^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        common_urns.schematransform_based_expand.urn,
        ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
        payload_builder,
        ~~~~~~~~~~~~~~~~
        expansion_service)
        ~~~~~~~~~~~~~~~~~~
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pvalue.py", line 139, in __or__
    return self.pipeline.apply(ptransform, self)
           ~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 702, in apply
    return self.apply(
           ~~~~~~~~~~^
        transform.transform, pvalueish, label or transform.label)
        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 717, in apply
    return self.apply(transform, pvalueish)
           ~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\pipeline.py", line 797, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 191, in apply
    return self.apply_PTransform(transform, input, options)
           ~~~~~~~~~~~~~~~~~~~~~^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\runners\runner.py", line 195, in apply_PTransform
    return transform.expand(input)
           ~~~~~~~~~~~~~~~~^^^^^^^
  File "C:\Users\1375986\AppData\Local\pypoetry\Cache\virtualenvs\cf-realtime-reranking-Y15eCGI9-py3.13\Lib\site-packages\apache_beam\transforms\external.py", line 808, in expand
    if self._type_hints.output_types:
       ^^^^^^^^^^^^^^^^
AttributeError: 'ExternalTransform' object has no attribute '_type_hints'. Did you mean: 'get_type_hints'?
```

Snippets
```shell
poetry run python -m click_aggregator_service.click_aggregator.pipeline \
  --runner DirectRunner \
  --streaming \
  --no_pipeline_type_check
```
```python
# Imports this excerpt relies on
from typing import Any, cast

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

# Excerpt from ErrorMsgSink.expand (error_msg_sink.py); error_msgs is the input PCollection
if self.dead_letter_table:
    bq_schema: dict[str, Any] = {
        "fields": [
            {"name": "job_name", "type": "STRING", "mode": "NULLABLE"},
            {"name": "transform", "type": "STRING", "mode": "NULLABLE"},
            {"name": "error", "type": "STRING", "mode": "NULLABLE"},
            {"name": "data", "type": "STRING", "mode": "NULLABLE"},
            {"name": "timestamp", "type": "TIMESTAMP", "mode": "NULLABLE"},
            {"name": "id", "type": "STRING", "mode": "NULLABLE"},
        ]
    }
    # Write to BigQuery if table is provided
    _ = (
        error_msgs
        | "Convert to Dict" >> beam.Map(lambda msg: msg.model_dump())
        | "Write Errors to BigQuery"
        >> WriteToBigQuery(
            table=self.dead_letter_table,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_NEVER,
            method=WriteToBigQuery.Method.STORAGE_WRITE_API,
            use_at_least_once=True,
            schema=cast(Any, bq_schema),
        )
    )
```
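Using `apache_beam.io.gcp.bigquery.WriteToBigQuery` with `method=WriteToBigQuery.Method.STORAGE_WRITE_API`, as in the snippet above, fails during pipeline construction with the `AttributeError` shown in the traceback.
Bypassing the failing `self._type_hints` check in `apache_beam/transforms/external.py` still makes the pipeline fail, this time on the Java side of the cross-language transform.
The same failure also occurs with Beam 2.69.0.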
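One possible explanation, offered only as an unverified hypothesis: the run command passes `--no_pipeline_type_check`. If `ExternalTransform` never runs the base-class initializer that creates `_type_hints`, and the attribute is otherwise only created lazily when something calls `get_type_hints()` during type checking, then disabling type checking would leave it unset by the time `external.py` line 808 reads `self._type_hints` directly. The minimal sketch below reproduces that pattern in plain Python; `WithHints`, `SkipsSuperInit` and `run` are hypothetical names, not Beam code.

```python
# Hypothetical stand-ins illustrating the suspected pattern; these are NOT Beam classes.


class WithHints:
    """Base class that normally creates `_type_hints` in __init__."""

    def __init__(self):
        self._type_hints = {"output_types": None}

    def get_type_hints(self):
        # Lazily create the attribute if __init__ was never run on this instance.
        if "_type_hints" not in self.__dict__:
            self._type_hints = {"output_types": None}
        return self._type_hints


class SkipsSuperInit(WithHints):
    """Subclass whose __init__ does not call super().__init__()."""

    def __init__(self, urn):
        self._urn = urn  # WithHints.__init__ never runs, so no _type_hints yet

    def expand(self):
        # Reads the attribute directly instead of going through get_type_hints().
        return self._type_hints["output_types"]


def run(type_check: bool) -> str:
    transform = SkipsSuperInit("urn:example")
    if type_check:
        # Stands in for a type-checking pass that happens to call get_type_hints().
        transform.get_type_hints()
    try:
        transform.expand()
        return "ok"
    except AttributeError as exc:
        return f"AttributeError: {exc}"


print(run(type_check=True))   # ok
print(run(type_check=False))  # AttributeError: 'SkipsSuperInit' object has no attribute '_type_hints'
```

If this is what happens, rerunning the same command without `--no_pipeline_type_check` might get past line 808; that is a guess, and it would not address the separate Java-side failure described above.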
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components
- Component: Python SDK
- Component: Java SDK
- Component: Go SDK
- Component: Typescript SDK
- Component: IO connector
- Component: Beam YAML
- Component: Beam examples
- Component: Beam playground
- Component: Beam katas
- Component: Website
- Component: Infrastructure
- Component: Spark Runner
- Component: Flink Runner
- Component: Samza Runner
- Component: Twister2 Runner
- Component: Hazelcast Jet Runner
- Component: Google Cloud Dataflow Runner