aboutsummaryrefslogtreecommitdiff
path: root/pplib.py
diff options
context:
space:
mode:
Diffstat (limited to 'pplib.py')
-rw-r--r--pplib.py173
1 files changed, 173 insertions, 0 deletions
diff --git a/pplib.py b/pplib.py
new file mode 100644
index 0000000..7d22038
--- /dev/null
+++ b/pplib.py
@@ -0,0 +1,173 @@
+#!/usr/bin/env python3
+# File: pplib.py
+# Location: photoprismpull/
+# Author: bgstack15
+# SPDX-License-Identifier: GPL-3.0
+# Startdate: 2022-07-06 18:29
+# Title: Photoprism Pull library
+# Project: PhotoprismPull
+# Purpose: low-level interactions with Photoprism API
+# History:
+# Usage:
+# Either in a python shell or from pp.py
+# Reference:
+# https://github.com/photoprism/photoprism/blob/develop/internal/form/album.go
+# Improve:
+# Dependencies:
+# python3
+# Documentation: see README.md
+import json, requests, shutil, os, datetime, time, hashlib
+
+MAX_COUNT = 1000
+DLTOKEN = "" # will be populated by get_session
+
+def get_session(url = "", username = "", password = "", session = None):
+ """ Return session object with headers that store the X-Session-ID. Additionally, sets global var DLTOKEN which must be passed as a parameter in every download request, f"?t={session.headers['dltoken']}". """
+ s = session if isinstance(session,requests.sessions.Session) else requests.session()
+ #print(f"DEBUG (get_session): username {username} password {password}")
+ a = s.post(f"{url}/api/v1/session",json={"username": username, "password": password})
+ if 200 == a.status_code:
+ global DLTOKEN
+ DLTOKEN = json.loads(a.content)['config']['downloadToken']
+ s.headers['X-Session-ID'] = a.headers['X-Session-Id']
+ s.headers['url'] = url
+ else:
+ print(f"Error! Bad response when requesting session: {a.status_code}.")
+ return s
+
+def get_albums(session):
+ s = session
+ global MAX_COUNT
+ r = json.loads(s.get(f"{s.headers['url']}/api/v1/albums?count={MAX_COUNT}").content)
+ try:
+ r = [f for f in r if f['Type'] == 'album']
+ except:
+ pass
+ return r
+
+def get_album_by_title(session,title):
+ s = session
+ global MAX_COUNT
+ r = get_albums(s)
+ try:
+ # cheater method to get the one whose attribute is what we want
+ r = [f for f in r if f['Title'] == title][0]
+ except:
+ print(f"Warning! Unable to find an album with title \"{title}\"")
+ return r
+
+def get_photos_by_album_uid(session, album_uid, extraparams = ""):
+ s = session
+ global MAX_COUNT
+ r = s.get(f"{s.headers['url']}/api/v1/photos?count={MAX_COUNT}&album={album_uid}{extraparams}")
+ try:
+ r = json.loads(r.content)
+ except:
+ print(f"Warning! Unable to list photos for album uid \"{album_uid}\".")
+ return r
+
+def download_image_to_file(session, _hash, filename, date = None):
+ """ Given the unique hash to a file, and filename, and optionally a date string (ISO8601), download that file to the provided filename.
+ Skips the download if the file exists and has the exact same sha1sum (hash).
+ This is a low-level function that does parse the downloaded filename. You must provide the filename. """
+ global DLTOKEN
+ s = session
+ # Need to remove the "gzip" from Accept-Encoding so the file is not gzipped.
+ headers = s.headers
+ try:
+ headers.pop('Accept-Encoding')
+ except:
+ pass
+ #print(headers)
+ have = False
+ #print(f"Fetching {_hash} as {filename}")
+ if os.path.exists(filename):
+ hash = hashlib.sha1()
+ hash.update(open(filename,"rb").read())
+ if hash.hexdigest() == _hash:
+ # we already have the file
+ have = True
+ print(f"Have: {_hash}",end=' ')
+ if not have:
+ print(f"Fetching {_hash}",end=' ')
+ with s.get(f"{s.headers['url']}/api/v1/dl/{_hash}?t={DLTOKEN}", stream=True, headers = headers) as r:
+ with open(filename,"wb") as f:
+ shutil.copyfileobj(r.raw, f)
+ if date:
+ try:
+ # Photoprism returns nice ISO8601 dates
+ date = datetime.datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
+ except:
+ pass
+ try:
+ date = time.mktime(date.timetuple())
+ except:
+ pass
+ os.utime(filename, (date, date))
+ hash = hashlib.sha1()
+ hash.update(open(filename,"rb").read())
+ if hash.hexdigest() != _hash:
+ # This can happen if the DLTOKEN or session has expired, and we got the 420-byte svg for "invalid"
+ print(f"\rfailed {_hash}",end=' ')
+ os.remove(filename)
+ print(filename)
+
+def get_image(session, image, directory = None):
+ """
+ Given a dict of an image, such as the one returned from an image search, download the file to a nicely named file.
+ This is the higher-level function that wraps around download_image_to_file.
+ """
+ if None == directory:
+ directory = os.curdir
+ if not os.path.isdir(directory):
+ try:
+ os.makedirs(directory)
+ except:
+ print(f"Error! Unable to make directory {directory}")
+ #print(f"download_image_to_file({session},{image['Hash']},{image['FileName'].split('/')[-1]},{directory})")
+ download_image_to_file(session, image['Hash'], os.path.join(directory,image['FileName'].split('/')[-1]), image['TakenAt'])
+
+def download_album_to_directory(albumname,directory = ".",extraparams = "", session = None, username = "", password = "", url = ""):
+ """ Given extraparams (example "&after=2022-07-01") and albumname, download that album to directory. """
+ s = session if isinstance(session,requests.sessions.Session) else get_session(url, username, password)
+ album = get_album_by_title(s, albumname)
+ photos = get_photos_by_album_uid(s, album_uid = album['UID'], extraparams = extraparams)
+ if not os.path.isdir(directory):
+ try:
+ os.makedirs(directory)
+ except:
+ print(f"Error! Unable to make directory {directory}")
+ for p in photos:
+ get_image(s, p, directory)
+ return photos
+
+def add_hashes_to_album(session, albumname, hashes_file, apply = False):
+ """ Given album name, and file that contains list of sha1sums of files that should exist already in photoprism, add these files to the album. """
+ s = session if isinstance(session,requests.sessions.Session) else get_session(url, username, password)
+ album = get_album_by_title(s, albumname)
+ album_uid = album['UID']
+ with open(hashes_file,"r") as f:
+ lines = f.readlines()
+ x = 0
+ for line in lines:
+ image = None
+ image_uid = None
+ line=line.rstrip()
+ try:
+ short = line.split(' ')[0].rstrip()
+ except:
+ short = line.rstrip()
+ if short is not None and len(short) > 0 and short.strip() != "":
+ r = s.get(f"{s.headers['url']}/api/v1/photos?count={MAX_COUNT}&hash={short}")
+ try:
+ image = json.loads(r.content)[0]
+ #print(image)
+ image_uid = image['UID']
+ print(f"Found image for {line}")
+ if apply:
+ add = json.loads(s.post(f"{s.headers['url']}/api/v1/albums/{album_uid}/photos",json={'photos':[image_uid]}).content)
+ print(add)
+ except:
+ x = x + 1
+ print(f"MISSING: {line}")
+ return x
bgstack15