python-ring-doorbell
python-ring-doorbell copied to clipboard
last_recording_id
Hi!
maybe we should add check like if event["recording"]["status"] == "ready" ?
for example, doorbell has motion. now we should be waiting for the complete video and after - we ready to process this video
raw (schematically) code:
n = 0
while True:
try:
event = doorbell.history(limit=1)[0]
last_recording_id = event['id']
if event["recording"]["status"] == "ready":
url = doorbell.recording_url(last_recording_id)
subprocess.call (['wget', url, '-O', '/home/pi/ring/last_video.mp4'])
break
except:
pass
n+=1
if n > 5:
break
time.sleep(1)
Hello @10der, thanks for your comment.
Could you elaborate what kind of issue you are hitting?
Thanks mmello
I'd saym he's been hitting the same issue, as I have. Once there's an alarm of motion or ring, the last_recording_fetched is always of the previous alarm, since the new video hasn't been made available yet. Further more, querying the print(doorbell.recording_url(doorbell.last_recording_id)), as statet in the examples always generated a new random URL, so one can't know, if the currently fetched URL really is the latest video.
I wonder, if this is even possible…
The code below worked for me
def download_ring_vid:
auth = Auth()
auth.fetch_token(username, password)
ring = Ring(auth)
doorbell = ring.doorbells[0]
json = live_streaming_json_fixed(doorbell)
try:
new_id = json['id']
except:
url = API_URI + DINGS_ENDPOINT
doorbell.update()
resp = doorbell._ring.query(url)
new_id = resp['id']
recording_id = doorbell.last_recording_id
while new_id != recording_id:
print("Waiting for Video File...")
time.sleep(2)
doorbell.update()
recording_id = doorbell.last_recording_id
print("New ID: "+str(new_id))
print("Old ID: "+str(recording_id))
for event in doorbell.history(limit=1):
for i in range(3):
try:
print('Downloading Event ID: %s' % event['id'])
filename = str(event['id'])+'.mp4'
doorbell.recording_download(event['id'],filename=working_dir + filename,override=True)
break
except:
print("Download error, retrying attempt "+str(i))
time.sleep(1)
continue
def live_streaming_json_fixed(self):
"""Return JSON for live streaming."""
url = API_URI + LIVE_STREAMING_ENDPOINT.format(self.account_id)
req = self._ring.query((url), method='POST')
print(req.status_code)
if req and req.status_code == 200:
url = API_URI + DINGS_ENDPOINT
try:
return self._ring.query(url).json()[0]
except (IndexError, TypeError):
pass
return None