REFACTOR: Moved more functionality into database base type
This commit is contained in:
		| @@ -4,6 +4,7 @@ from models.category import Category | |||||||
|  |  | ||||||
| class CategoryController(DatabaseController): | class CategoryController(DatabaseController): | ||||||
|     FIELDS = ['id', 'name'] |     FIELDS = ['id', 'name'] | ||||||
|  |     TYPE = Category | ||||||
|  |  | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|         super().__init__() |         super().__init__() | ||||||
| @@ -12,49 +13,21 @@ class CategoryController(DatabaseController): | |||||||
|         params = [ |         params = [ | ||||||
|             category.name, |             category.name, | ||||||
|         ] |         ] | ||||||
|  |         query = "INSERT INTO Categories (name) VALUES (?)" | ||||||
|  |  | ||||||
|         self._conn.execute( |         self.do(query, params) | ||||||
|             "INSERT INTO Categories (name) VALUES (?)", |  | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         self._conn.commit() |  | ||||||
|  |  | ||||||
|     def read(self, id: int = 0) -> Category | None: |     def read(self, id: int = 0) -> Category | None: | ||||||
|         params = [ |         params = [ | ||||||
|             id |             id | ||||||
|         ] |         ] | ||||||
|  |         query = "SELECT * FROM Categories WHERE id = ?" | ||||||
|  |  | ||||||
|         cursor = self._conn.execute( |         return self.get_one(query, params) | ||||||
|             "SELECT * FROM Categories WHERE id = ?", |  | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         row = cursor.fetchone() |  | ||||||
|  |  | ||||||
|         if row is None: |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         params = dict(zip(self.FIELDS, row)) |  | ||||||
|         obj = self.new_instance(Category, params) |  | ||||||
|  |  | ||||||
|         return obj |  | ||||||
|  |  | ||||||
|     def read_all(self) -> list[Category] | None: |     def read_all(self) -> list[Category] | None: | ||||||
|         cursor = self._conn.execute( |         query = "SELECT * FROM Categories" | ||||||
|             "SELECT * FROM Categories", |         return self.get_many(query, []) | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchall() |  | ||||||
|  |  | ||||||
|         if rows is None: |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         categories = list() |  | ||||||
|  |  | ||||||
|         for category in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, category)) |  | ||||||
|             obj = self.new_instance(Category, params) |  | ||||||
|             categories.append(obj) |  | ||||||
|  |  | ||||||
|         return categories |  | ||||||
|  |  | ||||||
|     def update(self): |     def update(self): | ||||||
|         print("Doing work") |         print("Doing work") | ||||||
|   | |||||||
| @@ -8,7 +8,6 @@ class DatabaseController(ABC): | |||||||
|     """ Abstract Base Class to handle database access for each component |     """ Abstract Base Class to handle database access for each component | ||||||
|         in the web app |         in the web app | ||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     __data_dir = "./data/" |     __data_dir = "./data/" | ||||||
|     __db_name = "wmgzon.db" |     __db_name = "wmgzon.db" | ||||||
|  |  | ||||||
| @@ -48,8 +47,64 @@ class DatabaseController(ABC): | |||||||
|             setattr(obj, attr, value) |             setattr(obj, attr, value) | ||||||
|         return obj |         return obj | ||||||
|  |  | ||||||
|  |     def do(self, query: str, params: list[str]): | ||||||
|  |         """ Function to run a query that returns no response """ | ||||||
|  |         self._conn.execute( | ||||||
|  |             query, | ||||||
|  |             params | ||||||
|  |         ) | ||||||
|  |         self._conn.commit() | ||||||
|  |  | ||||||
|  |     def get_one(self, query: str, params: list[str], type: type = None | ||||||
|  |                 ) -> type | None: | ||||||
|  |         """ Returns one item from the given query """ | ||||||
|  |         if type is None: | ||||||
|  |             type = self.TYPE | ||||||
|  |  | ||||||
|  |         cursor = self._conn.execute( | ||||||
|  |             query, | ||||||
|  |             params | ||||||
|  |         ) | ||||||
|  |         row = cursor.fetchone() | ||||||
|  |  | ||||||
|  |         if row is None: | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |         # Construct the row into a single object | ||||||
|  |         params = dict(zip(self.FIELDS, row)) | ||||||
|  |         obj = self.new_instance(type, params) | ||||||
|  |  | ||||||
|  |         return obj | ||||||
|  |  | ||||||
|  |     def get_many(self, query: str, params: list[str], type: type = None | ||||||
|  |                  ) -> list[type] | None: | ||||||
|  |         """ Returns all items matching the given query """ | ||||||
|  |         if type is None: | ||||||
|  |             type = self.TYPE | ||||||
|  |  | ||||||
|  |         cursor = self._conn.execute( | ||||||
|  |             query, | ||||||
|  |             params | ||||||
|  |         ) | ||||||
|  |         rows = cursor.fetchall() | ||||||
|  |  | ||||||
|  |         if rows is None or len(rows) == 0: | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |         objs = list() | ||||||
|  |  | ||||||
|  |         # Construct the row into a list of object | ||||||
|  |         for row in rows: | ||||||
|  |             params = dict(zip(self.FIELDS, row)) | ||||||
|  |             obj = self.new_instance(type, params) | ||||||
|  |             objs.append(obj) | ||||||
|  |  | ||||||
|  |         return objs | ||||||
|  |  | ||||||
|     """ |     """ | ||||||
|         Set of CRUD methods to allow for Data manipulation on the backend |         Set of CRUD methods to allow for Data manipulation on the backend | ||||||
|  |  | ||||||
|  |         These items MUST be implemented by anything inheriting from this class | ||||||
|     """ |     """ | ||||||
|  |  | ||||||
|     @abstractmethod |     @abstractmethod | ||||||
|   | |||||||
| @@ -5,6 +5,7 @@ from models.products.product import Product | |||||||
| class ProductController(DatabaseController): | class ProductController(DatabaseController): | ||||||
|     FIELDS = ['id', 'name', 'image', 'description', 'cost', |     FIELDS = ['id', 'name', 'image', 'description', 'cost', | ||||||
|               'sellerID', 'category', 'postedDate', 'quantityAvailable'] |               'sellerID', 'category', 'postedDate', 'quantityAvailable'] | ||||||
|  |     TYPE = Product | ||||||
|  |  | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|         super().__init__() |         super().__init__() | ||||||
| @@ -20,60 +21,30 @@ class ProductController(DatabaseController): | |||||||
|             product.postedDate, |             product.postedDate, | ||||||
|             product.quantityAvailable |             product.quantityAvailable | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         self._conn.execute( |  | ||||||
|             """ |  | ||||||
|             INSERT INTO Products |             INSERT INTO Products | ||||||
|             (name, image, description, cost, categoryID, |             (name, image, description, cost, categoryID, | ||||||
|             sellerID, postedDate, quantityAvailable) |             sellerID, postedDate, quantityAvailable) | ||||||
|             VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", |             VALUES (?, ?, ?, ?, ?, ?, ?, ?) | ||||||
|             params |         """ | ||||||
|         ) |  | ||||||
|         self._conn.commit() |         self.do(query, params) | ||||||
|  |  | ||||||
|     def read(self, name: str = "") -> list[Product] | None: |     def read(self, name: str = "") -> list[Product] | None: | ||||||
|         params = [ |         params = [ | ||||||
|             "%" + name + "%" |             "%" + name + "%" | ||||||
|         ] |         ] | ||||||
|  |         query = "SELECT * FROM Products WHERE name like ?" | ||||||
|  |  | ||||||
|         cursor = self._conn.execute( |         return self.get_many(query, params) | ||||||
|             "SELECT * FROM Products WHERE name like ?", |  | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchmany() |  | ||||||
|  |  | ||||||
|         if rows is None: |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         products = list() |  | ||||||
|  |  | ||||||
|         # Create an object for each row |  | ||||||
|         for product in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, product)) |  | ||||||
|             obj = self.new_instance(Product, params) |  | ||||||
|             products.append(obj) |  | ||||||
|  |  | ||||||
|         return products |  | ||||||
|  |  | ||||||
|     def read_id(self, id: int) -> Product | None: |     def read_id(self, id: int) -> Product | None: | ||||||
|         params = [ |         params = [ | ||||||
|             id |             id | ||||||
|         ] |         ] | ||||||
|  |         query = "SELECT * FROM Products WHERE id == ?" | ||||||
|  |  | ||||||
|         cursor = self._conn.execute( |         return self.get_one(query, params) | ||||||
|             "SELECT * FROM Products WHERE id == ?", |  | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         row = cursor.fetchone() |  | ||||||
|  |  | ||||||
|         if row is None: |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         # Create an object with the row |  | ||||||
|         params = dict(zip(self.FIELDS, row)) |  | ||||||
|         obj = self.new_instance(Product, params) |  | ||||||
|  |  | ||||||
|         return obj |  | ||||||
|  |  | ||||||
|     def read_all(self, category: str = "", |     def read_all(self, category: str = "", | ||||||
|                  search_term: str = "") -> list[Product] | None: |                  search_term: str = "") -> list[Product] | None: | ||||||
| @@ -81,55 +52,25 @@ class ProductController(DatabaseController): | |||||||
|             "%" + category + "%", |             "%" + category + "%", | ||||||
|             "%" + search_term + "%" |             "%" + search_term + "%" | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         cursor = self._conn.execute( |             SELECT * FROM Products | ||||||
|             """SELECT * FROM Products |  | ||||||
|             INNER JOIN Categories ON Products.categoryID = Categories.id |             INNER JOIN Categories ON Products.categoryID = Categories.id | ||||||
|             WHERE Categories.name LIKE ? |             WHERE Categories.name LIKE ? | ||||||
|             AND Products.name LIKE ? |             AND Products.name LIKE ? | ||||||
|             """, |         """ | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchall() |  | ||||||
|  |  | ||||||
|         if len(rows) == 0: |         return self.get_many(query, params) | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         products = list() |  | ||||||
|  |  | ||||||
|         # Create an object for each row |  | ||||||
|         for product in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, product)) |  | ||||||
|             obj = self.new_instance(Product, params) |  | ||||||
|             products.append(obj) |  | ||||||
|  |  | ||||||
|         return products |  | ||||||
|  |  | ||||||
|     def read_user(self, user_id: int) -> list[Product] | None: |     def read_user(self, user_id: int) -> list[Product] | None: | ||||||
|         params = [ |         params = [ | ||||||
|             user_id |             user_id | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         cursor = self._conn.execute( |             SELECT * FROM Products | ||||||
|             """SELECT * FROM Products |  | ||||||
|             WHERE sellerID = ? |             WHERE sellerID = ? | ||||||
|             """, |         """ | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchall() |  | ||||||
|  |  | ||||||
|         if len(rows) == 0: |         return self.get_many(query, params) | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         products = list() |  | ||||||
|  |  | ||||||
|         # Create an object for each row |  | ||||||
|         for product in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, product)) |  | ||||||
|             obj = self.new_instance(Product, params) |  | ||||||
|             products.append(obj) |  | ||||||
|  |  | ||||||
|         return products |  | ||||||
|  |  | ||||||
|     def update(self, product: Product): |     def update(self, product: Product): | ||||||
|         params = [ |         params = [ | ||||||
| @@ -141,9 +82,8 @@ class ProductController(DatabaseController): | |||||||
|             product.category, |             product.category, | ||||||
|             product.id |             product.id | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         cursor = self._conn.execute( |             UPDATE Products | ||||||
|             """UPDATE Products |  | ||||||
|             SET name = ?, |             SET name = ?, | ||||||
|                 description = ?, |                 description = ?, | ||||||
|                 image = ?, |                 image = ?, | ||||||
| @@ -151,10 +91,9 @@ class ProductController(DatabaseController): | |||||||
|                 quantityAvailable = ?, |                 quantityAvailable = ?, | ||||||
|                 categoryID = ? |                 categoryID = ? | ||||||
|             WHERE id = ? |             WHERE id = ? | ||||||
|             """, |         """ | ||||||
|             params |  | ||||||
|         ) |         self.do(query, params) | ||||||
|         self._conn.commit() |  | ||||||
|  |  | ||||||
|     def delete(self): |     def delete(self): | ||||||
|         print("Doing work") |         print("Doing work") | ||||||
|   | |||||||
| @@ -4,6 +4,7 @@ from models.stats import Stats | |||||||
|  |  | ||||||
| class StatsController(DatabaseController): | class StatsController(DatabaseController): | ||||||
|     FIELDS = ['id', 'userID', 'productID', 'viewDate'] |     FIELDS = ['id', 'userID', 'productID', 'viewDate'] | ||||||
|  |     TYPE = Stats | ||||||
|  |  | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|         super().__init__() |         super().__init__() | ||||||
| @@ -14,87 +15,39 @@ class StatsController(DatabaseController): | |||||||
|             view.productID, |             view.productID, | ||||||
|             view.viewDate |             view.viewDate | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         self._conn.execute( |  | ||||||
|             """ |  | ||||||
|             INSERT INTO Views |             INSERT INTO Views | ||||||
|             (userID, productID, viewDate) |             (userID, productID, viewDate) | ||||||
|             VALUES (?, ?, ?)""", |             VALUES (?, ?, ?) | ||||||
|             params |         """ | ||||||
|         ) |  | ||||||
|         self._conn.commit() |         self.do(query, params) | ||||||
|  |  | ||||||
|     def read(self) -> list[Stats] | None: |     def read(self) -> list[Stats] | None: | ||||||
|  |         query = "SELECT * FROM Views" | ||||||
|         cursor = self._conn.execute( |         self.get_many(query, []) | ||||||
|             "SELECT * FROM Views", |  | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchall() |  | ||||||
|  |  | ||||||
|         if rows is None: |  | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         views = list() |  | ||||||
|  |  | ||||||
|         # Create an object for each row |  | ||||||
|         for view in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, view)) |  | ||||||
|             obj = self.new_instance(Stats, params) |  | ||||||
|             views.append(obj) |  | ||||||
|  |  | ||||||
|         return views |  | ||||||
|  |  | ||||||
|     def read_product(self, product_id: int = 0) -> list[Stats] | None: |     def read_product(self, product_id: int = 0) -> list[Stats] | None: | ||||||
|         params = [ |         params = [ | ||||||
|             product_id |             product_id | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         cursor = self._conn.execute( |             SELECT * FROM Views | ||||||
|             """SELECT * FROM Views |  | ||||||
|             WHERE productID = ? |             WHERE productID = ? | ||||||
|             """, |         """ | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchall() |  | ||||||
|  |  | ||||||
|         if len(rows) == 0: |         return self.get_many(query, params) | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         views = list() |  | ||||||
|  |  | ||||||
|         # Create an object for each row |  | ||||||
|         for view in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, view)) |  | ||||||
|             obj = self.new_instance(Stats, params) |  | ||||||
|             views.append(obj) |  | ||||||
|  |  | ||||||
|         return views |  | ||||||
|  |  | ||||||
|     def read_user(self, user_id: int) -> list[Stats] | None: |     def read_user(self, user_id: int) -> list[Stats] | None: | ||||||
|         params = [ |         params = [ | ||||||
|             user_id |             user_id | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         cursor = self._conn.execute( |             SELECT * FROM Views | ||||||
|             """SELECT * FROM Views |  | ||||||
|             WHERE userID = ? |             WHERE userID = ? | ||||||
|             """, |         """ | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         rows = cursor.fetchall() |  | ||||||
|  |  | ||||||
|         if len(rows) == 0: |         self.get_many(query, params) | ||||||
|             return None |  | ||||||
|  |  | ||||||
|         views = list() |  | ||||||
|  |  | ||||||
|         # Create an object for each row |  | ||||||
|         for view in rows: |  | ||||||
|             params = dict(zip(self.FIELDS, view)) |  | ||||||
|             obj = self.new_instance(Stats, params) |  | ||||||
|             views.append(obj) |  | ||||||
|  |  | ||||||
|         return views |  | ||||||
|  |  | ||||||
|     def update(self): |     def update(self): | ||||||
|         print("Doing work") |         print("Doing work") | ||||||
|   | |||||||
| @@ -2,15 +2,33 @@ from .database import DatabaseController | |||||||
| from models.users.user import User | from models.users.user import User | ||||||
| from models.users.customer import Customer | from models.users.customer import Customer | ||||||
| from models.users.seller import Seller | from models.users.seller import Seller | ||||||
|  | from models.users.admin import Admin | ||||||
|  |  | ||||||
|  |  | ||||||
| class UserController(DatabaseController): | class UserController(DatabaseController): | ||||||
|     FIELDS = ['id', 'username', 'password', 'firstName', |     FIELDS = ['id', 'username', 'password', 'firstName', | ||||||
|               'lastName', 'email', 'phone', 'role'] |               'lastName', 'email', 'phone', 'role'] | ||||||
|  |     TYPE = User | ||||||
|  |  | ||||||
|     def __init__(self): |     def __init__(self): | ||||||
|         super().__init__() |         super().__init__() | ||||||
|  |  | ||||||
|  |     def convert_type(self, user: User | None | ||||||
|  |                      ) -> User | Customer | Seller | Admin | None: | ||||||
|  |         """ Function to convert a given user to their correct subtype """ | ||||||
|  |         if user is None: | ||||||
|  |             return None | ||||||
|  |  | ||||||
|  |         # Set the object to the correct type | ||||||
|  |         type = Customer | ||||||
|  |         if user.role == "Seller": | ||||||
|  |             type = Seller | ||||||
|  |         if user.role == "Admin": | ||||||
|  |             type = Admin | ||||||
|  |  | ||||||
|  |         obj = self.new_instance(type, user.__dict__) | ||||||
|  |         return obj | ||||||
|  |  | ||||||
|     def create(self, user: User): |     def create(self, user: User): | ||||||
|         params = [ |         params = [ | ||||||
|             user.username, |             user.username, | ||||||
| @@ -21,62 +39,29 @@ class UserController(DatabaseController): | |||||||
|             user.phone, |             user.phone, | ||||||
|             user.role |             user.role | ||||||
|         ] |         ] | ||||||
|  |         query = """ | ||||||
|         self._conn.execute( |             INSERT INTO Users | ||||||
|             """INSERT INTO Users |  | ||||||
|             (username, password, first_name, last_name, email, phone, role) |             (username, password, first_name, last_name, email, phone, role) | ||||||
|             VALUES (?, ?, ?, ?, ?, ?, ?)""", |             VALUES (?, ?, ?, ?, ?, ?, ?) | ||||||
|             params |         """ | ||||||
|         ) |  | ||||||
|         self._conn.commit() |         self.do(query, params) | ||||||
|  |  | ||||||
|     def read(self, username: str) -> User | None: |     def read(self, username: str) -> User | None: | ||||||
|         params = [ |         params = [ | ||||||
|             username |             username | ||||||
|         ] |         ] | ||||||
|  |         query = "SELECT * FROM Users WHERE Username = ?" | ||||||
|  |  | ||||||
|         cursor = self._conn.execute( |         return self.convert_type(self.get_one(query, params)) | ||||||
|             "SELECT * FROM Users WHERE Username = ?", |  | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         row = cursor.fetchone() |  | ||||||
|  |  | ||||||
|         if row is not None: |  | ||||||
|             params = dict(zip(self.FIELDS, row)) |  | ||||||
|  |  | ||||||
|             # Is user a seller |  | ||||||
|             type = Customer |  | ||||||
|             if row[7] == "Seller": |  | ||||||
|                 type = Seller |  | ||||||
|  |  | ||||||
|             obj = self.new_instance(type, params) |  | ||||||
|             return obj |  | ||||||
|  |  | ||||||
|         return None |  | ||||||
|  |  | ||||||
|     def read_id(self, id: int) -> User | None: |     def read_id(self, id: int) -> User | None: | ||||||
|         params = [ |         params = [ | ||||||
|             id |             id | ||||||
|         ] |         ] | ||||||
|  |         query = "SELECT * FROM Users WHERE id = ?" | ||||||
|  |  | ||||||
|         cursor = self._conn.execute( |         return self.convert_type(self.get_one(query, params)) | ||||||
|             "SELECT * FROM Users WHERE id = ?", |  | ||||||
|             params |  | ||||||
|         ) |  | ||||||
|         row = cursor.fetchone() |  | ||||||
|  |  | ||||||
|         if row is not None: |  | ||||||
|             params = dict(zip(self.FIELDS, row)) |  | ||||||
|  |  | ||||||
|             # Is user a seller |  | ||||||
|             type = Customer |  | ||||||
|             if row[7] == "Seller": |  | ||||||
|                 type = Seller |  | ||||||
|  |  | ||||||
|             obj = self.new_instance(type, params) |  | ||||||
|             return obj |  | ||||||
|  |  | ||||||
|         return None |  | ||||||
|  |  | ||||||
|     def update(self): |     def update(self): | ||||||
|         print("Doing work") |         print("Doing work") | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user