Exceptions:
Exception in thread Thread-ConsumeBidirectionalStream: grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>
The above exception was the direct cause of the following exception:
google.api_core.exceptions.ServiceUnavailable: 503 The service was unable to fulfill your request. Please try again. [code=8a75]
I'm trying to build an IoT prototype that roughly follows Google's end-to-end sample (docs | code) and I am encountering an error in the subscriber when there are no messages in the queue. This can happen both when the subscriber first starts against an empty queue after about a minute and also after processing any number of messages and a minute or so after the queue is emptied.
I have found a workaround here on StackOverflow but can't get it working. So my question is how to get this workaround policy working since all it seems to do is hide the error - my subscriber still hangs and doesn't process any further messages.
The relevant bits of code look like this:
from google.cloud import pubsub
import google.cloud.pubsub_v1.subscriber.message as Message
from google.cloud.pubsub_v1.subscriber.policy import thread
import grpc
class WorkaroundPolicy(thread.Policy):
def on_exception(self, exception):
# If we are seeing UNAVALABLE then we need to retry (so return None)
unavailable = grpc.StatusCode.UNAVAILABLE
if isinstance(exception, ServiceUnavailable):
logger.warning('Ignoring google.api_core.exceptions.ServiceUnavailable exception: {}'.format(exception))
return
elif getattr(exception, 'code', lambda: None)() in [unavailable]:
logger.warning('Ignoring grpc.StatusCode.UNAVAILABLE (Orginal exception: {})'.format(exception))
return
# For anything else fall back on the parent class behaviour
super(WorkaroundPolicy, self).on_exception(exception)
# Other imports and code ommitted for brevity
def callback(msg: Message):
try:
data = json.loads(msg.data)
except ValueError as e:
logger.error('Loading Payload ({}) threw an Exception: {}.'.format(msg.data, e))
# For the prototype, if we can't read it, then discard it
msg.ack()
return
device_project_id = msg.attributes['projectId']
device_registry_id = msg.attributes['deviceRegistryId']
device_id = msg.attributes['deviceId']
device_region = msg.attributes['deviceRegistryLocation']
self._update_device_config(
device_project_id,
device_region,
device_registry_id,
device_id,
data)
msg.ack()
def run(self, project_name, subscription_name):
# Specify a workaround policy to handle StatusCode.UNAVAILABLE [code=8a75] error (but may get CPU issues)
#subscriber = pubsub.SubscriberClient(policy_class = WorkaroundPolicy)
# Alternatively, instantiate subscriber without the workaround to see full exception stack
subscriber = pubsub.SubscriberClient()
subscription = subscriber.subscribe(subscription_path, callback)
subscription.future.result()
while True:
time.sleep(60)
If it helps, the full version of this can be found in GitHub.
Stack trace/command line output (without policy workaround)
(venv) Freds-MBP:iot-subscriber-issue Fred$ python Controller.py \
--project_id=xyz-tests \
--pubsub_subscription=simple-mqtt-controller \
--service_account_json=/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json
2018-03-21 09:36:20,975 INFO Controller Creating credentials from JSON Key File: "/Users/Fred/_dev/gcp-credentials/simple-mqtt-controller-service-account.json"...
2018-03-21 09:36:20,991 INFO Controller Creating service from discovery URL: "https://cloudiot.googleapis.com/$discovery/rest?version=v1"...
2018-03-21 09:36:20,992 INFO googleapiclient.discovery URL being requested: GET https://cloudiot.googleapis.com/$discovery/rest?version=v1
2018-03-21 09:36:21,508 INFO Controller Creating subscriber for project: "xyz-tests" and subscription: "simple-mqtt-controller"...
2018-03-21 09:36:23,200 INFO Controller Listening for messages on projects/xyz-tests/subscriptions/simple-mqtt-controller...
# This then occurs typically 60 seconds or so (sometimes up to 2 mins) later:
Exception in thread Thread-ConsumeBidirectionalStream:
Traceback (most recent call last):
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/google/api_core/grpc_helpers.py", line 76, in next
return six.next(self._wrapped)
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 347, in __next__
return self._next()
File "/Users/Fred/_dev/datacentricity-public-samples/iot-subscriber-issue/venv/lib/python3.6/site-packages/grpc/_channel.py", line 341, in _next
raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.UNAVAILABLE, The service was unable to fulfill your request. Please try again. [code=8a75])>