126 lines
3.2 KiB
Python
126 lines
3.2 KiB
Python
from .database import DatabaseController
|
|
from models.stats import Stats
|
|
from datetime import datetime, timedelta
|
|
from utils.general_utils import is_within_x_days
|
|
|
|
|
|
class StatsController(DatabaseController):
    """Data-access controller for product view statistics (the ``Views`` table).

    All queries are delegated to the ``do`` / ``get_many`` / ``get_one``
    helpers inherited from ``DatabaseController``.
    """

    # Column names of the Views table, in table order.
    # NOTE(review): presumably consumed by DatabaseController when mapping
    # rows onto TYPE -- confirm against the base class.
    FIELDS = ['id', 'userID', 'productID', 'viewDate']
    TYPE = Stats

    def __init__(self):
        """Initialise the controller via the base-class constructor."""
        super().__init__()

    def create(self, view: Stats):
        """Insert a new row into ``Views`` from the given ``Stats`` record.

        Args:
            view: Record supplying ``userID``, ``productID`` and ``viewDate``.
        """
        params = [
            view.userID,
            view.productID,
            view.viewDate,
        ]
        query = """
            INSERT INTO Views
            (userID, productID, viewDate)
            VALUES (?, ?, ?)
        """

        self.do(query, params)

    def read(self) -> list[Stats] | None:
        """Return every row of ``Views`` as fetched by ``get_many``."""
        query = "SELECT * FROM Views"
        # The result must be returned; previously it was computed and dropped,
        # so this method always yielded None.
        return self.get_many(query, [])

    def read_product(self, product_id: int = 0) -> list[Stats] | None:
        """Return all view rows recorded for one product.

        Args:
            product_id: Value matched against ``productID`` (defaults to 0).
        """
        params = [
            product_id,
        ]
        query = """
            SELECT * FROM Views
            WHERE productID = ?
        """

        return self.get_many(query, params)

    def read_user(self, user_id: int) -> list[Stats] | None:
        """Return all view rows recorded for one user.

        Args:
            user_id: Value matched against ``userID``.
        """
        params = [
            user_id,
        ]
        query = """
            SELECT * FROM Views
            WHERE userID = ?
        """

        return self.get_many(query, params)

    def read_days(self, product_id: int, prev_days: int = 7
                  ) -> dict[str, list[Stats]]:
        """Group a product's recent views by calendar day.

        Args:
            product_id: Product whose views are fetched via ``read_product``.
            prev_days: Number of day buckets to create, counting back from
                today (today included).

        Returns:
            A dict keyed by ``'%b-%d'`` day labels (e.g. ``'Mar-04'``), newest
            day first, one entry per day in the window. Each value is the list
            of views whose ``viewDate`` falls on that day; days without views
            map to an empty list.
        """
        views = self.read_product(product_id) or []

        # Keep only the views that the shared helper considers recent enough.
        recent = [
            view for view in views
            if is_within_x_days(view.viewDate, prev_days)
        ]

        # Read the clock once so every bucket label derives from the same
        # instant, then seed one empty bucket per day in the window.
        today = datetime.today()
        day_views: dict[str, list[Stats]] = {
            (today - timedelta(days=offset)).strftime('%b-%d'): []
            for offset in range(prev_days)
        }

        # Organise the views into their day buckets. The label comes straight
        # from viewDate (assumed to be a datetime, as the original arithmetic
        # required).
        for view in recent:
            bucket = day_views.get(view.viewDate.strftime('%b-%d'))
            # A view can pass the filter yet fall outside the seeded labels
            # (e.g. on the window boundary); skip it instead of raising
            # KeyError.
            if bucket is not None:
                bucket.append(view)

        return day_views

    def read_product_views(self, id: int):
        """ Returns the total number of views for a product """
        params = [
            id,
        ]

        # Total Views
        query = """
            SELECT count(id) FROM Views WHERE productID = ?
        """

        # NOTE(review): the third argument presumably tells get_one to return
        # the scalar as an int -- confirm against DatabaseController.
        return self.get_one(query, params, int)

    def read_ranking(self, id: int):
        """Return the product's view-count rank within its category.

        Products are ranked per ``categoryID`` by number of rows in ``Views``
        (descending); the LEFT JOIN keeps products that have no views.

        Args:
            id: Product id whose ranking is selected.
        """
        params = [
            id,
        ]

        query = """
            SELECT
                ranking
            FROM
            (
                SELECT
                    p.id as id,
                    RANK() OVER (
                        PARTITION BY p.categoryID
                        ORDER BY COUNT(v.productID) DESC
                    )AS ranking
                FROM
                    Products p
                LEFT JOIN
                    Views v ON p.id = v.productID
                GROUP BY
                    p.id, p.categoryID
            )
            WHERE id = ?
        """

        return self.get_one(query, params, int)

    def update(self):
        """Placeholder: only prints a message; no database work is done."""
        print("Doing work")

    def delete(self):
        """Placeholder: only prints a message; no database work is done."""
        print("Doing work")
|