filter_str = 'resource.type="dataflow_step" AND severity>=ERROR AND textPayload:"hotkey"'
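The filter joins three clauses with AND: an exact match on the resource type, a minimum severity, and a substring match on the text payload. As a minimal sketch, the same string can be assembled from its parts. `build_filter` is a hypothetical helper introduced here for illustration; it is not part of any Google library.

```python
def build_filter(resource_type: str, min_severity: str, payload_substring: str) -> str:
    """Join three Logging query clauses with AND (hypothetical helper)."""
    clauses = [
        f'resource.type="{resource_type}"',    # exact match on the resource type
        f'severity>={min_severity}',           # this severity level or higher
        f'textPayload:"{payload_substring}"',  # ':' is the "has" (substring) operator
    ]
    return " AND ".join(clauses)


filter_str = build_filter("dataflow_step", "ERROR", "hotkey")
print(filter_str)
```

Building the string from parts makes it easier to vary one clause (for example the severity threshold) without retyping the whole filter.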
Define the sink name:
sink_name = 'hotkey-sink'
Check if the sink exists:
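# Logging API v2: sinks.list takes a 'parent' resource name, not 'projectId'.
sinks = service.sinks().list(parent=f"projects/{project_id}").execute()
# .get() guards against a response that has no 'sinks' key.
if sink_name in [sink['name'] for sink in sinks.get('sinks', [])]:
    print(f"Sink {sink_name} already exists.")
else:
    print(f"Sink {sink_name} does not exist.")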
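The existence check above reads a single page of results. The `sinks.list` method accepts a `pageToken` and may return a `nextPageToken`, so a project with many sinks could need more than one request. The sketch below collects names across all pages. It assumes the Logging API v2 discovery client, where `list` takes `parent`; `list_sink_names` is a hypothetical helper name.

```python
def list_sink_names(service, project_id):
    """Collect sink names across all pages of sinks.list (hypothetical helper)."""
    names = []
    page_token = None
    while True:
        kwargs = {"parent": f"projects/{project_id}"}
        if page_token:
            # Only send pageToken once the API has handed one back.
            kwargs["pageToken"] = page_token
        response = service.sinks().list(**kwargs).execute()
        # A page with no sinks may omit the 'sinks' key entirely.
        names.extend(sink["name"] for sink in response.get("sinks", []))
        page_token = response.get("nextPageToken")
        if not page_token:
            return names
```

With this helper, the check becomes `sink_name in list_sink_names(service, project_id)`.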
If the sink does not exist, create it:
if sink_name not in [sink['name'] for sink in sinks.get('sinks', [])]:
    sink = {
        'name': sink_name,
        # Route matching entries to the BigQuery dataset 'hotkey_sink'.
        'destination': f'bigquery.googleapis.com/projects/{project_id}/datasets/hotkey_sink',
        'filter': filter_str,
        'outputVersionFormat': 'V2'
    }
    # Logging API v2: sinks.create takes a 'parent' resource name, not 'projectId'.
    service.sinks().create(parent=f"projects/{project_id}", body=sink).execute()
    print(f"Sink {sink_name} created.")
If the sink exists, update it:
else:
    # Continues the if block from the previous step: the sink already exists.
    sink_uri = f"projects/{project_id}/sinks/{sink_name}"
    sink = service.sinks().get(sinkName=sink_uri).execute()
    if sink['filter'] == filter_str:
        print(f"Sink {sink_name} already has the correct filter.")
    else:
        sink['filter'] = filter_str
        service.sinks().update(sinkName=sink_uri, body=sink).execute()
        print(f"Sink {sink_name} updated.")
Verify that the sink has been created or updated:
# Logging API v2: sinks.list takes a 'parent' resource name, not 'projectId'.
sinks = service.sinks().list(parent=f"projects/{project_id}").execute()
if sink_name in [sink['name'] for sink in sinks.get('sinks', [])]:
    sink_uri = f"projects/{project_id}/sinks/{sink_name}"
    sink = service.sinks().get(sinkName=sink_uri).execute()
    print(f"Sink {sink_name} has filter: {sink['filter']}")
else:
    print(f"Sink {sink_name} not found.")
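These steps create the sink hotkey-sink, or update its filter if it already exists, so that Dataflow log entries at severity ERROR or higher whose text payload contains "hotkey" are routed to the BigQuery dataset hotkey_sink.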
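The create-or-update flow can also be collected into one function. The following is a sketch under stated assumptions, not a definitive implementation: `ensure_sink` is a hypothetical name, the `service` argument is expected to expose the Logging API v2 `sinks()` methods (`list` and `create` with `parent`, `get` and `update` with `sinkName`), and `FakeService` is an in-memory stand-in added only so the example runs without GCP credentials.

```python
def ensure_sink(service, project_id, sink_name, filter_str, dataset):
    """Create the sink if missing, else update its filter. Returns the action taken."""
    parent = f"projects/{project_id}"
    sink_uri = f"{parent}/sinks/{sink_name}"
    listed = service.sinks().list(parent=parent).execute()
    names = [s["name"] for s in listed.get("sinks", [])]

    if sink_name not in names:
        body = {
            "name": sink_name,
            "destination": f"bigquery.googleapis.com/projects/{project_id}/datasets/{dataset}",
            "filter": filter_str,
        }
        service.sinks().create(parent=parent, body=body).execute()
        return "created"

    sink = service.sinks().get(sinkName=sink_uri).execute()
    if sink.get("filter") == filter_str:
        return "unchanged"
    sink["filter"] = filter_str
    service.sinks().update(sinkName=sink_uri, body=sink).execute()
    return "updated"


class _Request:
    """Mimics the request object whose execute() returns the response."""
    def __init__(self, result):
        self._result = result

    def execute(self):
        return self._result


class FakeSinks:
    """In-memory stand-in for service.sinks(); for dry runs only.

    Unlike the real client, changes are applied when the method is called,
    not when execute() is called.
    """
    def __init__(self):
        self.store = {}  # short sink name -> sink body

    def list(self, parent):
        return _Request({"sinks": list(self.store.values())} if self.store else {})

    def create(self, parent, body):
        self.store[body["name"]] = dict(body)
        return _Request(dict(body))

    def get(self, sinkName):
        return _Request(dict(self.store[sinkName.rsplit("/", 1)[-1]]))

    def update(self, sinkName, body):
        self.store[sinkName.rsplit("/", 1)[-1]] = dict(body)
        return _Request(dict(body))


class FakeService:
    def __init__(self):
        self._sinks = FakeSinks()

    def sinks(self):
        return self._sinks


service = FakeService()
hotkey_filter = 'resource.type="dataflow_step" AND severity>=ERROR AND textPayload:"hotkey"'
print(ensure_sink(service, "my-project", "hotkey-sink", hotkey_filter, "hotkey_sink"))  # created
print(ensure_sink(service, "my-project", "hotkey-sink", hotkey_filter, "hotkey_sink"))  # unchanged
print(ensure_sink(service, "my-project", "hotkey-sink", "severity>=ERROR", "hotkey_sink"))  # updated
```

Returning the action taken ("created", "unchanged", or "updated") lets the caller log or assert on the outcome instead of relying on printed messages. To run against a real project, pass the authenticated discovery client in place of `FakeService()`.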