summaryrefslogtreecommitdiff
path: root/gmm_lib.py
diff options
context:
space:
mode:
Diffstat (limited to 'gmm_lib.py')
-rw-r--r--gmm_lib.py346
1 files changed, 346 insertions, 0 deletions
diff --git a/gmm_lib.py b/gmm_lib.py
new file mode 100644
index 0000000..20465cf
--- /dev/null
+++ b/gmm_lib.py
@@ -0,0 +1,346 @@
+#!/usr/bin/env python3
+# vim: set et ts=4 sts=4 sw=4:
+# Startdate: 2024-11-20-4 12:50
+# Title: Library for Graphical Mount Manager
+# Dependencies:
+# req-devuan: python3, python3-psutil
+import argparse, sys, os, psutil, subprocess, json, configparser
+
+# LOW-LEVEL values
+appname_full = "Graphical Mount Manager"
+appname = "gmm"
+appversion = "0.0.1"
+authors = ["bgstack15"]
+icon_name = "dvd_unmount"
+conffile = os.path.join(os.getenv("HOME"),".config",appname,"config")
+
+ABOUT_TEXT = """
+gmm "Graphical Mount Manager"
+(C) 2024 bgstack15
+SPDX-License-Identifier: GPL-3.0-only
+"""
+
+# LOW-LEVEL FUNCTIONS
+def ferror(*args, **kwargs):
+ print(*args, file=sys.stderr, **kwargs)
+
+def debuglev(_numbertocheck):
+ # if _numbertocheck <= debuglevel then return truthy
+ _debuglev = False
+ try:
+ if int(_numbertocheck) <= int(debuglevel):
+ _debuglev = True
+ except Exception as e:
+ pass
+ return _debuglev
+
+ # APP FUNCTIONS
+
+class Gmm():
+
+ def __init__(self,args=None):
+ # Parse config
+ self.conffile = conffile
+ try:
+ if args.conf:
+ self.conffile = args.conf
+ except:
+ pass
+ self.mounts = self.list_mounts()
+ self.args = args
+ self.config = configparser.ConfigParser()
+ self.load_config(self.conffile)
+ # will be set to False if --no-gui, or some action is given
+ self.show_gui = True
+
+ def list_mounts(self):
+ """
+ Return a python list of FILE,MOUNT pairs
+ """
+ # get list of devices
+ mounts = [
+ {
+ "device": i.device,
+ "mountpoint": i.mountpoint
+ } for i in psutil.disk_partitions(False) if i.fstype in ["iso9660"]
+ ]
+ # use losetup --json,
+ losetup_p = subprocess.Popen(
+ ["/usr/sbin/losetup","--json"], stdout=subprocess.PIPE
+ )
+ losetup_o, _ = losetup_p.communicate()
+ try:
+ losetup_j = json.loads(losetup_o)
+ except Exception as e:
+ ferror(f"Got error {e} when trying to read losetup output")
+ losetup_j = {}
+ if losetup_j:
+ losetup_j = losetup_j["loopdevices"]
+ for i in mounts:
+ #for j in losetup_j["loopdevices"]:
+ # if i.device == j.name:
+ try:
+ i["source"] = [j["back-file"] for j in losetup_j if j["name"] == i["device"]][0]
+ except Exception as e:
+ ferror(f"While looking for {i['device']} in {losetup_j}, got error {e}")
+ for i in mounts:
+ if "source" in i:
+ i.pop("device")
+ # Now we have a list of
+ # [{'mountpoint': '/mnt/foo', 'source': '/mnt/public/CDROMs/Games/Logical Journey of the Zoombinis.iso'}]
+ return mounts
+
+ def mount_iso_to_path(self, isofile, mount_point):
+ """
+ Mount the isofile to mount_point using sudo. No protections of already-mounted or something-else-mounted.
+ """
+ mount_p = subprocess.Popen(
+ ["sudo","mount","-v",isofile,mount_point], stdout=subprocess.PIPE
+ )
+ mount_o, mount_e = mount_p.communicate()
+ if debuglev(5):
+ if mount_e:
+ ferror(mount_e)
+
+ def mount_iso_to_default(self,isofile):
+ """
+ Use the gmm config to determine where to mount this.
+ """
+ md = self.config[appname]["mounts_dir"]
+ # determine next available number. 0-indexed. If you want to make this 1-indexed, set x to 0.
+ x = -1
+ safe = False
+ # safety valve: do not try mount more than 20 mount points.
+ while (not safe) or x < 20:
+ x = x + 1
+ testpath = os.path.join(md,str(x))
+ if not os.path.exists(testpath):
+ safe = True
+ break
+ else:
+ if os.path.ismount(testpath):
+ if self.iso_is_mounted_to_path(isofile, testpath):
+ if debuglev(2):
+ ferror(f"INFO: Found {isofile} mounted at {testpath} already. Skipping...")
+ # short-circuit
+ return True
+ # if a mountpoint but is not the requested isofile
+ if debuglev(8):
+ ferror(f"DEBUG: Found different mountpoint at {testpath}, continuing to look...")
+ # skip this number
+ continue
+ elif os.path.isdir(testpath):
+ if os.listdir(testpath):
+ # not empty so skip it
+ ferror(f"WARNING: non-mountpoint {testpath} is not empty, continuing to look...")
+ continue
+ else:
+ # empty directory, this is what we want
+ safe = True
+ break
+ else:
+ # so not a mountpoint and not a dir, so skip
+ ferror(f"WARNING: non-directory {testpath}, continuing to look...")
+ continue
+ # end while
+ # notably, because of all the continues and breaks, we cannot increment x at bottom of this loop.
+ if debuglev(8):
+ ferror(f"DEBUG: after loop, testpath is {testpath}")
+ if safe:
+ if debuglev(1):
+ print(f"INFO: will mount {isofile} to {testpath}.")
+ try:
+ os.mkdir(testpath)
+ # FUTUREIMPROVEMENT: might need to catch specific exceptions and pass them, like dir-already-exists.
+ except:
+ pass
+ self.mount_iso_to_path(isofile, testpath)
+
+ def unmount_iso_to_path(self, isofile, mount_point):
+ """
+ Unmount the isofile to mount_point using sudo.
+ This exploits that the "isofile" could be the actual iso file that is mounted, or the mount point. The umount command can either to unmount something.
+ """
+ params = ["sudo","umount","-v",isofile,mount_point]
+ params = [i for i in params if i]
+ if debuglev(1):
+ ferror(f"Running: {params}")
+ mount_p = subprocess.Popen(
+ params, stdout=subprocess.PIPE
+ )
+ mount_o, mount_e = mount_p.communicate()
+ if debuglev(5):
+ if mount_e:
+ ferror(mount_e)
+ # and now delete the empty directory
+ if not mount_point:
+ mount_point = isofile
+ if os.path.isdir(mount_point) and (not os.listdir(mount_point)):
+ try:
+ os.rmdir(mount_point)
+ except Exception as e:
+ if debuglev(5):
+ ferror(f"INFO: while trying to remove now-empty dir {mount_point}, got error {e}, continuing...")
+
+ def get_iso_mounted_to_path(self, mount_point):
+ """
+ Return the filename of what is mounted to mount_point.
+ """
+ if not os.path.ismount(mount_point):
+ return False
+ if not self.mounts:
+ self.mounts = self.list_mounts()
+ for i in self.mounts:
+ if i["mountpoint"] == mount_point:
+ # short-circuit because there can be only one thing mounted to a mount_point
+ return i["source"]
+ return None
+
+ def iso_is_mounted_to_path(self, iso_file, mount_point):
+ """
+ Return True if the named iso_file is mounted to mount_point.
+ """
+ if not os.path.ismount(mount_point):
+ return False
+ if not self.mounts:
+ self.mounts = self.list_mounts()
+ for i in self.mounts:
+ if i["mountpoint"] == mount_point and i["source"] == iso_file:
+ # short-circuit
+ return True
+ return False
+
+ def load_config(self, configfile):
+ if debuglev(1):
+ ferror(f"Loading config file {configfile}")
+ self.config.read(configfile)
+ if (not self.config) or self.config == configparser.ConfigParser():
+ ferror(f"Generating new config file, {configfile}!")
+ # This is setting a new config file.
+ self.config[appname] = {
+ "mounts_dir": os.path.expanduser("~/mnt")
+ }
+ self.save_config(configfile)
+ try:
+ #config["gmm"]["mounts_dir"] = os.path.expanduser(config["gmm"]["mounts_dir"])
+ self.config.set(appname,"mounts_dir", os.path.expanduser(self.config[appname]["mounts_dir"].strip('"')))
+ except:
+ pass
+ if debuglev(9):
+ ferror({section: dict(self.config[section]) for section in self.config.sections()})
+
+ def save_config(self, configfile = None):
+ if not configfile:
+ configfile = self.conffile
+ # un-expand the tilde
+ md = self.config[appname]["mounts_dir"]
+ # by adding the trailing slash, we make sure it is not just the HOME, but that would be an extreme situation.
+ if md.startswith(os.path.expanduser("~")+"/"):
+ md = md.replace(os.path.expanduser("~"),"~")
+ self.config.set(appname,"mounts_dir",md)
+ with open(configfile,"w") as cf:
+ self.config.write(cf)
+
+ def cli_main(self):
+ # default behavior is to show the gui, unless --no-gui, or given some paths to mount, or --list
+ args = self.args
+ paths = args.paths
+ umount = args.unmount
+ if (args.list) or (paths) or ((not paths) and args.all and umount):
+ self.show_gui = False
+ if args.gui == False or args.gui == True:
+ ferror(f"Setting show_gui to {args.gui}")
+ self.show_gui = args.gui
+ if args.list:
+ if args.output == "json":
+ print(json.dumps(self.mounts))
+ elif args.output == "tsv":
+ if self.mounts:
+ print("source\tpath")
+ for i in self.mounts:
+ print(f"{i['source']}\t{i['mountpoint']}")
+ else:
+ print(self.mounts)
+ if paths:
+ # calculate if paths.len = 2, then check if paths[0] is a file and paths[1] is a dir or underneath
+ if len(paths) == 2:
+ left = paths[0]
+ right = paths[1]
+ if os.path.isfile(left):
+ if os.path.isdir(right) or (not os.path.exists(right)):
+ if os.path.ismount(right):
+ if debuglev(8):
+ ferror(f"DEBUG: Investigating current mount {right} to see if it is same as {left}")
+ if iso_is_mounted_to_path(left, right, self.mounts):
+ if umount:
+ self.unmount_iso_to_path(left, right)
+ else:
+ if debuglev(1):
+ print(f"Already mounted!")
+ else:
+ ferror(f"ERROR: {get_iso_mounted_to_path(right,self.mounts)} is already mounted to {right}.")
+ # FUTUREIMPROVEMENT: If we want to implement --force to umount old, then mount this left, do it here.
+ else:
+ if umount:
+ if debuglev(2):
+ print(f"INFO: not a mount point, so nothing to do.")
+ else:
+ if debuglev(1):
+ print(f"INFO: will mount {left} to {right}.")
+ mount_iso_to_path(left, right)
+ elif os.path.isfile(right):
+ for i in paths:
+ self.mount_iso_to_default(i)
+ else:
+ ferror(f"ERROR: second path {right} is not a dir or file. Choose a different spot")
+ else:
+ ferror(f"ERROR: first path is not a file: {left}. Skipping...")
+ elif len(paths) == 1:
+ # main default method; the path to the iso to mount
+ isofile = paths[0]
+ # not ideal, but figure out that the user wanted "--list"
+ if isofile == "list" and not os.path.isfile(isofile):
+ ferror(f"WARNING: please use --list to list mounts. Continuing...")
+ print(self.mounts)
+ sys.exit(0)
+ if umount:
+ self.unmount_iso_to_path(isofile,"")
+ else:
+ mount_iso_to_default(isofile)
+ else:
+ # more than one path, so treat each as a file to mount
+ for isofile in paths:
+ if umount:
+ self.unmount_iso_to_path(isofile,"")
+ else:
+ self.mount_iso_to_default(isofile)
+ else:
+ # no paths, so check if --all and --umount
+ if umount and args.all:
+ for i in self.mounts:
+ self.unmount_iso_to_path(i["source"],i["mountpoint"])
+
+# PARSE ARGUMENTS
+parser = argparse.ArgumentParser(description="graphical mount manager",prog=appname,epilog=""" The value-add of this application is to use a configured directory for
+mountpoints, for easy mounting, e.g., from a graphical file manager. Run with an
+iso filename, and it will mount to ~/mnt/0, for example.""")
+parser.add_argument("-d","--debug", nargs='?', default=0, type=int, choices=range(0,11), help="Set debug level.")
+parser.add_argument("-V","--version", action="version", version="%(prog)s " + appversion)
+parser.add_argument("-g","--gui", action=argparse.BooleanOptionalAction, default=None, help="Show gui, even if also given any other actions.")
+parser.add_argument("-l","--list", action="store_true", help="List mounted isos and paths and then exit.")
+parser.add_argument("-o","--output", default="tsv", choices=("tsv","raw","json"), help="Change output format for --list.")
+parser.add_argument("-u","--unmount","--umount", action="store_true", help="If used with one or more paths, umount them instead of mounting.")
+parser.add_argument("-a","--all", action="store_true", help="Used only with --unmount. Unmount all mounted paths in default-configured dir.")
+parser.add_argument("paths", nargs="*",help="Positional parameters: ISO_FILE, MOUNT_PATH")
+#parser.add_argument("-f","--force", action="store_true", help="Force deploy all confs.")
+parser.add_argument("-c","--conf","--config", action="store", default=conffile, help="Use this config file.")
+#parser.add_argument("-r","--dry","--dryrun","--dry-run", action="store_true", help="Do not execute. Useful when debugging.")
+args = parser.parse_args()
+if args.debug is None:
+ debuglevel = 5
+elif args.debug:
+ debuglevel = args.debug
+if debuglev(1):
+ ferror("debug level", debuglevel)
+ ferror(args)
+umount = args.unmount
bgstack15