Coverage for database.py : 85%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import http
2import os
3import hashlib
4import hmac
5import datetime
6import uuid
8from pymongo import MongoClient
11class Database:
def __init__(self, test=False):
    """Open a MongoDB connection and bind the working database to ``self.db``.

    Args:
        test: When true, connect to ``localhost``, use its ``test``
            database and drop the ``templates``, ``users``, ``tests``,
            ``logs``, ``tokens`` and ``slides`` collections. When false,
            connect to the hosted cluster and use the ``SlideDeck``
            database; the account password is read from the
            ``MONGODB_PASSWORD`` environment variable (an unset variable
            is treated as an empty password).
    """
    if test:
        client = MongoClient("localhost")
        self.db = client.test
        # Drop every collection this class works with.
        for collection in ("templates", "users", "tests", "logs", "tokens", "slides"):
            self.db[collection].drop()
    else:
        # Imported here because only this branch needs it.
        from urllib.parse import quote_plus

        # Credentials embedded in a MongoDB URI must be percent-escaped;
        # otherwise a password containing ':', '/', '@' or '%' corrupts
        # the URI.
        password = quote_plus(os.environ.get("MONGODB_PASSWORD") or "")
        client = MongoClient(
            f"mongodb+srv://slidedeck-admin:{password}@slidedeck-cluster.uqekc.mongodb.net/"
            "myFirstDatabase?retryWrites=true&w=majority"
        )
        self.db = client.get_database("SlideDeck")
def check_user_exists(self, name):
    """Report whether at least one user document carries the given name.

    Args:
        name: Value matched against the ``name`` field of the ``users``
            collection.

    Returns:
        ``True`` if one or more matching documents exist, else ``False``.
    """
    matches = self.db.users.count_documents({"name": name})
    return matches > 0
def create_user(self, name, password, role):
    """Insert a user document holding a salted password hash.

    The password is hashed with PBKDF2-HMAC-SHA512 (100000 iterations)
    using a fresh 16-byte random salt, and the document receives a random
    hex ``id``.

    Args:
        name: Stored under ``name``.
        password: Plain-text password; it is encoded and hashed, never
            stored directly.
        role: Stored under ``role``.

    Returns:
        ``True`` when the document was inserted; ``False`` if any step
        raised, in which case the exception is printed.
    """
    try:
        salt = os.urandom(16)
        record = {
            "name": name,
            "hash": hashlib.pbkdf2_hmac("sha512", password.encode(), salt, 100000),
            "salt": salt,
            "id": uuid.uuid4().hex,
            "role": role,
        }
        self.db.users.insert_one(record)
    except Exception as e:
        print(e)
        return False
    return True
def check_password(self, name, password):
    """Verify a plain-text password against the stored hash of a user.

    The candidate hash is derived with PBKDF2-HMAC-SHA512 (100000
    iterations) from the user's stored salt and compared with
    ``hmac.compare_digest``.

    Args:
        name: Value looked up in the ``name`` field of the ``users``
            collection.
        password: Plain-text password to verify.

    Returns:
        ``(True, user_document)`` when the password matches, otherwise
        ``(False, None)`` (also when no such user is found).
    """
    record = self.db.users.find_one({"name": name})
    if not record:
        return False, None
    stored = record["hash"]
    candidate = hashlib.pbkdf2_hmac("sha512", password.encode(), record["salt"], 100000)
    if hmac.compare_digest(stored, candidate):
        return True, record
    return False, None
def create_token(self, expiration_length, user_id):
    """Create and store a random 16-byte session token.

    The stored document records the raw token bytes, the timeout, the
    current local time as ``last_used`` and the owning user id.

    Args:
        expiration_length: Stored under ``timeout``.
        user_id: Stored under ``user``.

    Returns:
        The token as a hexadecimal string.
    """
    raw = os.urandom(16)
    record = {
        "token": raw,
        "timeout": expiration_length,
        "last_used": datetime.datetime.now(),
        "user": user_id,
    }
    self.db.tokens.insert_one(record)
    return raw.hex()
def check_token(self, token):
    """Resolve a hexadecimal session token to its user document.

    A token is valid while the time since its ``last_used`` stamp is
    shorter than its ``timeout`` in seconds. A valid token has its
    ``last_used`` stamp refreshed; an expired token is deleted.

    Args:
        token: Hexadecimal token string, as returned by ``create_token``.

    Returns:
        The document from the ``users`` collection whose ``id`` matches
        the token's ``user``, or ``None`` when the token is malformed,
        unknown or expired.
    """
    try:
        raw = bytes.fromhex(token)
    except (TypeError, ValueError):
        # Not a hex string (or not a string at all): cannot be a token.
        return None

    token_obj = self.db.tokens.find_one({"token": raw})
    if token_obj is None:
        return None

    now = datetime.datetime.now()
    if now - token_obj["last_used"] < datetime.timedelta(seconds=token_obj["timeout"]):
        self.db.tokens.update_one({"token": raw}, {"$set": {"last_used": now}})
        return self.db.users.find_one({"id": token_obj["user"]})

    # Tokens are stored as bytes, so the delete filter must use the decoded
    # bytes; filtering on the hex string never matches.
    self.db.tokens.delete_one({"token": raw})
    return None
