Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1import http 

2import os 

3import hashlib 

4import hmac 

5import datetime 

6import uuid 

7 

8from pymongo import MongoClient 

9 

10 

11class Database: 

12 def __init__(self, test=False): 

13 if test: 

14 client = MongoClient("localhost") 

15 self.db = client.test 

16 self.db.templates.drop() 

17 self.db.users.drop() 

18 self.db.tests.drop() 

19 self.db.logs.drop() 

20 self.db.tokens.drop() 

21 self.db.slides.drop() 

22 else: 

23 # client = MongoClient(os.environ.get("MONGODB_IP"), 

24 # username=os.environ.get("MONGODB_USERNAME"), 

25 # password=os.environ.get("MONGODB_PASSWORD"), 

26 # authSource=os.environ.get("MONGODB_DBNAME"), 

27 # authMechanism="SCRAM-SHA-1") 

28 password = os.environ.get("MONGODB_PASSWORD") 

29 client = MongoClient( 

30 f"mongodb+srv://slidedeck-admin:{password}@slidedeck-cluster.uqekc.mongodb.net/myFirstDatabase?retryWrites=true&w=majority") 

31 self.db = client.get_database("SlideDeck") 

32 

33 def check_user_exists(self, name): 

34 count = self.db.users.count_documents({"name": name}) 

35 if count > 0: 

36 return True 

37 else: 

38 return False 

39 

40 def create_user(self, name, password, role): 

41 try: 

42 salt = os.urandom(16) 

43 password_hash = hashlib.pbkdf2_hmac("sha512", password.encode(), salt, 100000) 

44 self.db.users.insert_one({ 

45 "name": name, 

46 "hash": password_hash, 

47 "salt": salt, 

48 "id": uuid.uuid4().hex, 

49 "role": role 

50 }) 

51 return True 

52 except Exception as e: 

53 print(e) 

54 return False 

55 

56 def check_password(self, name, password): 

57 user = self.db.users.find_one({"name": name}) 

58 if not user: 

59 return False, None 

60 result = hmac.compare_digest( 

61 user["hash"], 

62 hashlib.pbkdf2_hmac("sha512", password.encode(), user["salt"], 100000) 

63 ) 

64 return result, user if result else None 

65 

66 def create_token(self, expiration_length, user_id): 

67 token = os.urandom(16) 

68 self.db.tokens.insert_one( 

69 { 

70 "token": token, 

71 "timeout": expiration_length, 

72 "last_used": datetime.datetime.now(), 

73 "user": user_id 

74 } 

75 ) 

76 return token.hex() 

77 

78 def check_token(self, token): 

79 try: 

80 token_obj = self.db.tokens.find_one({"token": bytes.fromhex(token)}) 

81 except ValueError: 

82 return None 

83 if token_obj is None: 

84 return None 

85 elif datetime.datetime.now() - token_obj["last_used"] < datetime.timedelta(seconds=token_obj["timeout"]): 

86 self.db.tokens.update_one( 

87 {"token": bytes.fromhex(token)}, 

88 {"$set": {"last_used": datetime.datetime.now()}} 

89 ) 

90 return self.db.users.find_one({"id": token_obj["user"]}) 

91 else: 

92 self.db.tokens.delete_one({"token": token}) 

93 return None 

94 

95 def create_template(self, name, columns): 

96 template = {"name": name, "headers": {}} 

97 now = datetime.datetime.now().isoformat() 

98 template["created"] = now 

99 template["touched"] = now 

100 template["id"] = uuid.uuid4().hex 

101 for column in columns: 

102 template["headers"][column["title"]] = [column["type"], column["role"]] 

103 try: 

104 self.db.templates.insert_one(template) 

105 return template 

106 except: 

107 return http.HTTPStatus.INTERNAL_SERVER_ERROR 

108 

109 def get_templates(self): 

110 templates = [] 

111 for template in self.db.templates.find(): 

112 templates.append(template) 

113 return templates 

114 

115 def create_test(self, name, template_id, fields): 

116 test = { 

117 "name": name, 

118 "template": template_id, 

119 "fields": fields, 

120 } 

121 now = datetime.datetime.now().isoformat() 

122 test["created"] = now 

