Updated line endings to be in line with UNIX style
This commit is contained in:
@ -1,63 +1,63 @@
|
||||
from .database import DatabaseController
|
||||
from models.category import Category
|
||||
|
||||
|
||||
class CategoryController(DatabaseController):
|
||||
FIELDS = ['id', 'name']
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, category: Category):
|
||||
params = [
|
||||
category.name,
|
||||
]
|
||||
|
||||
self._conn.execute(
|
||||
"INSERT INTO Categories (name) VALUES (?)",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def read(self, id: int = 0) -> Category | None:
|
||||
params = [
|
||||
id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Categories WHERE id = ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
obj = self.new_instance(Category, params)
|
||||
|
||||
return obj
|
||||
|
||||
def read_all(self) -> list[Category] | None:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Categories",
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if rows is None:
|
||||
return None
|
||||
|
||||
categories = list()
|
||||
|
||||
for category in rows:
|
||||
params = dict(zip(self.FIELDS, category))
|
||||
obj = self.new_instance(Category, params)
|
||||
categories.append(obj)
|
||||
|
||||
return categories
|
||||
|
||||
def update(self):
|
||||
print("Doing work")
|
||||
|
||||
def delete(self):
|
||||
print("Doing work")
|
||||
from .database import DatabaseController
|
||||
from models.category import Category
|
||||
|
||||
|
||||
class CategoryController(DatabaseController):
|
||||
FIELDS = ['id', 'name']
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, category: Category):
|
||||
params = [
|
||||
category.name,
|
||||
]
|
||||
|
||||
self._conn.execute(
|
||||
"INSERT INTO Categories (name) VALUES (?)",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def read(self, id: int = 0) -> Category | None:
|
||||
params = [
|
||||
id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Categories WHERE id = ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
obj = self.new_instance(Category, params)
|
||||
|
||||
return obj
|
||||
|
||||
def read_all(self) -> list[Category] | None:
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Categories",
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if rows is None:
|
||||
return None
|
||||
|
||||
categories = list()
|
||||
|
||||
for category in rows:
|
||||
params = dict(zip(self.FIELDS, category))
|
||||
obj = self.new_instance(Category, params)
|
||||
categories.append(obj)
|
||||
|
||||
return categories
|
||||
|
||||
def update(self):
|
||||
print("Doing work")
|
||||
|
||||
def delete(self):
|
||||
print("Doing work")
|
||||
|
@ -1,81 +1,81 @@
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Mapping, Any
|
||||
import sqlite3
|
||||
import os
|
||||
|
||||
|
||||
class DatabaseController(ABC):
|
||||
""" Abstract Base Class to handle database access for each component
|
||||
in the web app
|
||||
"""
|
||||
|
||||
__data_dir = "./data/"
|
||||
__db_name = "wmgzon.db"
|
||||
|
||||
# Use test file if necessary
|
||||
if os.environ.get("ENVIRON") == "test":
|
||||
__db_name = "test_" + __db_name
|
||||
|
||||
__sqlitefile = __data_dir + __db_name
|
||||
|
||||
def __init__(self):
|
||||
""" Initialises the object and creates a connection to the local
|
||||
DB on the server
|
||||
"""
|
||||
self._conn = None
|
||||
try:
|
||||
# Creates a connection and specifies a flag to parse all types
|
||||
# back down into Python declared types e.g. date & time
|
||||
self._conn = sqlite3.connect(
|
||||
self.__sqlitefile, detect_types=sqlite3.PARSE_DECLTYPES)
|
||||
except sqlite3.Error as e:
|
||||
# Close the connection if still open
|
||||
if self._conn:
|
||||
self._conn.close()
|
||||
print(e)
|
||||
|
||||
def __del__(self):
|
||||
""" Object Destructor which kills the connection to the database """
|
||||
if self._conn is not None:
|
||||
self._conn.close()
|
||||
|
||||
def new_instance(self, of: type, with_fields: Mapping[str, Any]):
|
||||
""" Takes a dictionary of fields and returns the object
|
||||
with those fields populated
|
||||
"""
|
||||
obj = of.__new__(of)
|
||||
for attr, value in with_fields.items():
|
||||
setattr(obj, attr, value)
|
||||
return obj
|
||||
|
||||
"""
|
||||
Set of CRUD methods to allow for Data manipulation on the backend
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def create(self):
|
||||
""" Abstract method used to create a new record of a given
|
||||
type within the database
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def read(self):
|
||||
""" Abstract method used to read a record of a given
|
||||
type from the database
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update(self):
|
||||
""" Abstract method used to update a record of a given
|
||||
type within the database
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self):
|
||||
""" Abstract method used to delete record of a given
|
||||
type from the database
|
||||
"""
|
||||
pass
|
||||
from abc import ABC, abstractmethod
|
||||
from typing import Mapping, Any
|
||||
import sqlite3
|
||||
import os
|
||||
|
||||
|
||||
class DatabaseController(ABC):
|
||||
""" Abstract Base Class to handle database access for each component
|
||||
in the web app
|
||||
"""
|
||||
|
||||
__data_dir = "./data/"
|
||||
__db_name = "wmgzon.db"
|
||||
|
||||
# Use test file if necessary
|
||||
if os.environ.get("ENVIRON") == "test":
|
||||
__db_name = "test_" + __db_name
|
||||
|
||||
__sqlitefile = __data_dir + __db_name
|
||||
|
||||
def __init__(self):
|
||||
""" Initialises the object and creates a connection to the local
|
||||
DB on the server
|
||||
"""
|
||||
self._conn = None
|
||||
try:
|
||||
# Creates a connection and specifies a flag to parse all types
|
||||
# back down into Python declared types e.g. date & time
|
||||
self._conn = sqlite3.connect(
|
||||
self.__sqlitefile, detect_types=sqlite3.PARSE_DECLTYPES)
|
||||
except sqlite3.Error as e:
|
||||
# Close the connection if still open
|
||||
if self._conn:
|
||||
self._conn.close()
|
||||
print(e)
|
||||
|
||||
def __del__(self):
|
||||
""" Object Destructor which kills the connection to the database """
|
||||
if self._conn is not None:
|
||||
self._conn.close()
|
||||
|
||||
def new_instance(self, of: type, with_fields: Mapping[str, Any]):
|
||||
""" Takes a dictionary of fields and returns the object
|
||||
with those fields populated
|
||||
"""
|
||||
obj = of.__new__(of)
|
||||
for attr, value in with_fields.items():
|
||||
setattr(obj, attr, value)
|
||||
return obj
|
||||
|
||||
"""
|
||||
Set of CRUD methods to allow for Data manipulation on the backend
|
||||
"""
|
||||
|
||||
@abstractmethod
|
||||
def create(self):
|
||||
""" Abstract method used to create a new record of a given
|
||||
type within the database
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def read(self):
|
||||
""" Abstract method used to read a record of a given
|
||||
type from the database
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update(self):
|
||||
""" Abstract method used to update a record of a given
|
||||
type within the database
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self):
|
||||
""" Abstract method used to delete record of a given
|
||||
type from the database
|
||||
"""
|
||||
pass
|
||||
|
@ -1,160 +1,160 @@
|
||||
from .database import DatabaseController
|
||||
from models.products.product import Product
|
||||
|
||||
|
||||
class ProductController(DatabaseController):
|
||||
FIELDS = ['id', 'name', 'image', 'description', 'cost',
|
||||
'sellerID', 'category', 'postedDate', 'quantityAvailable']
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, product: Product):
|
||||
params = [
|
||||
product.name,
|
||||
product.image,
|
||||
product.description,
|
||||
product.cost,
|
||||
product.category,
|
||||
product.sellerID,
|
||||
product.postedDate,
|
||||
product.quantityAvailable
|
||||
]
|
||||
|
||||
self._conn.execute(
|
||||
"""
|
||||
INSERT INTO Products
|
||||
(name, image, description, cost, categoryID,
|
||||
sellerID, postedDate, quantityAvailable)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def read(self, name: str = "") -> list[Product] | None:
|
||||
params = [
|
||||
"%" + name + "%"
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Products WHERE name like ?",
|
||||
params
|
||||
)
|
||||
rows = cursor.fetchmany()
|
||||
|
||||
if rows is None:
|
||||
return None
|
||||
|
||||
products = list()
|
||||
|
||||
# Create an object for each row
|
||||
for product in rows:
|
||||
params = dict(zip(self.FIELDS, product))
|
||||
obj = self.new_instance(Product, params)
|
||||
products.append(obj)
|
||||
|
||||
return products
|
||||
|
||||
def read_id(self, id: int) -> Product | None:
|
||||
params = [
|
||||
id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Products WHERE id == ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
# Create an object with the row
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
obj = self.new_instance(Product, params)
|
||||
|
||||
return obj
|
||||
|
||||
def read_all(self, category: str = "",
|
||||
search_term: str = "") -> list[Product] | None:
|
||||
params = [
|
||||
"%" + category + "%",
|
||||
"%" + search_term + "%"
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT * FROM Products
|
||||
INNER JOIN Categories ON Products.categoryID = Categories.id
|
||||
WHERE Categories.name LIKE ?
|
||||
AND Products.name LIKE ?
|
||||
""",
|
||||
params
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if len(rows) == 0:
|
||||
return None
|
||||
|
||||
products = list()
|
||||
|
||||
# Create an object for each row
|
||||
for product in rows:
|
||||
params = dict(zip(self.FIELDS, product))
|
||||
obj = self.new_instance(Product, params)
|
||||
products.append(obj)
|
||||
|
||||
return products
|
||||
|
||||
def read_user(self, user_id: int) -> list[Product] | None:
|
||||
params = [
|
||||
user_id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT * FROM Products
|
||||
WHERE sellerID = ?
|
||||
""",
|
||||
params
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if len(rows) == 0:
|
||||
return None
|
||||
|
||||
products = list()
|
||||
|
||||
# Create an object for each row
|
||||
for product in rows:
|
||||
params = dict(zip(self.FIELDS, product))
|
||||
obj = self.new_instance(Product, params)
|
||||
products.append(obj)
|
||||
|
||||
return products
|
||||
|
||||
def update(self, product: Product):
|
||||
params = [
|
||||
product.name,
|
||||
product.description,
|
||||
product.image,
|
||||
product.cost,
|
||||
product.quantityAvailable,
|
||||
product.category,
|
||||
product.id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"""UPDATE Products
|
||||
SET name = ?,
|
||||
description = ?,
|
||||
image = ?,
|
||||
cost = ?,
|
||||
quantityAvailable = ?,
|
||||
categoryID = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def delete(self):
|
||||
print("Doing work")
|
||||
from .database import DatabaseController
|
||||
from models.products.product import Product
|
||||
|
||||
|
||||
class ProductController(DatabaseController):
|
||||
FIELDS = ['id', 'name', 'image', 'description', 'cost',
|
||||
'sellerID', 'category', 'postedDate', 'quantityAvailable']
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, product: Product):
|
||||
params = [
|
||||
product.name,
|
||||
product.image,
|
||||
product.description,
|
||||
product.cost,
|
||||
product.category,
|
||||
product.sellerID,
|
||||
product.postedDate,
|
||||
product.quantityAvailable
|
||||
]
|
||||
|
||||
self._conn.execute(
|
||||
"""
|
||||
INSERT INTO Products
|
||||
(name, image, description, cost, categoryID,
|
||||
sellerID, postedDate, quantityAvailable)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def read(self, name: str = "") -> list[Product] | None:
|
||||
params = [
|
||||
"%" + name + "%"
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Products WHERE name like ?",
|
||||
params
|
||||
)
|
||||
rows = cursor.fetchmany()
|
||||
|
||||
if rows is None:
|
||||
return None
|
||||
|
||||
products = list()
|
||||
|
||||
# Create an object for each row
|
||||
for product in rows:
|
||||
params = dict(zip(self.FIELDS, product))
|
||||
obj = self.new_instance(Product, params)
|
||||
products.append(obj)
|
||||
|
||||
return products
|
||||
|
||||
def read_id(self, id: int) -> Product | None:
|
||||
params = [
|
||||
id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Products WHERE id == ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is None:
|
||||
return None
|
||||
|
||||
# Create an object with the row
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
obj = self.new_instance(Product, params)
|
||||
|
||||
return obj
|
||||
|
||||
def read_all(self, category: str = "",
|
||||
search_term: str = "") -> list[Product] | None:
|
||||
params = [
|
||||
"%" + category + "%",
|
||||
"%" + search_term + "%"
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT * FROM Products
|
||||
INNER JOIN Categories ON Products.categoryID = Categories.id
|
||||
WHERE Categories.name LIKE ?
|
||||
AND Products.name LIKE ?
|
||||
""",
|
||||
params
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if len(rows) == 0:
|
||||
return None
|
||||
|
||||
products = list()
|
||||
|
||||
# Create an object for each row
|
||||
for product in rows:
|
||||
params = dict(zip(self.FIELDS, product))
|
||||
obj = self.new_instance(Product, params)
|
||||
products.append(obj)
|
||||
|
||||
return products
|
||||
|
||||
def read_user(self, user_id: int) -> list[Product] | None:
|
||||
params = [
|
||||
user_id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"""SELECT * FROM Products
|
||||
WHERE sellerID = ?
|
||||
""",
|
||||
params
|
||||
)
|
||||
rows = cursor.fetchall()
|
||||
|
||||
if len(rows) == 0:
|
||||
return None
|
||||
|
||||
products = list()
|
||||
|
||||
# Create an object for each row
|
||||
for product in rows:
|
||||
params = dict(zip(self.FIELDS, product))
|
||||
obj = self.new_instance(Product, params)
|
||||
products.append(obj)
|
||||
|
||||
return products
|
||||
|
||||
def update(self, product: Product):
|
||||
params = [
|
||||
product.name,
|
||||
product.description,
|
||||
product.image,
|
||||
product.cost,
|
||||
product.quantityAvailable,
|
||||
product.category,
|
||||
product.id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"""UPDATE Products
|
||||
SET name = ?,
|
||||
description = ?,
|
||||
image = ?,
|
||||
cost = ?,
|
||||
quantityAvailable = ?,
|
||||
categoryID = ?
|
||||
WHERE id = ?
|
||||
""",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def delete(self):
|
||||
print("Doing work")
|
||||
|
@ -1,85 +1,85 @@
|
||||
from .database import DatabaseController
|
||||
from models.users.user import User
|
||||
from models.users.customer import Customer
|
||||
from models.users.seller import Seller
|
||||
|
||||
|
||||
class UserController(DatabaseController):
|
||||
FIELDS = ['id', 'username', 'password', 'firstName',
|
||||
'lastName', 'email', 'phone', 'role']
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, user: User):
|
||||
params = [
|
||||
user.username,
|
||||
user.password,
|
||||
user.firstName,
|
||||
user.lastName,
|
||||
user.email,
|
||||
user.phone,
|
||||
user.role
|
||||
]
|
||||
|
||||
self._conn.execute(
|
||||
"""INSERT INTO Users
|
||||
(username, password, first_name, last_name, email, phone, role)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def read(self, username: str) -> User | None:
|
||||
params = [
|
||||
username
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Users WHERE Username = ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is not None:
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
|
||||
# Is user a seller
|
||||
type = Customer
|
||||
if row[7] == "Seller":
|
||||
type = Seller
|
||||
|
||||
obj = self.new_instance(type, params)
|
||||
return obj
|
||||
|
||||
return None
|
||||
|
||||
def read_id(self, id: int) -> User | None:
|
||||
params = [
|
||||
id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Users WHERE id = ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is not None:
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
|
||||
# Is user a seller
|
||||
type = Customer
|
||||
if row[7] == "Seller":
|
||||
type = Seller
|
||||
|
||||
obj = self.new_instance(type, params)
|
||||
return obj
|
||||
|
||||
return None
|
||||
|
||||
def update(self):
|
||||
print("Doing work")
|
||||
|
||||
def delete(self):
|
||||
print("Doing work")
|
||||
from .database import DatabaseController
|
||||
from models.users.user import User
|
||||
from models.users.customer import Customer
|
||||
from models.users.seller import Seller
|
||||
|
||||
|
||||
class UserController(DatabaseController):
|
||||
FIELDS = ['id', 'username', 'password', 'firstName',
|
||||
'lastName', 'email', 'phone', 'role']
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
|
||||
def create(self, user: User):
|
||||
params = [
|
||||
user.username,
|
||||
user.password,
|
||||
user.firstName,
|
||||
user.lastName,
|
||||
user.email,
|
||||
user.phone,
|
||||
user.role
|
||||
]
|
||||
|
||||
self._conn.execute(
|
||||
"""INSERT INTO Users
|
||||
(username, password, first_name, last_name, email, phone, role)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)""",
|
||||
params
|
||||
)
|
||||
self._conn.commit()
|
||||
|
||||
def read(self, username: str) -> User | None:
|
||||
params = [
|
||||
username
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Users WHERE Username = ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is not None:
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
|
||||
# Is user a seller
|
||||
type = Customer
|
||||
if row[7] == "Seller":
|
||||
type = Seller
|
||||
|
||||
obj = self.new_instance(type, params)
|
||||
return obj
|
||||
|
||||
return None
|
||||
|
||||
def read_id(self, id: int) -> User | None:
|
||||
params = [
|
||||
id
|
||||
]
|
||||
|
||||
cursor = self._conn.execute(
|
||||
"SELECT * FROM Users WHERE id = ?",
|
||||
params
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
|
||||
if row is not None:
|
||||
params = dict(zip(self.FIELDS, row))
|
||||
|
||||
# Is user a seller
|
||||
type = Customer
|
||||
if row[7] == "Seller":
|
||||
type = Seller
|
||||
|
||||
obj = self.new_instance(type, params)
|
||||
return obj
|
||||
|
||||
return None
|
||||
|
||||
def update(self):
|
||||
print("Doing work")
|
||||
|
||||
def delete(self):
|
||||
print("Doing work")
|
||||
|
@ -1,34 +1,34 @@
|
||||
from models.users.user import User
|
||||
from controllers.database.user import UserController
|
||||
|
||||
from flask import redirect, Blueprint, session
|
||||
|
||||
from . import user
|
||||
from . import product
|
||||
|
||||
blueprint = Blueprint('main', __name__)
|
||||
|
||||
blueprint.register_blueprint(user.blueprint)
|
||||
blueprint.register_blueprint(product.blueprint)
|
||||
|
||||
|
||||
# CONTEXTS #
|
||||
|
||||
# Function that returns a given user class based on the ID in the session
|
||||
@blueprint.context_processor
|
||||
def get_user() -> dict[User | None]:
|
||||
# Get the user based on the user ID
|
||||
user_id = session.get('user_id')
|
||||
user = None
|
||||
|
||||
if user_id is not None:
|
||||
db = UserController()
|
||||
user = db.read_id(user_id)
|
||||
|
||||
return dict(user=user)
|
||||
|
||||
|
||||
# Function responsible for displaying the main landing page of the site
|
||||
@blueprint.route('/')
|
||||
def index():
|
||||
return redirect("/products")
|
||||
from models.users.user import User
|
||||
from controllers.database.user import UserController
|
||||
|
||||
from flask import redirect, Blueprint, session
|
||||
|
||||
from . import user
|
||||
from . import product
|
||||
|
||||
blueprint = Blueprint('main', __name__)
|
||||
|
||||
blueprint.register_blueprint(user.blueprint)
|
||||
blueprint.register_blueprint(product.blueprint)
|
||||
|
||||
|
||||
# CONTEXTS #
|
||||
|
||||
# Function that returns a given user class based on the ID in the session
|
||||
@blueprint.context_processor
|
||||
def get_user() -> dict[User | None]:
|
||||
# Get the user based on the user ID
|
||||
user_id = session.get('user_id')
|
||||
user = None
|
||||
|
||||
if user_id is not None:
|
||||
db = UserController()
|
||||
user = db.read_id(user_id)
|
||||
|
||||
return dict(user=user)
|
||||
|
||||
|
||||
# Function responsible for displaying the main landing page of the site
|
||||
@blueprint.route('/')
|
||||
def index():
|
||||
return redirect("/products")
|
||||
|
@ -1,188 +1,188 @@
|
||||
"""
|
||||
Product related endpoints. Included contexts for principles such as
|
||||
categories and image processing.
|
||||
"""
|
||||
|
||||
from flask import render_template, session, flash, request, redirect, Blueprint
|
||||
|
||||
from models.products.product import Product
|
||||
from controllers.database.product import ProductController
|
||||
from controllers.database.category import CategoryController
|
||||
from controllers.database.user import UserController
|
||||
|
||||
from datetime import datetime
|
||||
from utils.file_utils import allowed_file, save_image, remove_file
|
||||
from utils.user_utils import is_role
|
||||
|
||||
import pathlib
|
||||
import os
|
||||
|
||||
blueprint = Blueprint("products", __name__, url_prefix="/products")
|
||||
|
||||
|
||||
@blueprint.context_processor
|
||||
def category_list():
|
||||
""" Places a list of all categories in the products context """
|
||||
database = CategoryController()
|
||||
categories = database.read_all()
|
||||
return dict(categories=categories)
|
||||
|
||||
|
||||
@blueprint.route('/')
|
||||
def index():
|
||||
""" The front product page """
|
||||
# Returning an empty category acts the same
|
||||
# as a generic home page
|
||||
return category("")
|
||||
|
||||
|
||||
@blueprint.route('/<string:category>')
|
||||
def category(category: str):
|
||||
""" Loads a given categories page """
|
||||
database = ProductController()
|
||||
|
||||
# Check to see if there is a custome search term
|
||||
search_term = request.args.get("search", type=str)
|
||||
if search_term is not None:
|
||||
products = database.read_all(category, search_term)
|
||||
else:
|
||||
products = database.read_all(category)
|
||||
|
||||
# No Products visible
|
||||
if products is None:
|
||||
flash(
|
||||
f"No Products available. Try expanding your search criteria.",
|
||||
"warning"
|
||||
)
|
||||
|
||||
return render_template(
|
||||
'index.html',
|
||||
content="content.html",
|
||||
products=products,
|
||||
category=category
|
||||
)
|
||||
|
||||
|
||||
@blueprint.route('/<int:id>')
|
||||
def id(id: int):
|
||||
""" Loads a given product based on ID """
|
||||
db = ProductController()
|
||||
product = db.read_id(id)
|
||||
|
||||
# Check that a valid product was returned
|
||||
if product is None:
|
||||
flash(f"No Product available with id {id}", "warning")
|
||||
return redirect("/")
|
||||
|
||||
print(product.name)
|
||||
return render_template(
|
||||
'index.html',
|
||||
content='product.html',
|
||||
product=product
|
||||
)
|
||||
|
||||
|
||||
@blueprint.route('/add')
|
||||
def display_add_product():
|
||||
""" Launches the page to add a new product to the site """
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User needs to be logged in as a seller to view this page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
return render_template('index.html', content='new_product.html')
|
||||
|
||||
|
||||
@blueprint.post('/add')
|
||||
def add_product():
|
||||
""" Server site processing to handle a request to add a
|
||||
new product to the site
|
||||
"""
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User needs to be logged in as a seller to view this page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
file = request.files.get('image')
|
||||
image_filename = save_image(file)
|
||||
|
||||
product = Product(
|
||||
request.form.get('name'),
|
||||
image_filename if image_filename is not None else "",
|
||||
request.form.get('description'),
|
||||
request.form.get('cost'),
|
||||
request.form.get('category'),
|
||||
user_id,
|
||||
datetime.now(),
|
||||
request.form.get('quantity')
|
||||
)
|
||||
|
||||
db = ProductController()
|
||||
db.create(product)
|
||||
|
||||
return redirect('/products/ownproducts')
|
||||
|
||||
|
||||
@blueprint.post('/update/<int:id>')
|
||||
def update_product(id: int):
|
||||
""" Processes a request to update a product in place on the site """
|
||||
# Ensure that the product belongs to the current user
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User needs to be logged in as a seller to view this page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
db = ProductController()
|
||||
product = db.read_id(id)
|
||||
|
||||
if product.sellerID != user_id:
|
||||
flash("This product does not belong to you!", "error")
|
||||
return redirect("/ownproducts")
|
||||
|
||||
# Save new image file
|
||||
file = request.files.get('image')
|
||||
new_image = save_image(file)
|
||||
|
||||
if new_image is not None:
|
||||
remove_file(os.path.join(os.environ.get('FILESTORE'), product.image))
|
||||
product.image = new_image
|
||||
|
||||
# Update product details
|
||||
product.name = request.form.get('name')
|
||||
product.description = request.form.get('description')
|
||||
product.category = request.form.get('category')
|
||||
product.cost = request.form.get('cost')
|
||||
product.quantityAvailable = request.form.get('quantity')
|
||||
|
||||
db.update(product)
|
||||
flash("Product successfully updated", 'notice')
|
||||
return redirect(f"/products/{product.id}")
|
||||
|
||||
|
||||
@blueprint.route('/ownproducts')
|
||||
def display_own_products():
|
||||
""" Display products owned by the currently logged in seller """
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User must be logged in as seller to view page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
db = ProductController()
|
||||
products = db.read_user(user_id)
|
||||
|
||||
if products is None:
|
||||
flash("You don't currently have any products for sale.", "info")
|
||||
|
||||
return render_template(
|
||||
'index.html',
|
||||
content='content.html',
|
||||
products=products
|
||||
)
|
||||
"""
|
||||
Product related endpoints. Included contexts for principles such as
|
||||
categories and image processing.
|
||||
"""
|
||||
|
||||
from flask import render_template, session, flash, request, redirect, Blueprint
|
||||
|
||||
from models.products.product import Product
|
||||
from controllers.database.product import ProductController
|
||||
from controllers.database.category import CategoryController
|
||||
from controllers.database.user import UserController
|
||||
|
||||
from datetime import datetime
|
||||
from utils.file_utils import allowed_file, save_image, remove_file
|
||||
from utils.user_utils import is_role
|
||||
|
||||
import pathlib
|
||||
import os
|
||||
|
||||
blueprint = Blueprint("products", __name__, url_prefix="/products")
|
||||
|
||||
|
||||
@blueprint.context_processor
|
||||
def category_list():
|
||||
""" Places a list of all categories in the products context """
|
||||
database = CategoryController()
|
||||
categories = database.read_all()
|
||||
return dict(categories=categories)
|
||||
|
||||
|
||||
@blueprint.route('/')
|
||||
def index():
|
||||
""" The front product page """
|
||||
# Returning an empty category acts the same
|
||||
# as a generic home page
|
||||
return category("")
|
||||
|
||||
|
||||
@blueprint.route('/<string:category>')
|
||||
def category(category: str):
|
||||
""" Loads a given categories page """
|
||||
database = ProductController()
|
||||
|
||||
# Check to see if there is a custome search term
|
||||
search_term = request.args.get("search", type=str)
|
||||
if search_term is not None:
|
||||
products = database.read_all(category, search_term)
|
||||
else:
|
||||
products = database.read_all(category)
|
||||
|
||||
# No Products visible
|
||||
if products is None:
|
||||
flash(
|
||||
f"No Products available. Try expanding your search criteria.",
|
||||
"warning"
|
||||
)
|
||||
|
||||
return render_template(
|
||||
'index.html',
|
||||
content="content.html",
|
||||
products=products,
|
||||
category=category
|
||||
)
|
||||
|
||||
|
||||
@blueprint.route('/<int:id>')
|
||||
def id(id: int):
|
||||
""" Loads a given product based on ID """
|
||||
db = ProductController()
|
||||
product = db.read_id(id)
|
||||
|
||||
# Check that a valid product was returned
|
||||
if product is None:
|
||||
flash(f"No Product available with id {id}", "warning")
|
||||
return redirect("/")
|
||||
|
||||
print(product.name)
|
||||
return render_template(
|
||||
'index.html',
|
||||
content='product.html',
|
||||
product=product
|
||||
)
|
||||
|
||||
|
||||
@blueprint.route('/add')
|
||||
def display_add_product():
|
||||
""" Launches the page to add a new product to the site """
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User needs to be logged in as a seller to view this page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
return render_template('index.html', content='new_product.html')
|
||||
|
||||
|
||||
@blueprint.post('/add')
|
||||
def add_product():
|
||||
""" Server site processing to handle a request to add a
|
||||
new product to the site
|
||||
"""
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User needs to be logged in as a seller to view this page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
file = request.files.get('image')
|
||||
image_filename = save_image(file)
|
||||
|
||||
product = Product(
|
||||
request.form.get('name'),
|
||||
image_filename if image_filename is not None else "",
|
||||
request.form.get('description'),
|
||||
request.form.get('cost'),
|
||||
request.form.get('category'),
|
||||
user_id,
|
||||
datetime.now(),
|
||||
request.form.get('quantity')
|
||||
)
|
||||
|
||||
db = ProductController()
|
||||
db.create(product)
|
||||
|
||||
return redirect('/products/ownproducts')
|
||||
|
||||
|
||||
@blueprint.post('/update/<int:id>')
|
||||
def update_product(id: int):
|
||||
""" Processes a request to update a product in place on the site """
|
||||
# Ensure that the product belongs to the current user
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User needs to be logged in as a seller to view this page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
db = ProductController()
|
||||
product = db.read_id(id)
|
||||
|
||||
if product.sellerID != user_id:
|
||||
flash("This product does not belong to you!", "error")
|
||||
return redirect("/ownproducts")
|
||||
|
||||
# Save new image file
|
||||
file = request.files.get('image')
|
||||
new_image = save_image(file)
|
||||
|
||||
if new_image is not None:
|
||||
remove_file(os.path.join(os.environ.get('FILESTORE'), product.image))
|
||||
product.image = new_image
|
||||
|
||||
# Update product details
|
||||
product.name = request.form.get('name')
|
||||
product.description = request.form.get('description')
|
||||
product.category = request.form.get('category')
|
||||
product.cost = request.form.get('cost')
|
||||
product.quantityAvailable = request.form.get('quantity')
|
||||
|
||||
db.update(product)
|
||||
flash("Product successfully updated", 'notice')
|
||||
return redirect(f"/products/{product.id}")
|
||||
|
||||
|
||||
@blueprint.route('/ownproducts')
|
||||
def display_own_products():
|
||||
""" Display products owned by the currently logged in seller """
|
||||
user_id = session.get('user_id')
|
||||
|
||||
# User must be logged in as seller to view page
|
||||
if not is_role("Seller"):
|
||||
flash("You must be logged in as a seller to view this page!", "error")
|
||||
return redirect("/")
|
||||
|
||||
db = ProductController()
|
||||
products = db.read_user(user_id)
|
||||
|
||||
if products is None:
|
||||
flash("You don't currently have any products for sale.", "info")
|
||||
|
||||
return render_template(
|
||||
'index.html',
|
||||
content='content.html',
|
||||
products=products
|
||||
)
|
||||
|
@ -1,99 +1,99 @@
|
||||
""" The user controller to manage all of the user related endpoints
|
||||
in the web app
|
||||
"""
|
||||
from flask import Blueprint
|
||||
|
||||
from flask import render_template, redirect, request, session, flash
|
||||
from controllers.database.user import UserController
|
||||
from models.users.user import User
|
||||
from models.users.customer import Customer
|
||||
from models.users.seller import Seller
|
||||
from hashlib import sha512
|
||||
|
||||
# Blueprint to append user endpoints to
|
||||
blueprint = Blueprint("users", __name__)
|
||||
|
||||
|
||||
# LOGIN FUNCTIONALITY
|
||||
@blueprint.route('/login')
|
||||
def display_login():
|
||||
""" Function responsible for delivering the Login page for the site """
|
||||
return render_template('index.html', content="login.html")
|
||||
|
||||
|
||||
@blueprint.post('/login')
|
||||
def login():
|
||||
""" Function to handle the backend processing of a login request """
|
||||
database = UserController()
|
||||
user = database.read(request.form['username'])
|
||||
error = None
|
||||
|
||||
# No user found
|
||||
if user is None:
|
||||
error = "No user found with the username " + request.form['username']
|
||||
flash(error, 'warning')
|
||||
return redirect("/login")
|
||||
|
||||
# Incorrect Password
|
||||
if sha512(request.form['password'].encode()).hexdigest() != user.password:
|
||||
error = "Incorrect Password"
|
||||
flash(error, 'warning')
|
||||
return redirect("/login")
|
||||
|
||||
session['user_id'] = user.id
|
||||
return redirect("/")
|
||||
|
||||
|
||||
# SIGNUP FUNCTIONALITY
|
||||
@blueprint.route('/signup')
|
||||
def display_signup():
|
||||
""" Function responsible for delivering the Signup page for the site """
|
||||
return render_template('index.html', content="signup.html")
|
||||
|
||||
|
||||
@blueprint.post('/signup')
|
||||
def signup():
|
||||
""" Function to handle the backend processing of a signup request """
|
||||
database = UserController()
|
||||
|
||||
# User already exists
|
||||
if database.read(request.form['username']) is not None:
|
||||
error = "User, " + request.form['username'] + " already exists"
|
||||
flash(error, 'warning')
|
||||
return redirect("/signup")
|
||||
|
||||
# Signup as Seller or Customer
|
||||
if request.form.get('seller'):
|
||||
user = Seller(
|
||||
request.form['username'],
|
||||
# Hashed as soon as it is recieved on the backend
|
||||
sha512(request.form['password'].encode()).hexdigest(),
|
||||
request.form['firstname'],
|
||||
request.form['lastname'],
|
||||
request.form['email'],
|
||||
"123"
|
||||
)
|
||||
else:
|
||||
user = Customer(
|
||||
request.form['username'],
|
||||
# Hashed as soon as it is recieved on the backend
|
||||
sha512(request.form['password'].encode()).hexdigest(),
|
||||
request.form['firstname'],
|
||||
request.form['lastname'],
|
||||
request.form['email'],
|
||||
"123"
|
||||
)
|
||||
|
||||
database.create(user)
|
||||
|
||||
# Code 307 Preserves the original request (POST)
|
||||
return redirect("/login", code=307)
|
||||
|
||||
|
||||
# SIGN OUT FUNCTIONALITY
|
||||
@blueprint.route('/logout')
|
||||
def logout():
|
||||
""" Function responsible for handling logouts from the site """
|
||||
# Clear the current user from the session if they are logged in
|
||||
session.pop('user_id', None)
|
||||
return redirect("/")
|
||||
""" The user controller to manage all of the user related endpoints
|
||||
in the web app
|
||||
"""
|
||||
from flask import Blueprint
|
||||
|
||||
from flask import render_template, redirect, request, session, flash
|
||||
from controllers.database.user import UserController
|
||||
from models.users.user import User
|
||||
from models.users.customer import Customer
|
||||
from models.users.seller import Seller
|
||||
from hashlib import sha512
|
||||
|
||||
# Blueprint to append user endpoints to
|
||||
blueprint = Blueprint("users", __name__)
|
||||
|
||||
|
||||
# LOGIN FUNCTIONALITY
|
||||
@blueprint.route('/login')
|
||||
def display_login():
|
||||
""" Function responsible for delivering the Login page for the site """
|
||||
return render_template('index.html', content="login.html")
|
||||
|
||||
|
||||
@blueprint.post('/login')
|
||||
def login():
|
||||
""" Function to handle the backend processing of a login request """
|
||||
database = UserController()
|
||||
user = database.read(request.form['username'])
|
||||
error = None
|
||||
|
||||
# No user found
|
||||
if user is None:
|
||||
error = "No user found with the username " + request.form['username']
|
||||
flash(error, 'warning')
|
||||
return redirect("/login")
|
||||
|
||||
# Incorrect Password
|
||||
if sha512(request.form['password'].encode()).hexdigest() != user.password:
|
||||
error = "Incorrect Password"
|
||||
flash(error, 'warning')
|
||||
return redirect("/login")
|
||||
|
||||
session['user_id'] = user.id
|
||||
return redirect("/")
|
||||
|
||||
|
||||
# SIGNUP FUNCTIONALITY
|
||||
@blueprint.route('/signup')
|
||||
def display_signup():
|
||||
""" Function responsible for delivering the Signup page for the site """
|
||||
return render_template('index.html', content="signup.html")
|
||||
|
||||
|
||||
@blueprint.post('/signup')
|
||||
def signup():
|
||||
""" Function to handle the backend processing of a signup request """
|
||||
database = UserController()
|
||||
|
||||
# User already exists
|
||||
if database.read(request.form['username']) is not None:
|
||||
error = "User, " + request.form['username'] + " already exists"
|
||||
flash(error, 'warning')
|
||||
return redirect("/signup")
|
||||
|
||||
# Signup as Seller or Customer
|
||||
if request.form.get('seller'):
|
||||
user = Seller(
|
||||
request.form['username'],
|
||||
# Hashed as soon as it is recieved on the backend
|
||||
sha512(request.form['password'].encode()).hexdigest(),
|
||||
request.form['firstname'],
|
||||
request.form['lastname'],
|
||||
request.form['email'],
|
||||
"123"
|
||||
)
|
||||
else:
|
||||
user = Customer(
|
||||
request.form['username'],
|
||||
# Hashed as soon as it is recieved on the backend
|
||||
sha512(request.form['password'].encode()).hexdigest(),
|
||||
request.form['firstname'],
|
||||
request.form['lastname'],
|
||||
request.form['email'],
|
||||
"123"
|
||||
)
|
||||
|
||||
database.create(user)
|
||||
|
||||
# Code 307 Preserves the original request (POST)
|
||||
return redirect("/login", code=307)
|
||||
|
||||
|
||||
# SIGN OUT FUNCTIONALITY
|
||||
@blueprint.route('/logout')
|
||||
def logout():
|
||||
""" Function responsible for handling logouts from the site """
|
||||
# Clear the current user from the session if they are logged in
|
||||
session.pop('user_id', None)
|
||||
return redirect("/")
|
||||
|
Reference in New Issue
Block a user