def create_template(self, name, columns):
    """Build a template document and insert it into ``templates``.

    Args:
        name: Stored under ``name``.
        columns: Iterable of mappings, each providing ``title``, ``type``
            and ``role``. Every column becomes a ``headers`` entry of the
            form ``title -> [type, role]``.

    Returns:
        The template dict on success, or
        ``http.HTTPStatus.INTERNAL_SERVER_ERROR`` if the insert raised (the
        exception is printed).

    Raises:
        KeyError: If a column lacks ``title``, ``type`` or ``role``.
    """
    now = datetime.datetime.now().isoformat()
    template = {
        "name": name,
        "headers": {
            column["title"]: [column["type"], column["role"]] for column in columns
        },
        "created": now,
        "touched": now,
        "id": uuid.uuid4().hex,
    }
    try:
        self.db.templates.insert_one(template)
    except Exception as e:
        # Only Exception is caught so KeyboardInterrupt/SystemExit still
        # propagate.
        print(e)
        return http.HTTPStatus.INTERNAL_SERVER_ERROR
    return template
def get_templates(self):
    """Return every document of the ``templates`` collection as a list."""
    return list(self.db.templates.find())
def create_test(self, name, template_id, fields):
    """Insert a test document into the ``tests`` collection.

    The document stores the name, template id and fields, matching
    ``created``/``touched`` ISO timestamps and a random hex ``id``.

    Args:
        name: Stored under ``name``.
        template_id: Stored under ``template``.
        fields: Stored under ``fields``.

    Returns:
        ``True`` when the document was inserted, ``False`` if the insert
        raised (the exception is printed).
    """
    now = datetime.datetime.now().isoformat()
    test = {
        "name": name,
        "template": template_id,
        "fields": fields,
        "created": now,
        "touched": now,
        "id": uuid.uuid4().hex,
    }
    try:
        self.db.tests.insert_one(test)
    except Exception as e:
        # Only Exception is caught so KeyboardInterrupt/SystemExit still
        # propagate.
        print(e)
        return False
    return True
def get_tests(self):
    """Return every document of the ``tests`` collection as a list."""
    return [document for document in self.db.tests.find()]
def create_log(self, template_id, test_id, name):
    """Create a log document with an empty slide list and insert it.

    Args:
        template_id: Stored under ``template``.
        test_id: Stored under ``test``; also used to look up the test
            document.
        name: Name of the log. When ``None``, the ``name`` of the looked-up
            test is used instead.

    Returns:
        The inserted log dict, or ``http.HTTPStatus.BAD_REQUEST`` if the
        lookup or name resolution raised (for example ``name`` is ``None``
        and no test with ``test_id`` exists); the exception is printed.
    """
    try:
        entry = {"id": uuid.uuid4().hex, "slides": [], "template": template_id}
        parent = self.db.tests.find_one({"id": test_id})
        # Subscripting a missing test (None) raises and is reported below.
        name = parent["name"] if name is None else name
    except Exception as e:
        print(e)
        return http.HTTPStatus.BAD_REQUEST
    stamp = datetime.datetime.now().isoformat()
    entry.update(name=name, test=test_id, created=stamp, touched=stamp)
    self.db.logs.insert_one(entry)
    return entry
def get_logs(self):
    """Return every document of the ``logs`` collection as a list."""
    cursor = self.db.logs.find()
    return list(cursor)
def get_log(self, log_id):
    """Fetch one log document by its ``id``.

    Args:
        log_id: Value matched against the ``id`` field of ``logs``.

    Returns:
        The matching document, or ``http.HTTPStatus.BAD_REQUEST`` when no
        document matches.
    """
    found = self.db.logs.find_one({"id": log_id})
    return http.HTTPStatus.BAD_REQUEST if found is None else found
