#!/usr/bin/env python # Startdate: 2021-12-15 # File: bws_db_sqlite.py # Usage: # always loaded from bws_db, so these classes exist: Bookmark, User # Reference: # get next pk number https://stackoverflow.com/questions/6950454/getting-the-next-auto-increment-value-of-a-sqlite-database/33285873#33285873 # get latest pk number https://stackoverflow.com/questions/8479315/sql-inserting-a-row-and-returning-primary-key/54310457#54310457 # Dependencies: # python3 import sqlite3, sys, os, re defaultfile = "bws.sqlite" non_decimal = re.compile(r'[^\d.]+') def db_init( source = defaultfile, ): print("DEBUG(sqlite): as sqlite file.") create_bookmarks_str = """ CREATE TABLE bookmarks( bid INTEGER PRIMARY KEY AUTOINCREMENT, url TEXT, title TEXT, uid TEXT, tags TEXT, notes TEXT, iid INT, timestamp TEXT, order_id INT, always_open_in_new_tab BOOLEAN ) """ create_users_str = """ CREATE TABLE users( uid INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, always_open_in_new_tab BOOLEAN ) """ try: r = _execute(source,create_bookmarks_str) except sqlite3.OperationalError as e: print(f"DEBUG(sqlite): sqlite file {source} already has table bookmarks.") #print("DEBUG(sqlite): Response from making bookmarks table starts on next line.") #print(r) try: r = _execute(source,create_users_str) except sqlite3.OperationalError as e: print(f"DEBUG(sqlite): sqlite file {source} already has table users.") #print("DEBUG(sqlite): next line starts create users response.") #print(r) return source def db_wipe(source = defaultfile, i_really_mean_it = False): if i_really_mean_it == "Yes I Do": r = _execute(source,"DROP table users;") r = _execute(source,"DROP table bookmarks;") def _db_get_next_pk(source = defaultfile, table =""): if table == "": return -1 next_pk = -1 r = _execute(source,f"SELECT seq+1 FROM SQLITE_SEQUENCE WHERE name = '{table}';") for i in r: next_pk = non_decimal.sub('',str(i)) return next_pk def db_get_next_available_uid(source = defaultfile): next_uid = -1 return _db_get_next_pk(source,'users') def db_get_next_available_bid(source = defaultfile): next_bid = -1 return _db_get_next_pk(source,'bookmarks') def db_set_sample_data(source = defaultfile): sample_users_str = """ INSERT INTO users(name,always_open_in_new_tab) VALUES ('alfred',True), ('barney',True), ('charlie',False), ('dennis',True), ('edgar',True), ('bgirton',False); """ sample_bookmarks_str = """ INSERT INTO BOOKMARKS(url,title,uid,tags,order_id) VALUES ('https://startrek.com/','Star Trek front page',1,'home,fandom,star,trek',5), ('http://bugs.devuan.org','b.d.o',1,'bugs,devuan',11), ('https://slashdot.org/','/.',1,'news',3), ('https://alaskalinuxuser3.ddns.net','Alaska Linux User',6,'blog,alaska',8), ('https://google.com','Google search',4,'search',15), ('https://foxnews.com','Fox News',2,'news',12), ('https://distrowatch.com','Distrowatch',3,'news',2), ('https://freeipa.org','FreeIPA website',2,'freeipa',3), ('https://start.duckduckgo.com','ddg ',6,'search',4), ('https://google.com','Google search',5,'search',9), ('https://freefilesync.org/forum/','FreeFileSync forum',1,'forum,freefilesync',10), ('https://start.duckduckgo.com','ddg',3,'search,home',6), ('https://bgstack15.ddns.net','my site',6,'home,blog',7); """ r = _execute(source,sample_users_str) r = _execute(source,sample_bookmarks_str) def db_add_bookmark(source = defaultfile, url = "", title = "", uid = 0, tags = [], notes = "", iid = 0, timestamp = 0, order_id = 0, always_open_in_new_tab = False ): tags_str = _tags_to_db(tags) if type(timestamp) == tuple: timestamp = timestamp[0] # sqlite doesn't have a date datatype, so we will convert to ISO8601 string in the app side add_str = f""" INSERT INTO bookmarks( url, title, uid, tags, notes, iid, timestamp, order_id, always_open_in_new_tab ) VALUES ('{url}','{title}','{uid}','{tags_str}', '{notes}','{iid}','{timestamp}','{order_id}', '{always_open_in_new_tab}' ); """ r = _execute(source,add_str) #print("DEBUG(sqlite): next line is response to adding bookmark to sqlite table.") #print(r) # try this get new bid trick #print("DEBUG(sqlite): next line(s) are new bid trick") #r = _execute("SELECT last_insert_rowid()") #new_bid = -1 #for i in r: # new_bid = non_decimal.sub('',str(i)) #print("DEBUG(sqlite): DONE with new bid trick") print(f"R is {r}") new_bid = r.lastrowid return new_bid def db_get_bookmark_by_id(source = defaultfile, bid = -1 ): """Returns a Bookmark object for the given bid. Returns None if bid does not exist.""" if -1 == bid or "-1" == bid or bid is None: return None from bws_db import Bookmark count = -1 select_str = f"SELECT COUNT(bid) FROM bookmarks WHERE bid = {bid}" get_str = f"SELECT bid,url,title,uid,tags,notes,iid,timestamp,order_id,always_open_in_new_tab FROM bookmarks WHERE bid = {bid};" r = _execute(source,select_str) for i in r: count = non_decimal.sub('',str(i)) r = _execute(source,get_str) print(f"DEBUG(sqlite): found {count} records for bid {bid}.") b = None for i in r: print("Bookmark:",i) bid, url, title, uid, tags, notes, iid, timestamp, order_id, aoint = i b = Bookmark( source, bid = bid, url = url, title = title, uid = uid, tags = _tags_from_db(tags), notes = notes, iid = iid, timestamp = timestamp, order_id = order_id, always_open_in_new_tab = aoint, _from_db = True ) return b def db_get_all_bookmarks(source = defaultfile, user = "" ): """ Returns a list of Bookmark objects. If user matches a username or uid, the list includes bookmarks only for that user. Otherwise, returns the entire list. """ from bws_db import Bookmark bm_list = [] get_str = "SELECT bid,url,title,uid,tags,notes,iid,timestamp,order_id,always_open_in_new_tab FROM bookmarks" if "" != user: uid = db_get_uid_from_any(source = source, name = user) get_str += f" WHERE uid = '{uid}'" get_str += ";" #print(f"DEBUG(sqlite): looking for all bookmarks, possible for user \"{user}\" with query:") #print(get_str) r = _execute(source,get_str) for i in r: print("Bookmark:",i) bid, url, title, uid, tags, notes, iid, timestamp, order_id, aoint = i b = Bookmark( source, bid = bid, url = url, title = title, uid = uid, tags = _tags_from_db(tags), notes = notes, iid = iid, timestamp = timestamp, order_id = order_id, always_open_in_new_tab = aoint, _from_db = True ) bm_list.append(b) return bm_list def db_get_user_name_from_uid( source = defaultfile, uid = 0 ): get_user_str = f"SELECT name from users WHERE uid = '{uid}';" r = _execute(source,get_user_str) name = "" for i in r: j = i if type(i) == tuple: j = i[0] print("DEBUG(sqlite): db_get_username_from_uid got:",j) name = j.replace("(","").replace(",)","") return name def _db_get_uid_from_name( source = defaultfile, name = 0 ): get_uid_str = f"SELECT uid from users WHERE name = '{name}';" r = _execute(source,get_uid_str) for i in r: #print(f"DEBUG(sqlite): looking for user {name}, found row: {i}") uid = non_decimal.sub('',str(i)) return uid def db_get_uid_from_any( source = defaultfile, name = 0 ): uid = -1 get_uid_str = f"SELECT uid from users WHERE (name = '{name}' or uid = '{name}');" r = _execute(source,get_uid_str) for i in r: #print(f"DEBUG(sqlite): looking for user {name}, found row: {i}") uid = non_decimal.sub('',str(i)) return uid def _execute( source = defaultfile, command = "" ): print(f"DEBUG(sqlite): _execute(command=\"{command}\")") if type(source) == sqlite3.Connection: return source.execute(command) else: with sqlite3.connect(source) as conn: return conn.execute(command) def db_delete_bookmark( source = defaultfile, bid = None ): count = -1 if bid is not None: select_str = f"SELECT COUNT(bid) FROM bookmarks WHERE bid = {bid};" delete_str = f"DELETE FROM bookmarks WHERE bid = {bid};" r = _execute(source,select_str) for i in r: count = non_decimal.sub('',str(i)) r = _execute(source,delete_str) print(f"DEBUG(sqlite): [bws_db_sqlite] deleted bid {bid}, which deleted {count} rows.") return count def _tags_to_db(tags): """ Convert a python list to comma-separated string for a single column in sqlite database """ # probably need better sanitization than this poor man's version. print(f"DEBUG(sqlite): trying to split tags \"{tags}\".") print(f"DEBUG(sqlite): which is type \"{type(tags)}\".") if type(tags) == list: return ','.join(tags).replace('"','').replace("'",'').replace(",,",",").rstrip(',') elif type(tags) == tuple: return ','.join(tags[0]).replace('"','').replace("'",'').replace(",,",",").rstrip(',') else: return tags def _tags_from_db(tags): """ Convert a comma-separated string from sqlite column into a python list """ return tags.split(',') def db_update_bookmark( source = defaultfile, bid = -1, url = "BAD", title = "BAD", uid = -1, tags = ["BAD"], notes = "BAD", iid = -1, timestamp = "BAD", order_id = -2, always_open_in_new_tab = -1, ): """Update a bookmark entry in the sqlite source. All fields are required.""" print(f"DEBUG(sqlite) db_update_bookmark: locals={locals()}") if "BAD" == url or "BAD" == title or ["BAD"] == tags or "BAD" == notes or "BAD" == timestamp or -1 == bid or -1 == uid or -1 == iid or -2 == order_id or -1 == always_open_in_new_tab: print("db_update_bookmark() needs all bookmark properties passed to it.") return -1 tags_str = _tags_to_db(tags) update_str = f""" UPDATE bookmarks SET url = '{url}', title = '{title}', uid = '{uid}', tags = '{tags_str}', notes = '{notes}', iid = '{iid}', timestamp = '{timestamp}', order_id = '{order_id}', always_open_in_new_tab = '{always_open_in_new_tab}' WHERE bid = '{bid}'; """ try: r = _execute(source,update_str) except e: print(f"Error! When trying to update bid {bid}, got error {e}") def db_add_user( source, name = "", always_open_in_new_tab = False ): print(f"DEBUG(sqlite): STUB for db_add_user") # return int(r) # should be new uid def db_get_all_users(source = defaultfile): """ Returns a list of users and their settings. """ from bws_db import User user_list = [] get_str = "SELECT uid,name,always_open_in_new_tab FROM users;" r = _execute(source,get_str) for i in r: print("User:",i) uid, name, aoint = i u = User(source, uid = uid, name = name, always_open_in_new_tab = aoint, _from_db = True ) user_list.append(u) return user_list