BigQuery: make jobs awaitable #18
@dkapitan Re: the AttributeError — we might be able to tweak that. Also, for your example above, your callback isn't doing anything with the future it receives.
@tseaver thanks for clarifying. I have read up more on ...
@tseaver here's the traceback from the AttributeError.
@tseaver Rather than using add_done_callback, I keep polling the jobs myself. Downside is I need to keep track of all the job IDs. Any suggestions are welcome. Would be nice if at some point in the future, Google's API supported async/await natively.
@dkapitan Thanks for the follow-up. I agree that it would be ideal if jobs could be awaited. You could simplify your example code using a set, e.g.:

from time import sleep
query1 = """
SELECT
language.name,
average(language.bytes)
FROM `bigquery-public-data.github_repos.languages`
, UNNEST(language) AS language
GROUP BY language.name"""
query2 = 'SELECT 2'
queries = [query_1, query_2]
awaiting_jobs = set()
def callback(future):
awaiting_jobs.discard(future.job_id)
for query in queries:
job = bq.query(query)
awaiting_jobs.add(job.job_id)
job.add_done_callback(callback)
while awaiting_jobs:
print('waiting for jobs to finish ... sleeping for 1s')
sleep(1)
print('all jobs done, do your stuff') |
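For readers who want an actual awaitable rather than the sleep loop, here is a minimal sketch of my own bridging code (not a library API) that adapts add_done_callback to an asyncio.Future. It assumes the done callback fires on the client library's background polling thread, hence call_soon_threadsafe:

import asyncio

def job_as_future(job, loop):
    """Bridge a BigQuery job's done-callback to an asyncio.Future."""
    aio_future = loop.create_future()

    def on_done(completed_job):
        # Runs on the polling thread, so hand the outcome
        # back to the event loop thread-safely.
        try:
            rows = completed_job.result()
        except Exception as exc:
            loop.call_soon_threadsafe(aio_future.set_exception, exc)
        else:
            loop.call_soon_threadsafe(aio_future.set_result, rows)

    job.add_done_callback(on_done)
    return aio_future

From a coroutine you could then write rows = await job_as_future(bq.query(query), asyncio.get_running_loop()). Note that bq.query() itself still makes a blocking API request, so this only makes the waiting awaitable, not the job submission.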
@tseaver Thanks for the simplified example. Could you fix two typos?
@northtree Thanks for catching those: updated.
How do you get the rows from your different queries? Won't this block on job.result()? What am I missing?
@sambcom I understand that, generally speaking, you could write query results to (temporary) tables in BigQuery, so this should not be a blocking issue.
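For illustration, a minimal sketch of that approach; the project, dataset, and table names are placeholders:

from google.cloud import bigquery

client = bigquery.Client()

# Direct the results to a destination table instead of fetching rows,
# so nothing needs to block on result() to consume them.
job_config = bigquery.QueryJobConfig(
    destination='my-project.my_dataset.my_table',  # placeholder table ID
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)
job = client.query('SELECT 1 AS x', job_config=job_config)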
I just ran into this problem myself and found out that even though the job itself is "asynchronous", there are actually two places where synchronous I/O is happening. From the example usage on https://googleapis.dev/python/bigquery/latest/index.html:

from google.cloud import bigquery
client = bigquery.Client()

# Perform a query.
QUERY = (
    'SELECT name FROM `bigquery-public-data.usa_names.usa_1910_2013` '
    'WHERE state = "TX" '
    'LIMIT 100')
query_job = client.query(QUERY)  # API request
rows = query_job.result()  # Waits for query to finish

for row in rows:
    print(row.name)
Usually, blocking on query_job.result() is where people expect the wait, but client.query(QUERY) also performs a blocking API request. As a workaround for now, I'm running all the BigQuery data fetching code in a separate thread to free up the event loop, but it would be fantastic if the BigQuery API supported async I/O. Example workaround (code fragment):

import asyncio

def get_data(client, query):
    return client.query(query).result()

# Inside a coroutine:
loop = asyncio.get_running_loop()
data = await loop.run_in_executor(None, get_data, client, query)
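Building on that workaround, here is a self-contained sketch (the query strings are placeholders) that fans several blocking calls out to the default thread pool and awaits them together:

import asyncio
from google.cloud import bigquery

def get_data(client, query):
    # Blocking call: runs in a worker thread, not on the event loop.
    return list(client.query(query).result())

async def main():
    client = bigquery.Client()
    loop = asyncio.get_running_loop()
    queries = ['SELECT 1', 'SELECT 2']  # placeholder queries
    results = await asyncio.gather(
        *(loop.run_in_executor(None, get_data, client, q) for q in queries)
    )
    for rows in results:
        print(rows)

asyncio.run(main())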
We too have hundreds of load and dedupe queries running async. From there we regularly poll Stackdriver logging (using Airflow) for the status of all of them in one API call.
We've identified the end of Python 2 support for new versions of the BigQuery client library as January 1, 2020 in googleapis/google-cloud-python#9036. We can revisit this request after that date.
Wanted to bump this thread. I am currently trying to write async BigQuery jobs myself and it would be great if the job were awaitable.
Any news here? You've already dropped support for Python 2.
Still waiting here... do you have an approximate timeline for this?
@yanhong-zhao-ef I'd love to have this feature, but it's going to take some design work to get right. In other Google Cloud libraries, asynchronous support is provided by a completely separate "AsyncClient". Since this BigQuery library is handwritten, I'd like to avoid having to maintain two copies of the same methods.
I talked with some folks on Firestore who've implemented this. They suggest a trio of classes (async_batch, base_batch, batch; see https://github.com/googleapis/python-firestore/tree/master/google/cloud/firestore_v1) in order to avoid too much duplicated effort.
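To make that layout concrete, a toy sketch of the trio; all names and bodies here are hypothetical, only the split between shared logic and the two I/O surfaces matters:

import asyncio

class BaseBatch:
    """Shared, I/O-free logic lives here and is written once."""
    def _build_request(self, statement: str) -> dict:
        return {"statement": statement}

class Batch(BaseBatch):
    """Synchronous surface: blocking I/O around the shared logic."""
    def commit(self, statement: str) -> dict:
        request = self._build_request(statement)
        # ... a blocking HTTP call would go here ...
        return request

class AsyncBatch(BaseBatch):
    """Asynchronous surface: same shared logic, awaitable I/O."""
    async def commit(self, statement: str) -> dict:
        request = self._build_request(statement)
        await asyncio.sleep(0)  # stand-in for real awaitable I/O
        return request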
I tried implementing this with copy_table, but when I added assert future.done() as the first line in callback(), I found that the assertion was failing. This must mean the callback is getting executed before the job is actually done. Can you confirm this isn't the intended functionality?
@jarednash0 Definitely not intended. It'd be worth filing a separate issue for this so that we can address it, ideally with the code example that reproduces it.
Any news on this issue? It would be a very useful feature.
Would definitely appreciate this as well.
Any update from Google on this?
This is probably not the best, but right now a simple way to get queries running synchronously is to chain the queries together in one mega(big)query haha. My clunky example:
AFAICT the above proposal isn't actually making the query awaitable though, is it? We want to run the query asynchronously, not synchronously.
My bad, I thought my data was getting stuffed around because these BigQuery processes were asynchronous already.
Gonna close this out as "Will not work" due to conflicting priorities.
+1 for reopening!
For awareness: this is an internship project and my internship ends in 2 weeks. I plan on implementing an AsyncClient for the async_query_and_wait RPC; other RPCs have also been requested to be made asynchronous, which I may not have time for. For future maintainers I will document my progress here, but please refer to the internal design doc on this feature, which can be found at my internship documentation website, go/kirnendra-internship. RPCs to make async:
Amazing @kiraksi, that is awesome. But the message I take from it is that Google leadership simply doesn't care about supporting Python as a first-class citizen of the Google Cloud Developer Experience.
@adamserafini Thank you for your input.
New updates: there are some blocking issues. In order to make a completely asynchronous RowIterator object for the query and query_and_wait methods (RowIterator is a child of HTTPIterator in the google-api-core library), I would need an AsyncHTTPIterator. I have opened this issue for that: googleapis/python-api-core#627, and I may make this PR myself. Until then, I will be focusing on the get_ methods, which don't have this blocking issue. Additionally, there is current work on this, but I would like this method exposed: googleapis/google-auth-library-python#613
Any news, @kiraksi? I guess your internship has ended. Will the PR still go forward? Can you elaborate on what we will see? You mentioned that your solution will not be completely asynchronous?
Hey @tswast @chalmerlowe
At this moment, this task is on hold. We will revisit it when we have manpower. I am not gonna close it so that it remains on the radar.
Before the async libraries are ready, it seems the done callback is the best way to run multiple queries in parallel, unless the developer is willing to do multi-threading. Can you please improve the documentation on add_done_callback? It is unclear what the argument to the callback is, and what the callback can do with it.
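From reading the source, the argument appears to be the job object itself — this is my reading, not documented behavior. A minimal sketch of what that would let you do:

from google.cloud import bigquery

client = bigquery.Client()
job = client.query('SELECT 1')

def on_done(completed_job):
    # The callback seems to receive the job itself (a QueryJob here),
    # so its ID, state, and any errors are available for inspection.
    print(completed_job.job_id, completed_job.state, completed_job.error_result)

job.add_done_callback(on_done)
job.result()  # block here only so the script lives long enough to see the callback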
I have just read the entire thread and I'm not sure if I'm reading comments written by paid devs working for one of the wealthiest organizations in the world (certainly one of the top 3 cloud providers) or by a couple of people working in their spare time on their least important side project. I just refuse to believe that Google is capable of neglecting the official library for BigQuery in such a way. What a sorry state. 😞
I know BigQuery jobs are asynchronous by default. However, I am struggling to make my data pipeline async end-to-end.

Looking at this JS example, I thought it would be the most Pythonic to make a BigQuery job awaitable. However, I can't get that to work in Python, i.e. await client.query(query) errors. Looking at the source code, I don't see which method returns an awaitable object. I have little experience in writing async Python code and found this example that wraps jobs in an async def coroutine.

The google.api_core.operation.Operation shows how to use add_done_callback to asynchronously wait for long-running operations. I have tried that, but the following yields AttributeError: 'QueryJob' object has no attribute '_condition'.

Given that jobs are already asynchronous, would it make sense to add a method that returns an awaitable? Or am I missing something and is there a Pythonic way to use the BigQuery client with the async/await pattern?