def post_slide(self, log_id, fields, submit, user):
    """Create a slide for a log and append its id to the log's slide list.

    Only fields whose template header role equals the user's role are
    stored on the slide.

    Args:
        log_id: ``id`` of the log the slide belongs to.
        fields: Mapping of field title to value. Every title must exist in
            the headers of the log's template.
        submit: Stored under ``submitted``.
        user: Mapping providing ``id`` and ``role``.

    Returns:
        The inserted slide dict, or ``None`` when the log does not exist or
        building the slide raised (missing template, unknown field title,
        malformed user); in the latter case the exception is printed.
    """
    log = self.db.logs.find_one({"id": log_id})
    if log is None:
        return None

    try:
        headers = self.db.templates.find_one({"id": log["template"]})["headers"]
        # headers[title] is [type, role]; keep the fields this role owns.
        valid_fields = {
            key: value
            for key, value in fields.items()
            if headers[key][1] == user["role"]
        }
        now = datetime.datetime.now().isoformat()
        slide = {
            "id": uuid.uuid4().hex,
            "log": log_id,
            "submitted": submit,
            "reviewed": {"date": None, "status": False},
            "fields": valid_fields,
            "created": now,
            "touched": now,
            "users": [user["id"]],
        }
    except Exception as e:
        print(e)
        return None

    # Insert before linking so a failed insert leaves no dangling slide id
    # in the log.
    self.db.slides.insert_one(slide)
    # $push appends on the server, so overlapping posts cannot overwrite
    # each other's entry as a read-modify-write with $set could.
    self.db.logs.update_one({"id": log_id}, {"$push": {"slides": slide["id"]}})
    return slide
def edit_slide(self, slide_id, fields, submit, user):
    """Replace a slide's fields and optionally mark it as submitted.

    Only fields whose template header role equals the user's role are
    written.

    Args:
        slide_id: ``id`` of the slide to edit.
        fields: Mapping of field title to value. Every title must exist in
            the headers of the template of the slide's log.
        submit: When truthy and the slide is not yet submitted, the
            slide's ``submitted`` flag is set to this value.
        user: Mapping providing ``role``.

    Returns:
        ``http.HTTPStatus.BAD_REQUEST`` when the slide does not exist,
        ``True`` when the fields were updated, ``False`` if the lookup or
        update raised (the exception is printed).
    """
    slide = self.db.slides.find_one({"id": slide_id})
    if slide is None:
        return http.HTTPStatus.BAD_REQUEST

    if not slide["submitted"] and submit:
        # The flag is created and read as "submitted"; writing it under any
        # other key would leave the slide permanently unsubmitted.
        self.db.slides.update_one({"id": slide_id}, {"$set": {"submitted": submit}})

    try:
        log = self.db.logs.find_one({"id": slide["log"]})
        headers = self.db.templates.find_one({"id": log["template"]})["headers"]
        # headers[title] is [type, role]; keep the fields this role owns.
        valid_fields = {
            key: value
            for key, value in fields.items()
            if headers[key][1] == user["role"]
        }
        # NOTE(review): $set on "fields" replaces the whole stored mapping,
        # so values saved earlier under other roles are dropped. Confirm
        # this is intended.
        self.db.slides.update_one({"id": slide_id}, {"$set": {"fields": valid_fields}})
        return True
    except Exception as e:
        # Only Exception is caught so KeyboardInterrupt/SystemExit still
        # propagate.
        print(e)
        return False
def get_slide(self, slide_id):
    """Fetch one slide document by its ``id``.

    Args:
        slide_id: Value matched against the ``id`` field of ``slides``.

    Returns:
        The matching document, or ``http.HTTPStatus.BAD_REQUEST`` when no
        document matches.
    """
    found = self.db.slides.find_one({"id": slide_id})
    if found is not None:
        return found
    return http.HTTPStatus.BAD_REQUEST
def get_slides(self, log_id):
    """Return the slide documents of a log, in the log's stored order.

    Slide ids listed in the log that have no document in ``slides`` are
    skipped.

    Args:
        log_id: ``id`` of the log whose slides are requested.

    Returns:
        A list of slide documents, or ``http.HTTPStatus.BAD_REQUEST`` when
        no log with ``log_id`` exists.
    """
    log = self.db.logs.find_one({"id": log_id})
    if log is None:
        # Subscripting a missing log would raise TypeError; report it the
        # way get_log and get_slide report a missing document.
        return http.HTTPStatus.BAD_REQUEST

    found = (self.db.slides.find_one({"id": slide_id}) for slide_id in log["slides"])
    return [slide for slide in found if slide]
def delete_template(self, template_id):
    """Delete a template unless a log still references it.

    Args:
        template_id: ``id`` of the template to delete.

    Returns:
        ``False`` when at least one log has this template id, leaving the
        template untouched; ``True`` after issuing the delete otherwise.
    """
    if self.db.logs.count_documents({"template": template_id}) != 0:
        return False
    self.db.templates.delete_one({"id": template_id})
    return True