123 test["touched"] = now 

124 test["id"] = uuid.uuid4().hex 

125 try: 

126 self.db.tests.insert_one(test) 

127 return True 

128 except: 

129 return False 

130 

131 def get_tests(self): 

132 tests = [] 

133 for test in self.db.tests.find(): 

134 tests.append(test) 

135 return tests 

136 

137 def create_log(self, template_id, test_id, name): 

138 try: 

139 log = {"id": uuid.uuid4().hex, "slides": [], "template": template_id} 

140 test = self.db.tests.find_one({"id": test_id}) 

141 if name is None: 

142 name = test["name"] 

143 except Exception as e: 

144 print(e) 

145 return http.HTTPStatus.BAD_REQUEST 

146 log["name"] = name 

147 log["test"] = test_id 

148 now = datetime.datetime.now().isoformat() 

149 log["created"] = now 

150 log["touched"] = now 

151 self.db.logs.insert_one(log) 

152 return log 

153 

154 def get_logs(self): 

155 logs = [] 

156 for log in self.db.logs.find(): 

157 logs.append(log) 

158 return logs 

159 

160 def get_log(self, log_id): 

161 result = self.db.logs.find_one({"id": log_id}) 

162 if result is None: 

163 return http.HTTPStatus.BAD_REQUEST 

164 else: 

165 return result 

166 

167 def post_slide(self, log_id, fields, submit, user): 

168 log = self.db.logs.find_one({"id": log_id}) 

169 if log is None: 

170 return None 

171 else: 

172 slide = {"id": uuid.uuid4().hex, 

173 "log": log_id, 

174 "submitted": submit, 

175 "reviewed": {"date": None, "status": False} 

176 } 

177 try: 

178 headers = self.db.templates.find_one({"id": log["template"]})["headers"] 

179 valid_fields = {} 

180 for key in fields.keys(): 

181 if headers[key][1] == user["role"]: 

182 valid_fields[key] = fields[key] 

183 slide["fields"] = valid_fields 

184 now = datetime.datetime.now().isoformat() 

185 slide["created"] = now 

186 slide["touched"] = now 

187 slide["users"] = [user["id"]] 

188 except Exception as e: 

189 print(e) 

190 return None 

191 log_slides = log["slides"] 

192 log_slides.append(slide["id"]) 

193 self.db.logs.update_one({"id": log_id}, {"$set": {"slides": log_slides}}) 

194 self.db.slides.insert_one(slide) 

195 return slide 

196 

197 def edit_slide(self, slide_id, fields, submit, user): 

198 slide = self.db.slides.find_one({"id": slide_id}) 

199 if slide is None: 

200 return http.HTTPStatus.BAD_REQUEST 

201 else: 

202 if not slide["submitted"] and submit: 

203 self.db.slides.update_one({"id": slide_id}, {"$set": {"submit": submit}}) 

204 try: 

205 log = self.db.logs.find_one({"id": slide["log"]}) 

206 headers = self.db.templates.find_one({"id": log["template"]})["headers"] 

207 valid_fields = {} 

208 for key in fields.keys(): 

209 if headers[key][1] == user["role"]: 

210 valid_fields[key] = fields[key] 

211 self.db.slides.update_one({"id": slide_id}, {"$set": {"fields": valid_fields}}) 

212 return True 

213 except: 

214 return False 

215 

216 def get_slide(self, slide_id): 

217 slide = self.db.slides.find_one({"id": slide_id}) 

218 if slide is None: 

219 return http.HTTPStatus.BAD_REQUEST 

220 else: 

221 return slide 

222 

223 def get_slides(self, log_id): 

224 log = self.db.logs.find_one({"id": log_id}) 

225 slide_ids = log["slides"] 

226 slides = [] 

227 for slide_id in slide_ids: 

228 slide = self.db.slides.find_one({"id": slide_id}) 

229 if slide: 

230 slides.append(slide) 

231 return slides 

232 

233 def delete_template(self, template_id): 

234 log_count = self.db.logs.count_documents({"template": template_id}) 

235 if log_count == 0: 

236 self.db.templates.delete_one({"id": template_id}) 

237 return True 

238 else: 

239 return False