From e45ec4b217945dc08437c7be84a91005def240db Mon Sep 17 00:00:00 2001 From: Luke Else Date: Sun, 4 Feb 2024 23:47:00 +0000 Subject: [PATCH] REFACTOR: Moved more functionality into database base type --- controllers/database/category.py | 41 ++---------- controllers/database/database.py | 57 +++++++++++++++- controllers/database/product.py | 107 +++++++------------------------ controllers/database/stats.py | 79 +++++------------------ controllers/database/user.py | 71 ++++++++------------ 5 files changed, 130 insertions(+), 225 deletions(-) diff --git a/controllers/database/category.py b/controllers/database/category.py index ab02aad..7c70478 100644 --- a/controllers/database/category.py +++ b/controllers/database/category.py @@ -4,6 +4,7 @@ from models.category import Category class CategoryController(DatabaseController): FIELDS = ['id', 'name'] + TYPE = Category def __init__(self): super().__init__() @@ -12,49 +13,21 @@ class CategoryController(DatabaseController): params = [ category.name, ] + query = "INSERT INTO Categories (name) VALUES (?)" - self._conn.execute( - "INSERT INTO Categories (name) VALUES (?)", - params - ) - self._conn.commit() + self.do(query, params) def read(self, id: int = 0) -> Category | None: params = [ id ] + query = "SELECT * FROM Categories WHERE id = ?" - cursor = self._conn.execute( - "SELECT * FROM Categories WHERE id = ?", - params - ) - row = cursor.fetchone() - - if row is None: - return None - - params = dict(zip(self.FIELDS, row)) - obj = self.new_instance(Category, params) - - return obj + return self.get_one(query, params) def read_all(self) -> list[Category] | None: - cursor = self._conn.execute( - "SELECT * FROM Categories", - ) - rows = cursor.fetchall() - - if rows is None: - return None - - categories = list() - - for category in rows: - params = dict(zip(self.FIELDS, category)) - obj = self.new_instance(Category, params) - categories.append(obj) - - return categories + query = "SELECT * FROM Categories" + return self.get_many(query, []) def update(self): print("Doing work") diff --git a/controllers/database/database.py b/controllers/database/database.py index f4c76ca..2e17940 100644 --- a/controllers/database/database.py +++ b/controllers/database/database.py @@ -8,7 +8,6 @@ class DatabaseController(ABC): """ Abstract Base Class to handle database access for each component in the web app """ - __data_dir = "./data/" __db_name = "wmgzon.db" @@ -48,8 +47,64 @@ class DatabaseController(ABC): setattr(obj, attr, value) return obj + def do(self, query: str, params: list[str]): + """ Function to run a query that returns no response """ + self._conn.execute( + query, + params + ) + self._conn.commit() + + def get_one(self, query: str, params: list[str], type: type = None + ) -> type | None: + """ Returns one item from the given query """ + if type is None: + type = self.TYPE + + cursor = self._conn.execute( + query, + params + ) + row = cursor.fetchone() + + if row is None: + return None + + # Construct the row into a single object + params = dict(zip(self.FIELDS, row)) + obj = self.new_instance(type, params) + + return obj + + def get_many(self, query: str, params: list[str], type: type = None + ) -> list[type] | None: + """ Returns all items matching the given query """ + if type is None: + type = self.TYPE + + cursor = self._conn.execute( + query, + params + ) + rows = cursor.fetchall() + + if rows is None or len(rows) == 0: + return None + + objs = list() + + # Construct the row into a list of object + for row in rows: + params = dict(zip(self.FIELDS, row)) + obj = self.new_instance(type, params) + objs.append(obj) + + return objs + """ Set of CRUD methods to allow for Data manipulation on the backend + + These items MUST be implemented by anything inheriting from this class """ @abstractmethod diff --git a/controllers/database/product.py b/controllers/database/product.py index 469f49a..b11c979 100644 --- a/controllers/database/product.py +++ b/controllers/database/product.py @@ -5,6 +5,7 @@ from models.products.product import Product class ProductController(DatabaseController): FIELDS = ['id', 'name', 'image', 'description', 'cost', 'sellerID', 'category', 'postedDate', 'quantityAvailable'] + TYPE = Product def __init__(self): super().__init__() @@ -20,60 +21,30 @@ class ProductController(DatabaseController): product.postedDate, product.quantityAvailable ] - - self._conn.execute( - """ + query = """ INSERT INTO Products (name, image, description, cost, categoryID, sellerID, postedDate, quantityAvailable) - VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", - params - ) - self._conn.commit() + VALUES (?, ?, ?, ?, ?, ?, ?, ?) + """ + + self.do(query, params) def read(self, name: str = "") -> list[Product] | None: params = [ "%" + name + "%" ] + query = "SELECT * FROM Products WHERE name like ?" - cursor = self._conn.execute( - "SELECT * FROM Products WHERE name like ?", - params - ) - rows = cursor.fetchmany() - - if rows is None: - return None - - products = list() - - # Create an object for each row - for product in rows: - params = dict(zip(self.FIELDS, product)) - obj = self.new_instance(Product, params) - products.append(obj) - - return products + return self.get_many(query, params) def read_id(self, id: int) -> Product | None: params = [ id ] + query = "SELECT * FROM Products WHERE id == ?" - cursor = self._conn.execute( - "SELECT * FROM Products WHERE id == ?", - params - ) - row = cursor.fetchone() - - if row is None: - return None - - # Create an object with the row - params = dict(zip(self.FIELDS, row)) - obj = self.new_instance(Product, params) - - return obj + return self.get_one(query, params) def read_all(self, category: str = "", search_term: str = "") -> list[Product] | None: @@ -81,55 +52,25 @@ class ProductController(DatabaseController): "%" + category + "%", "%" + search_term + "%" ] - - cursor = self._conn.execute( - """SELECT * FROM Products + query = """ + SELECT * FROM Products INNER JOIN Categories ON Products.categoryID = Categories.id WHERE Categories.name LIKE ? AND Products.name LIKE ? - """, - params - ) - rows = cursor.fetchall() + """ - if len(rows) == 0: - return None - - products = list() - - # Create an object for each row - for product in rows: - params = dict(zip(self.FIELDS, product)) - obj = self.new_instance(Product, params) - products.append(obj) - - return products + return self.get_many(query, params) def read_user(self, user_id: int) -> list[Product] | None: params = [ user_id ] - - cursor = self._conn.execute( - """SELECT * FROM Products + query = """ + SELECT * FROM Products WHERE sellerID = ? - """, - params - ) - rows = cursor.fetchall() + """ - if len(rows) == 0: - return None - - products = list() - - # Create an object for each row - for product in rows: - params = dict(zip(self.FIELDS, product)) - obj = self.new_instance(Product, params) - products.append(obj) - - return products + return self.get_many(query, params) def update(self, product: Product): params = [ @@ -141,9 +82,8 @@ class ProductController(DatabaseController): product.category, product.id ] - - cursor = self._conn.execute( - """UPDATE Products + query = """ + UPDATE Products SET name = ?, description = ?, image = ?, @@ -151,10 +91,9 @@ class ProductController(DatabaseController): quantityAvailable = ?, categoryID = ? WHERE id = ? - """, - params - ) - self._conn.commit() + """ + + self.do(query, params) def delete(self): print("Doing work") diff --git a/controllers/database/stats.py b/controllers/database/stats.py index c8ab8d3..18f6fe6 100644 --- a/controllers/database/stats.py +++ b/controllers/database/stats.py @@ -4,6 +4,7 @@ from models.stats import Stats class StatsController(DatabaseController): FIELDS = ['id', 'userID', 'productID', 'viewDate'] + TYPE = Stats def __init__(self): super().__init__() @@ -14,87 +15,39 @@ class StatsController(DatabaseController): view.productID, view.viewDate ] - - self._conn.execute( - """ + query = """ INSERT INTO Views (userID, productID, viewDate) - VALUES (?, ?, ?)""", - params - ) - self._conn.commit() + VALUES (?, ?, ?) + """ + + self.do(query, params) def read(self) -> list[Stats] | None: - - cursor = self._conn.execute( - "SELECT * FROM Views", - ) - rows = cursor.fetchall() - - if rows is None: - return None - - views = list() - - # Create an object for each row - for view in rows: - params = dict(zip(self.FIELDS, view)) - obj = self.new_instance(Stats, params) - views.append(obj) - - return views + query = "SELECT * FROM Views" + self.get_many(query, []) def read_product(self, product_id: int = 0) -> list[Stats] | None: params = [ product_id ] - - cursor = self._conn.execute( - """SELECT * FROM Views + query = """ + SELECT * FROM Views WHERE productID = ? - """, - params - ) - rows = cursor.fetchall() + """ - if len(rows) == 0: - return None - - views = list() - - # Create an object for each row - for view in rows: - params = dict(zip(self.FIELDS, view)) - obj = self.new_instance(Stats, params) - views.append(obj) - - return views + return self.get_many(query, params) def read_user(self, user_id: int) -> list[Stats] | None: params = [ user_id ] - - cursor = self._conn.execute( - """SELECT * FROM Views + query = """ + SELECT * FROM Views WHERE userID = ? - """, - params - ) - rows = cursor.fetchall() + """ - if len(rows) == 0: - return None - - views = list() - - # Create an object for each row - for view in rows: - params = dict(zip(self.FIELDS, view)) - obj = self.new_instance(Stats, params) - views.append(obj) - - return views + self.get_many(query, params) def update(self): print("Doing work") diff --git a/controllers/database/user.py b/controllers/database/user.py index 6f0c09d..ab8c54c 100644 --- a/controllers/database/user.py +++ b/controllers/database/user.py @@ -2,15 +2,33 @@ from .database import DatabaseController from models.users.user import User from models.users.customer import Customer from models.users.seller import Seller +from models.users.admin import Admin class UserController(DatabaseController): FIELDS = ['id', 'username', 'password', 'firstName', 'lastName', 'email', 'phone', 'role'] + TYPE = User def __init__(self): super().__init__() + def convert_type(self, user: User | None + ) -> User | Customer | Seller | Admin | None: + """ Function to convert a given user to their correct subtype """ + if user is None: + return None + + # Set the object to the correct type + type = Customer + if user.role == "Seller": + type = Seller + if user.role == "Admin": + type = Admin + + obj = self.new_instance(type, user.__dict__) + return obj + def create(self, user: User): params = [ user.username, @@ -21,62 +39,29 @@ class UserController(DatabaseController): user.phone, user.role ] - - self._conn.execute( - """INSERT INTO Users + query = """ + INSERT INTO Users (username, password, first_name, last_name, email, phone, role) - VALUES (?, ?, ?, ?, ?, ?, ?)""", - params - ) - self._conn.commit() + VALUES (?, ?, ?, ?, ?, ?, ?) + """ + + self.do(query, params) def read(self, username: str) -> User | None: params = [ username ] + query = "SELECT * FROM Users WHERE Username = ?" - cursor = self._conn.execute( - "SELECT * FROM Users WHERE Username = ?", - params - ) - row = cursor.fetchone() - - if row is not None: - params = dict(zip(self.FIELDS, row)) - - # Is user a seller - type = Customer - if row[7] == "Seller": - type = Seller - - obj = self.new_instance(type, params) - return obj - - return None + return self.convert_type(self.get_one(query, params)) def read_id(self, id: int) -> User | None: params = [ id ] + query = "SELECT * FROM Users WHERE id = ?" - cursor = self._conn.execute( - "SELECT * FROM Users WHERE id = ?", - params - ) - row = cursor.fetchone() - - if row is not None: - params = dict(zip(self.FIELDS, row)) - - # Is user a seller - type = Customer - if row[7] == "Seller": - type = Seller - - obj = self.new_instance(type, params) - return obj - - return None + return self.convert_type(self.get_one(query, params)) def update(self): print("Doing work")