diff options
Diffstat (limited to 'pplib.py')
-rw-r--r-- | pplib.py | 173 |
1 files changed, 173 insertions, 0 deletions
diff --git a/pplib.py b/pplib.py new file mode 100644 index 0000000..7d22038 --- /dev/null +++ b/pplib.py @@ -0,0 +1,173 @@ +#!/usr/bin/env python3 +# File: pplib.py +# Location: photoprismpull/ +# Author: bgstack15 +# SPDX-License-Identifier: GPL-3.0 +# Startdate: 2022-07-06 18:29 +# Title: Photoprism Pull library +# Project: PhotoprismPull +# Purpose: low-level interactions with Photoprism API +# History: +# Usage: +# Either in a python shell or from pp.py +# Reference: +# https://github.com/photoprism/photoprism/blob/develop/internal/form/album.go +# Improve: +# Dependencies: +# python3 +# Documentation: see README.md +import json, requests, shutil, os, datetime, time, hashlib + +MAX_COUNT = 1000 +DLTOKEN = "" # will be populated by get_session + +def get_session(url = "", username = "", password = "", session = None): + """ Return session object with headers that store the X-Session-ID. Additionally, sets global var DLTOKEN which must be passed as a parameter in every download request, f"?t={session.headers['dltoken']}". """ + s = session if isinstance(session,requests.sessions.Session) else requests.session() + #print(f"DEBUG (get_session): username {username} password {password}") + a = s.post(f"{url}/api/v1/session",json={"username": username, "password": password}) + if 200 == a.status_code: + global DLTOKEN + DLTOKEN = json.loads(a.content)['config']['downloadToken'] + s.headers['X-Session-ID'] = a.headers['X-Session-Id'] + s.headers['url'] = url + else: + print(f"Error! Bad response when requesting session: {a.status_code}.") + return s + +def get_albums(session): + s = session + global MAX_COUNT + r = json.loads(s.get(f"{s.headers['url']}/api/v1/albums?count={MAX_COUNT}").content) + try: + r = [f for f in r if f['Type'] == 'album'] + except: + pass + return r + +def get_album_by_title(session,title): + s = session + global MAX_COUNT + r = get_albums(s) + try: + # cheater method to get the one whose attribute is what we want + r = [f for f in r if f['Title'] == title][0] + except: + print(f"Warning! Unable to find an album with title \"{title}\"") + return r + +def get_photos_by_album_uid(session, album_uid, extraparams = ""): + s = session + global MAX_COUNT + r = s.get(f"{s.headers['url']}/api/v1/photos?count={MAX_COUNT}&album={album_uid}{extraparams}") + try: + r = json.loads(r.content) + except: + print(f"Warning! Unable to list photos for album uid \"{album_uid}\".") + return r + +def download_image_to_file(session, _hash, filename, date = None): + """ Given the unique hash to a file, and filename, and optionally a date string (ISO8601), download that file to the provided filename. + Skips the download if the file exists and has the exact same sha1sum (hash). + This is a low-level function that does parse the downloaded filename. You must provide the filename. """ + global DLTOKEN + s = session + # Need to remove the "gzip" from Accept-Encoding so the file is not gzipped. + headers = s.headers + try: + headers.pop('Accept-Encoding') + except: + pass + #print(headers) + have = False + #print(f"Fetching {_hash} as {filename}") + if os.path.exists(filename): + hash = hashlib.sha1() + hash.update(open(filename,"rb").read()) + if hash.hexdigest() == _hash: + # we already have the file + have = True + print(f"Have: {_hash}",end=' ') + if not have: + print(f"Fetching {_hash}",end=' ') + with s.get(f"{s.headers['url']}/api/v1/dl/{_hash}?t={DLTOKEN}", stream=True, headers = headers) as r: + with open(filename,"wb") as f: + shutil.copyfileobj(r.raw, f) + if date: + try: + # Photoprism returns nice ISO8601 dates + date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") + except: + pass + try: + date = time.mktime(date.timetuple()) + except: + pass + os.utime(filename, (date, date)) + hash = hashlib.sha1() + hash.update(open(filename,"rb").read()) + if hash.hexdigest() != _hash: + # This can happen if the DLTOKEN or session has expired, and we got the 420-byte svg for "invalid" + print(f"\rfailed {_hash}",end=' ') + os.remove(filename) + print(filename) + +def get_image(session, image, directory = None): + """ + Given a dict of an image, such as the one returned from an image search, download the file to a nicely named file. + This is the higher-level function that wraps around download_image_to_file. + """ + if None == directory: + directory = os.curdir + if not os.path.isdir(directory): + try: + os.makedirs(directory) + except: + print(f"Error! Unable to make directory {directory}") + #print(f"download_image_to_file({session},{image['Hash']},{image['FileName'].split('/')[-1]},{directory})") + download_image_to_file(session, image['Hash'], os.path.join(directory,image['FileName'].split('/')[-1]), image['TakenAt']) + +def download_album_to_directory(albumname,directory = ".",extraparams = "", session = None, username = "", password = "", url = ""): + """ Given extraparams (example "&after=2022-07-01") and albumname, download that album to directory. """ + s = session if isinstance(session,requests.sessions.Session) else get_session(url, username, password) + album = get_album_by_title(s, albumname) + photos = get_photos_by_album_uid(s, album_uid = album['UID'], extraparams = extraparams) + if not os.path.isdir(directory): + try: + os.makedirs(directory) + except: + print(f"Error! Unable to make directory {directory}") + for p in photos: + get_image(s, p, directory) + return photos + +def add_hashes_to_album(session, albumname, hashes_file, apply = False): + """ Given album name, and file that contains list of sha1sums of files that should exist already in photoprism, add these files to the album. """ + s = session if isinstance(session,requests.sessions.Session) else get_session(url, username, password) + album = get_album_by_title(s, albumname) + album_uid = album['UID'] + with open(hashes_file,"r") as f: + lines = f.readlines() + x = 0 + for line in lines: + image = None + image_uid = None + line=line.rstrip() + try: + short = line.split(' ')[0].rstrip() + except: + short = line.rstrip() + if short is not None and len(short) > 0 and short.strip() != "": + r = s.get(f"{s.headers['url']}/api/v1/photos?count={MAX_COUNT}&hash={short}") + try: + image = json.loads(r.content)[0] + #print(image) + image_uid = image['UID'] + print(f"Found image for {line}") + if apply: + add = json.loads(s.post(f"{s.headers['url']}/api/v1/albums/{album_uid}/photos",json={'photos':[image_uid]}).content) + print(add) + except: + x = x + 1 + print(f"MISSING: {line}") + return x |