WMGZON/controllers/database/stats.py

126 lines
3.2 KiB
Python

from .database import DatabaseController
from models.stats import Stats
from datetime import datetime, timedelta
from utils.general_utils import is_within_x_days
class StatsController(DatabaseController):
FIELDS = ['id', 'userID', 'productID', 'viewDate']
TYPE = Stats
def __init__(self):
super().__init__()
def create(self, view: Stats):
params = [
view.userID,
view.productID,
view.viewDate
]
query = """
INSERT INTO Views
(userID, productID, viewDate)
VALUES (?, ?, ?)
"""
self.do(query, params)
def read(self) -> list[Stats] | None:
query = "SELECT * FROM Views"
self.get_many(query, [])
def read_product(self, product_id: int = 0) -> list[Stats] | None:
params = [
product_id
]
query = """
SELECT * FROM Views
WHERE productID = ?
"""
return self.get_many(query, params)
def read_user(self, user_id: int) -> list[Stats] | None:
params = [
user_id
]
query = """
SELECT * FROM Views
WHERE userID = ?
"""
return self.get_many(query, params)
def read_days(self, product_id: int, prev_days: int = 7
) -> dict[datetime, list[Stats]] | None:
""" Returns data from within the given number of days """
data = self.read_product(product_id)
# Filter the values to only be within X prev days
filtered_data = list()
if data is not None:
filtered_data = list(filter(
lambda d: is_within_x_days(d.viewDate, prev_days),
data
))
day_views: dict[str, list[Stats]] = dict()
for i in range(0, prev_days):
key = datetime.today() - timedelta(days=i)
day_views[key.strftime('%b-%d')] = list()
# Organise data into distinct
for view in filtered_data:
key = datetime.today() - (datetime.today() - view.viewDate)
day_views[key.strftime('%b-%d')].append(view)
return day_views
def read_product_views(self, id: int):
""" Returns the total number of views for a product """
params = [
id
]
# Total Views
query = """
SELECT count(id) FROM Views WHERE productID = ?
"""
return self.get_one(query, params, int)
def read_ranking(self, id: int):
params = [
id
]
query = """
SELECT
ranking
FROM
(
SELECT
p.id as id,
RANK() OVER (
PARTITION BY p.categoryID
ORDER BY COUNT(v.productID) DESC
)AS ranking
FROM
Products p
LEFT JOIN
Views v ON p.id = v.productID
GROUP BY
p.id, p.categoryID
)
WHERE id = ?
"""
return self.get_one(query, params, int)
def update(self):
print("Doing work")
def delete(self):
print("Doing work")