-
Notifications
You must be signed in to change notification settings - Fork 405
Expand file tree
/
Copy pathImportUser.py
More file actions
executable file
·186 lines (150 loc) · 5.78 KB
/
ImportUser.py
File metadata and controls
executable file
·186 lines (150 loc) · 5.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
#!/usr/bin/env python3
# Contest Management System - http://cms-dev.github.io/
# Copyright © 2012 Bernard Blackham <bernard@largestprime.net>
# Copyright © 2010-2011 Giovanni Mascellani <mascellani@poisson.phc.unipi.it>
# Copyright © 2010-2018 Stefano Maggiolo <s.maggiolo@gmail.com>
# Copyright © 2010-2011 Matteo Boscariol <boscarim@hotmail.com>
# Copyright © 2014-2015 William Di Luigi <williamdiluigi@gmail.com>
# Copyright © 2015 Luca Chiodini <luca@chiodini.org>
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""This script imports a user from disk using one of the available
loaders.
The data parsed by the loader is used to create a new User in the
database.
"""
# We enable monkey patching to make many libraries gevent-friendly
# (for instance, urllib3, used by requests)
import gevent.monkey
gevent.monkey.patch_all() # noqa
import argparse
import logging
import os
import sys
from collections.abc import Callable
from cms import utf8_decoder
from cms.db.session import Session
from cms.db import Participation, SessionGen, User
from cms.db.filecacher import FileCacher
from cmscontrib.importing import ImportDataError, contest_from_db
from cmscontrib.loaders import choose_loader, build_epilog
from cmscontrib.loaders.base_loader import UserLoader
# Module-level logger, named after this module via __name__; used by
# UserImporter to report import progress and errors.
logger = logging.getLogger(__name__)
class UserImporter:
    """Import users read by a loader into the database.

    A loader instance is built from the absolute form of the given
    path. do_import() stores the single user returned by the loader;
    do_import_all() walks the participation list returned by the
    loader's get_contest() and imports every listed user from its own
    sub-path.

    """

    def __init__(self, path: str, contest_id: int, loader_class: type[UserLoader]):
        """Build the file cacher and the loader for the given path.

        path: location of the data to load; it is made absolute before
            being handed to the loader.
        contest_id: id passed to contest_from_db() when importing.
        loader_class: class instantiated with the absolute path and
            the file cacher.

        """
        cacher = FileCacher()
        self.file_cacher = cacher
        self.contest_id = contest_id
        target = os.path.abspath(path)
        self.loader = loader_class(target, cacher)

    def do_import(self):
        """Load the user through the loader and store it in the DB.

        When contest_from_db() yields a contest, a participation of
        the new user in that contest (in the contest's main group) is
        created in the same transaction.

        return (bool): True once the session has been committed; False
            if the loader returned no user or an ImportDataError was
            raised while preparing the data (nothing is committed
            in that case).

        """
        loaded_user = self.loader.get_user()
        if loaded_user is None:
            return False

        logger.info("Creating user %s on the database.", loaded_user.username)

        with SessionGen() as session:
            try:
                contest = contest_from_db(self.contest_id, session)
                db_user = self._user_to_db(session, loaded_user)
            except ImportDataError as error:
                logger.error(str(error))
                logger.info("Error while importing, no changes were made.")
                return False

            if contest is not None:
                logger.info("Creating participation of user %s in contest %s.",
                            db_user.username, contest.name)
                participation = Participation(
                    user=db_user, contest=contest, group=contest.main_group
                )
                session.add(participation)

            session.commit()
            # Read the id while the session is still open.
            new_user_id = db_user.id

        logger.info("Import finished (new user id: %s).", new_user_id)
        return True

    def do_import_all(
        self, base_path: str, get_loader: Callable[[str], type[UserLoader]]
    ):
        """Import every user listed in the loader's participations.

        The third element returned by the loader's get_contest() is
        iterated; for each entry, the user is expected under
        base_path joined with the entry's "username".

        base_path: directory under which each user's path is built.
        get_loader: callable returning the loader class to use for a
            given user path.

        return (bool): always True; the outcome of the individual
            imports is not inspected.

        """
        _contest, _tasks, participations = self.loader.get_contest()

        for entry in participations:
            user_path = os.path.join(base_path, entry["username"])
            loader_class = get_loader(user_path)
            # A fresh importer (with its own file cacher and loader) is
            # used for each user; its result is not checked.
            UserImporter(
                path=user_path,
                contest_id=self.contest_id,
                loader_class=loader_class,
            ).do_import()

        return True

    @staticmethod
    def _user_to_db(session: Session, user: User) -> User:
        """Add the user to the session, refusing duplicate usernames.

        session: session used both for the lookup and for the add.
        user: the user to store.

        return: the same user object that was passed in.

        raise (ImportDataError): if a user with the same username is
            already present in the DB.

        """
        same_name_query = session.query(User).filter(
            User.username == user.username
        )
        existing: User | None = same_name_query.first()

        if existing is None:
            session.add(user)
            return user

        raise ImportDataError(
            "User \"%s\" already exists." % user.username)
def main():
    """Parse the command line and run the requested import.

    With -A/--all every user inside the target is imported through
    UserImporter.do_import_all(); otherwise a single user is imported
    through UserImporter.do_import().

    return (int): 0 if the importer returned True, 1 otherwise.

    """
    parser = argparse.ArgumentParser(
        description="Import a user to the database.",
        epilog=build_epilog(),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        "-L", "--loader", action="store", type=utf8_decoder, default=None,
        help="use the specified loader (default: autodetect)",
    )
    # The target is optional and falls back to the current directory,
    # evaluated when the parser is built.
    parser.add_argument(
        "target", action="store", type=utf8_decoder, nargs="?",
        default=os.getcwd(),
        help="target file/directory from where to import user(s)",
    )
    parser.add_argument(
        "-A", "--all", action="store_true",
        help="try to import all users inside target",
    )
    parser.add_argument(
        "-c", "--contest-id", action="store", type=int,
        help="id of the contest the users will be attached to",
    )
    args = parser.parse_args()

    def loader_for(path):
        # Loader selection problems are reported through parser.error.
        return choose_loader(args.loader, path, parser.error)

    target = args.target
    importer = UserImporter(
        path=target,
        contest_id=args.contest_id,
        loader_class=loader_for(target),
    )

    success = (
        importer.do_import_all(target, loader_for)
        if args.all
        else importer.do_import()
    )

    if success is True:
        return 0
    return 1
if __name__ == "__main__":
    # Use main()'s return value as the process exit status.
    exit_status = main()
    sys.exit(exit_status)