From c3541af9848fd7e2490ce363396a2499ec82ad86 Mon Sep 17 00:00:00 2001 From: madtibo Date: Thu, 16 Aug 2018 13:02:28 +0200 Subject: [PATCH 01/12] add a '-j' switch to be able to load the loaded data as jsonb field --- load_into_pg.py | 42 +++++++++++++++++++++++++++-------------- sql/Badges_pre.sql | 3 ++- sql/Comments_pre.sql | 3 ++- sql/PostHistory_pre.sql | 3 ++- sql/PostLinks_pre.sql | 7 ++++--- sql/Posts_pre.sql | 3 ++- sql/Tags_pre.sql | 7 ++++--- sql/Users_pre.sql | 3 ++- sql/Votes_pre.sql | 3 ++- 9 files changed, 48 insertions(+), 26 deletions(-) diff --git a/load_into_pg.py b/load_into_pg.py index c65e854..33be75c 100755 --- a/load_into_pg.py +++ b/load_into_pg.py @@ -5,6 +5,7 @@ import psycopg2 as pg import row_processor as Processor import six +import json # Special rules needed for certain tables (esp. for old database dumps) specialRules = { @@ -15,17 +16,17 @@ def _makeDefValues(keys): """Returns a dictionary containing None for all keys.""" return dict(( (k, None) for k in keys )) -def _createMogrificationTemplate(table, keys): +def _createMogrificationTemplate(table, keys, insertJson): """Return the template string for mogrification for the given keys.""" - return ( '(' + - ', '.join( [ '%(' + k + ')s' if (table, k) not in specialRules else specialRules[table, k] - for k in keys - ] - ) + - ')' - ) - -def _createCmdTuple(cursor, keys, templ, attribs): + table_keys = ', '.join( [ '%(' + k + ')s' if (table, k) not in specialRules + else specialRules[table, k] + for k in keys ]) + if insertJson: + return ('(' + table_keys + ', %(jsonfield)s' + ')') + else: + return ('(' + table_keys + ')') + +def _createCmdTuple(cursor, keys, templ, attribs, insertJson): """Use the cursor to mogrify a tuple of data. The passed data in `attribs` is augmented with default data (NULLs) and the order of data in the tuple is the same as in the list of `keys`. The @@ -34,12 +35,20 @@ def _createCmdTuple(cursor, keys, templ, attribs): """ defs = _makeDefValues(keys) defs.update(attribs) + + if insertJson: + dict_attribs = { } + for name, value in attribs.items(): + dict_attribs[name] = value + defs['jsonfield'] = json.dumps(dict_attribs) + + values_to_insert = cursor.mogrify(templ, defs) return cursor.mogrify(templ, defs) -def handleTable(table, keys, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword): +def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword): """Handle the table including the post/pre processing.""" dbFile = mbDbFile if mbDbFile is not None else table + '.xml' - tmpl = _createMogrificationTemplate(table, keys) + tmpl = _createMogrificationTemplate(table, keys, insertJson) start_time = time.time() try: @@ -82,7 +91,7 @@ def handleTable(table, keys, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPas six.print_('Processing data ...') for rows in Processor.batch(Processor.parse(xml), 500): valuesStr = ',\n'.join( - [ _createCmdTuple(cur, keys, tmpl, row_attribs).decode('utf-8') + [ _createCmdTuple(cur, keys, tmpl, row_attribs, insertJson).decode('utf-8') for row_attribs in rows ] ) @@ -159,6 +168,11 @@ def handleTable(table, keys, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPas , default = False ) +parser.add_argument( '-j', '--insert-json' + , help = 'Insert raw data as JSON.' + , action = 'store_true' + , default = False + ) args = parser.parse_args() table = args.table @@ -279,7 +293,7 @@ def handleTable(table, keys, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPas choice = input('This will drop the {} table. 
Are you sure [y/n]?'.format(table)) if len(choice) > 0 and choice[0].lower() == 'y': - handleTable(table, keys, args.dbname, args.file, args.host, args.port, args.username, args.password) + handleTable(table, keys, args.insert_json, args.dbname, args.file, args.host, args.port, args.username, args.password) else: six.print_("Cancelled.") diff --git a/sql/Badges_pre.sql b/sql/Badges_pre.sql index 98a2b34..65944d9 100644 --- a/sql/Badges_pre.sql +++ b/sql/Badges_pre.sql @@ -3,5 +3,6 @@ CREATE TABLE Badges ( Id int PRIMARY KEY , UserId int not NULL , Name text not NULL , - Date timestamp not NULL + Date timestamp not NULL , + jsonfield jsonb ); diff --git a/sql/Comments_pre.sql b/sql/Comments_pre.sql index 6942df6..43f166c 100644 --- a/sql/Comments_pre.sql +++ b/sql/Comments_pre.sql @@ -5,5 +5,6 @@ CREATE TABLE Comments ( Score int not NULL , Text text , CreationDate timestamp not NULL , - UserId int + UserId int , + jsonfield jsonb ); diff --git a/sql/PostHistory_pre.sql b/sql/PostHistory_pre.sql index 24684d1..361dd3d 100644 --- a/sql/PostHistory_pre.sql +++ b/sql/PostHistory_pre.sql @@ -6,5 +6,6 @@ CREATE TABLE PostHistory ( RevisionGUID text , CreationDate timestamp not NULL , UserId int , - PostText text + PostText text , + jsonfield jsonb ); diff --git a/sql/PostLinks_pre.sql b/sql/PostLinks_pre.sql index aaa258c..e4bf700 100644 --- a/sql/PostLinks_pre.sql +++ b/sql/PostLinks_pre.sql @@ -2,7 +2,8 @@ DROP TABLE IF EXISTS PostLinks CASCADE; CREATE TABLE PostLinks ( Id int PRIMARY KEY , CreationDate timestamp not NUll , - PostId int not NULL , - RelatedPostId int not NULL , - LinkTypeId int not Null + PostId int , -- not NULL , + RelatedPostId int , -- not NULL , + LinkTypeId int not Null , + jsonfield jsonb ); diff --git a/sql/Posts_pre.sql b/sql/Posts_pre.sql index 60f7239..ed4d75e 100644 --- a/sql/Posts_pre.sql +++ b/sql/Posts_pre.sql @@ -19,6 +19,7 @@ CREATE TABLE Posts ( CommentCount int , FavoriteCount int , ClosedDate timestamp , - CommunityOwnedDate timestamp + CommunityOwnedDate timestamp , + jsonfield jsonb ); diff --git a/sql/Tags_pre.sql b/sql/Tags_pre.sql index 26979fe..24dd050 100644 --- a/sql/Tags_pre.sql +++ b/sql/Tags_pre.sql @@ -2,7 +2,8 @@ DROP TABLE IF EXISTS Tags CASCADE; CREATE TABLE Tags ( Id int PRIMARY KEY , TagName text not NULL , - Count int, - ExcerptPostId int, - WikiPostId int + Count int , + ExcerptPostId int , + WikiPostId int , + jsonfield jsonb ); diff --git a/sql/Users_pre.sql b/sql/Users_pre.sql index 4246be3..ad188cf 100644 --- a/sql/Users_pre.sql +++ b/sql/Users_pre.sql @@ -13,6 +13,7 @@ CREATE TABLE Users ( DownVotes int not NULL , ProfileImageUrl text , Age int , - AccountId int -- NULL accountId == deleted account? + AccountId int , -- NULL accountId == deleted account? 
+ jsonfield jsonb ); diff --git a/sql/Votes_pre.sql b/sql/Votes_pre.sql index 2a9b5ff..29aebe0 100644 --- a/sql/Votes_pre.sql +++ b/sql/Votes_pre.sql @@ -5,6 +5,7 @@ CREATE TABLE Votes ( VoteTypeId int not NULL , UserId int , CreationDate timestamp not NULL , - BountyAmount int + BountyAmount int , + jsonfield jsonb ); From 47759e7c1e7706cdf863b5d5c1888e72409fdb22 Mon Sep 17 00:00:00 2001 From: madtibo Date: Thu, 16 Aug 2018 17:43:49 +0200 Subject: [PATCH 02/12] revert to post columns as not null --- sql/PostLinks_pre.sql | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/PostLinks_pre.sql b/sql/PostLinks_pre.sql index e4bf700..3793522 100644 --- a/sql/PostLinks_pre.sql +++ b/sql/PostLinks_pre.sql @@ -2,8 +2,8 @@ DROP TABLE IF EXISTS PostLinks CASCADE; CREATE TABLE PostLinks ( Id int PRIMARY KEY , CreationDate timestamp not NUll , - PostId int , -- not NULL , - RelatedPostId int , -- not NULL , + PostId int not NULL , + RelatedPostId int not NULL , LinkTypeId int not Null , jsonfield jsonb ); From cbb39fbfbbb521f2b5070b43b3dbeafd799f5263 Mon Sep 17 00:00:00 2001 From: Thibaut Date: Wed, 23 Jan 2019 13:54:22 +0100 Subject: [PATCH 03/12] Add foreign key support for users id and posts id (#8) * add foreign key support for users id and posts id using the "--foreign-keys" switch WARNING: when using the foreign keys option, some entries in votes and postlinks might be updated to enforce data integrity * Do not force constraint validation by setting them as 'not valid' * log table name --- load_into_pg.py | 251 ++++++++++++++++++++++------------------- sql/Badges_fk.sql | 1 + sql/Comments_fk.sql | 2 + sql/Comments_post.sql | 2 +- sql/PostHistory_fk.sql | 2 + sql/PostLinks_fk.sql | 13 +++ sql/Posts_fk.sql | 3 + sql/Tags_fk.sql | 2 + sql/Users_fk.sql | 2 + sql/Votes_fk.sql | 10 ++ 10 files changed, 170 insertions(+), 118 deletions(-) create mode 100644 sql/Badges_fk.sql create mode 100644 sql/Comments_fk.sql create mode 100644 sql/PostHistory_fk.sql create mode 100644 sql/PostLinks_fk.sql create mode 100644 sql/Posts_fk.sql create mode 100644 sql/Tags_fk.sql create mode 100644 sql/Users_fk.sql create mode 100644 sql/Votes_fk.sql diff --git a/load_into_pg.py b/load_into_pg.py index 33be75c..66b651d 100755 --- a/load_into_pg.py +++ b/load_into_pg.py @@ -30,7 +30,7 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson): """Use the cursor to mogrify a tuple of data. The passed data in `attribs` is augmented with default data (NULLs) and the order of data in the tuple is the same as in the list of `keys`. The - `cursor` is used toe mogrify the data and the `templ` is the template used + `cursor` is used to mogrify the data and the `templ` is the template used for the mogrification. 
""" defs = _makeDefValues(keys) @@ -45,8 +45,114 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson): values_to_insert = cursor.mogrify(templ, defs) return cursor.mogrify(templ, defs) -def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword): +def _getTableKeys(table): + """Return an array of the keys for a given table""" + keys = None + if table == 'Users': + keys = [ + 'Id' + , 'Reputation' + , 'CreationDate' + , 'DisplayName' + , 'LastAccessDate' + , 'WebsiteUrl' + , 'Location' + , 'AboutMe' + , 'Views' + , 'UpVotes' + , 'DownVotes' + , 'ProfileImageUrl' + , 'Age' + , 'AccountId' + ] + elif table == 'Badges': + keys = [ + 'Id' + , 'UserId' + , 'Name' + , 'Date' + ] + elif table == 'PostLinks': + keys = [ + 'Id' + , 'CreationDate' + , 'PostId' + , 'RelatedPostId' + , 'LinkTypeId' + ] + elif table == 'Comments': + keys = [ + 'Id' + , 'PostId' + , 'Score' + , 'Text' + , 'CreationDate' + , 'UserId' + ] + elif table == 'Votes': + keys = [ + 'Id' + , 'PostId' + , 'VoteTypeId' + , 'UserId' + , 'CreationDate' + , 'BountyAmount' + ] + elif table == 'Posts': + keys = [ + 'Id' + , 'PostTypeId' + , 'AcceptedAnswerId' + , 'ParentId' + , 'CreationDate' + , 'Score' + , 'ViewCount' + , 'Body' + , 'OwnerUserId' + , 'LastEditorUserId' + , 'LastEditorDisplayName' + , 'LastEditDate' + , 'LastActivityDate' + , 'Title' + , 'Tags' + , 'AnswerCount' + , 'CommentCount' + , 'FavoriteCount' + , 'ClosedDate' + , 'CommunityOwnedDate' + ] + elif table == 'Tags': + keys = [ + 'Id' + , 'TagName' + , 'Count' + , 'ExcerptPostId' + , 'WikiPostId' + ] + elif table == 'PostHistory': + keys = [ + 'Id', + 'PostHistoryTypeId', + 'PostId', + 'RevisionGUID', + 'CreationDate', + 'UserId', + 'Text' + ] + elif table == 'Comments': + keys = [ + 'Id', + 'PostId', + 'Score', + 'Text', + 'CreationDate', + 'UserId', + ] + return keys + +def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword): """Handle the table including the post/pre processing.""" + keys = _getTableKeys(table) dbFile = mbDbFile if mbDbFile is not None else table + '.xml' tmpl = _createMogrificationTemplate(table, keys, insertJson) start_time = time.time() @@ -54,8 +160,9 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse try: pre = open('./sql/' + table + '_pre.sql').read() post = open('./sql/' + table + '_post.sql').read() + fk = open('./sql/' + table + '_fk.sql').read() except IOError as e: - six.print_("Could not load pre/post sql. Are you running from the correct path?", file=sys.stderr) + six.print_("Could not load pre/post/fk sql. 
Are you running from the correct path?", file=sys.stderr) sys.exit(-1) dbConnectionParam = "dbname={}".format(dbname) @@ -74,6 +181,7 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse if mbPassword is not None: dbConnectionParam += ' password={}'.format(mbPassword) + try: with pg.connect(dbConnectionParam) as conn: with conn.cursor() as cur: @@ -95,13 +203,12 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse for row_attribs in rows ] ) - if len(valuesStr) > 0: cmd = 'INSERT INTO ' + table + \ ' VALUES\n' + valuesStr + ';' cur.execute(cmd) conn.commit() - six.print_('Table processing took {:.1f} seconds'.format(time.time() - start_time)) + six.print_('Table {0} processing took {1:.1f} seconds'.format(table, time.time() - start_time)) # Post-processing (creation of indexes) start_time = time.time() @@ -110,6 +217,14 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse cur.execute(post) conn.commit() six.print_('Post processing took {} seconds'.format(time.time() - start_time)) + if createFk: + # fk-processing (creation of foreign keys) + start_time = time.time() + six.print_('fk processing ...') + if post != '': + cur.execute(fk) + conn.commit() + six.print_('fk processing took {} seconds'.format(time.time() - start_time)) except IOError as e: six.print_("Could not read from file {}.".format(dbFile), file=sys.stderr) @@ -122,8 +237,6 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse six.print_("Warning from the database.", file=sys.stderr) six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr) - - ############################################################# parser = argparse.ArgumentParser() @@ -173,116 +286,16 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse , action = 'store_true' , default = False ) -args = parser.parse_args() -table = args.table -keys = None - -if table == 'Users': - keys = [ - 'Id' - , 'Reputation' - , 'CreationDate' - , 'DisplayName' - , 'LastAccessDate' - , 'WebsiteUrl' - , 'Location' - , 'AboutMe' - , 'Views' - , 'UpVotes' - , 'DownVotes' - , 'ProfileImageUrl' - , 'Age' - , 'AccountId' - ] -elif table == 'Badges': - keys = [ - 'Id' - , 'UserId' - , 'Name' - , 'Date' - ] -elif table == 'PostLinks': - keys = [ - 'Id' - , 'CreationDate' - , 'PostId' - , 'RelatedPostId' - , 'LinkTypeId' - ] -elif table == 'Comments': - keys = [ - 'Id' - , 'PostId' - , 'Score' - , 'Text' - , 'CreationDate' - , 'UserId' - ] -elif table == 'Votes': - keys = [ - 'Id' - , 'PostId' - , 'VoteTypeId' - , 'UserId' - , 'CreationDate' - , 'BountyAmount' - ] -elif table == 'Posts': - keys = [ - 'Id' - , 'PostTypeId' - , 'AcceptedAnswerId' - , 'ParentId' - , 'CreationDate' - , 'Score' - , 'ViewCount' - , 'Body' - , 'OwnerUserId' - , 'LastEditorUserId' - , 'LastEditorDisplayName' - , 'LastEditDate' - , 'LastActivityDate' - , 'Title' - , 'Tags' - , 'AnswerCount' - , 'CommentCount' - , 'FavoriteCount' - , 'ClosedDate' - , 'CommunityOwnedDate' - ] +parser.add_argument( '--foreign-keys' + , help = 'Create foreign keys.' 
+ , action = 'store_true' + , default = False + ) - # If the user has not explicitly asked for loading the body, we replace it with NULL - if not args.with_post_body: - specialRules[('Posts', 'Body')] = 'NULL' +args = parser.parse_args() -elif table == 'Tags': - keys = [ - 'Id' - , 'TagName' - , 'Count' - , 'ExcerptPostId' - , 'WikiPostId' - ] -elif table == 'PostHistory': - keys = [ - 'Id', - 'PostHistoryTypeId', - 'PostId', - 'RevisionGUID', - 'CreationDate', - 'UserId', - 'Text' - ] -elif table == 'Comments': - keys = [ - 'Id', - 'PostId', - 'Score', - 'Text', - 'CreationDate', - 'UserId', - ] +table = args.table try: # Python 2/3 compatibility @@ -290,10 +303,14 @@ def handleTable(table, keys, insertJson, dbname, mbDbFile, mbHost, mbPort, mbUse except NameError: pass -choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table)) +if table == 'Posts': + # If the user has not explicitly asked for loading the body, we replace it with NULL + if not args.with_post_body: + specialRules[('Posts', 'Body')] = 'NULL' + +choice = input('This will drop the {} table. Are you sure [y/n]? '.format(table)) if len(choice) > 0 and choice[0].lower() == 'y': - handleTable(table, keys, args.insert_json, args.dbname, args.file, args.host, args.port, args.username, args.password) + handleTable(table, args.insert_json, args.foreign_keys, args.dbname, args.file, args.host, args.port, args.username, args.password) else: six.print_("Cancelled.") - diff --git a/sql/Badges_fk.sql b/sql/Badges_fk.sql new file mode 100644 index 0000000..b5a4e3f --- /dev/null +++ b/sql/Badges_fk.sql @@ -0,0 +1 @@ +ALTER TABLE badges ADD CONSTRAINT fk_badges_userid FOREIGN KEY (userid) REFERENCES users (id); diff --git a/sql/Comments_fk.sql b/sql/Comments_fk.sql new file mode 100644 index 0000000..aea00c9 --- /dev/null +++ b/sql/Comments_fk.sql @@ -0,0 +1,2 @@ +ALTER TABLE Comments ADD CONSTRAINT fk_comments_userid FOREIGN KEY (userid) REFERENCES users (id); +ALTER TABLE Comments ADD CONSTRAINT fk_comments_postid FOREIGN KEY (postid) REFERENCES posts (id); diff --git a/sql/Comments_post.sql b/sql/Comments_post.sql index e19e8b8..2c3e7a2 100644 --- a/sql/Comments_post.sql +++ b/sql/Comments_post.sql @@ -6,4 +6,4 @@ CREATE INDEX cmnts_postid_idx ON Comments USING hash (PostId) CREATE INDEX cmnts_creation_date_idx ON Comments USING btree (CreationDate) WITH (FILLFACTOR = 100); CREATE INDEX cmnts_userid_idx ON Comments USING btree (UserId) - WITH (FILLFACTOR = 100); \ No newline at end of file + WITH (FILLFACTOR = 100); diff --git a/sql/PostHistory_fk.sql b/sql/PostHistory_fk.sql new file mode 100644 index 0000000..91379eb --- /dev/null +++ b/sql/PostHistory_fk.sql @@ -0,0 +1,2 @@ +ALTER TABLE Posthistory ADD CONSTRAINT fk_posthistory_userid FOREIGN KEY (userid) REFERENCES users (id); +ALTER TABLE Posthistory ADD CONSTRAINT fk_posthistory_postid FOREIGN KEY (postid) REFERENCES posts (id); diff --git a/sql/PostLinks_fk.sql b/sql/PostLinks_fk.sql new file mode 100644 index 0000000..5c40cb4 --- /dev/null +++ b/sql/PostLinks_fk.sql @@ -0,0 +1,13 @@ +-- impossible to enforce these constraints, set as 'not valid' to disable +-- initial test. 
+-- +-- These constaints can be forced running the following queries: +-- ALTER TABLE postlinks ALTER postid DROP NOT NULL; +-- UPDATE postlinks SET postid=NULL WHERE postid NOT IN (SELECT DISTINCT id FROM Posts); +-- ALTER TABLE postlinks VALIDATE CONSTRAINT fk_postlinks_postid; +-- ALTER TABLE postlinks ALTER relatedpostid DROP NOT NULL; +-- UPDATE postlinks SET relatedpostid=NULL WHERE relatedpostid NOT IN (SELECT DISTINCT id FROM Posts); +-- ALTER TABLE postlinks VALIDATE CONSTRAINT fk_postlinks_relatedpostid; +-- +ALTER TABLE Postlinks ADD CONSTRAINT fk_postlinks_postid FOREIGN KEY (postid) REFERENCES posts (id) NOT VALID; +ALTER TABLE Postlinks ADD CONSTRAINT fk_postlinks_relatedpostid FOREIGN KEY (relatedpostid) REFERENCES posts (id) NOT VALID; diff --git a/sql/Posts_fk.sql b/sql/Posts_fk.sql new file mode 100644 index 0000000..65fea37 --- /dev/null +++ b/sql/Posts_fk.sql @@ -0,0 +1,3 @@ +ALTER TABLE Posts ADD CONSTRAINT fk_posts_parentid FOREIGN KEY (parentid) REFERENCES posts (id); +ALTER TABLE Posts ADD CONSTRAINT fk_posts_owneruserid FOREIGN KEY (owneruserid) REFERENCES users (id); +ALTER TABLE Posts ADD CONSTRAINT fk_posts_lasteditoruserid FOREIGN KEY (lasteditoruserid) REFERENCES users (id); diff --git a/sql/Tags_fk.sql b/sql/Tags_fk.sql new file mode 100644 index 0000000..ca4ca40 --- /dev/null +++ b/sql/Tags_fk.sql @@ -0,0 +1,2 @@ +-- dummy query +SELECT 1; diff --git a/sql/Users_fk.sql b/sql/Users_fk.sql new file mode 100644 index 0000000..ca4ca40 --- /dev/null +++ b/sql/Users_fk.sql @@ -0,0 +1,2 @@ +-- dummy query +SELECT 1; diff --git a/sql/Votes_fk.sql b/sql/Votes_fk.sql new file mode 100644 index 0000000..a52a2a1 --- /dev/null +++ b/sql/Votes_fk.sql @@ -0,0 +1,10 @@ +ALTER TABLE Votes ADD CONSTRAINT fk_votes_userid FOREIGN KEY (userid) REFERENCES users (id); +-- impossible to enforce this constraint, set as 'not valid' to disable +-- initial test. +-- +-- This constaint can be forced running the following queries: +-- ALTER TABLE votes ALTER PostId DROP NOT NULL; +-- UPDATE votes SET postid=NULL WHERE postid NOT IN (SELECT DISTINCT id FROM Posts); +-- ALTER TABLE votes VALIDATE CONSTRAINT fk_votes_postid; +-- +ALTER TABLE Votes ADD CONSTRAINT fk_votes_postid FOREIGN KEY (postid) REFERENCES posts (id) NOT VALID; From 69112012c41ac534db929cc606d42434fd58f6a2 Mon Sep 17 00:00:00 2001 From: Utkarsh Upadhyay <502876+musically-ut@users.noreply.github.com> Date: Fri, 25 Jan 2019 12:22:16 +0100 Subject: [PATCH 04/12] Acknowledge @madtibo's contributions. --- README.md | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 826a4ef..26ccbf4 100644 --- a/README.md +++ b/README.md @@ -34,7 +34,7 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch - `psql stackoverflow < ./sql/optional_post.sql` - Again, remember to user the correct database name here, if not `stackoverflow`. -## Caveats and TODOs +## Caveats - It prepares some indexes and views which may not be necessary for your analysis. - The `Body` field in `Posts` table is NOT populated by default. You have to use `--with-post-body` argument to include it. @@ -44,3 +44,8 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch - The `tags.xml` is missing from the data dump. Hence, the `PostTag` and `UserTagQA` tables will be empty after `final_post.sql`. - The `ViewCount` in `Posts` is sometimes equal to an `empty` value. It is replaced by `NULL` in those cases. 
+ + +## Acknowledgement + +[@madtibo](https://github.com/madtibo) made significant contributions by adding `jsonb` and Foreign Key support. From b77bfbcd53fc18708ef1170a05416809d680fdaa Mon Sep 17 00:00:00 2001 From: Thibaut Date: Thu, 2 May 2019 19:53:03 +0200 Subject: [PATCH 05/12] Allows downloading/loading a complete StackExchange project (#9) Using the '-s' switch, download the compressed file from _https://ia800107.us.archive.org/27/items/stackexchange/_, then, uncompress it and load all the files in the database. Add a '-n' switch to move the tables to a given schema. --- README.md | 39 +++++-- load_into_pg.py | 279 ++++++++++++++++++++++++++++++++++------------ sql/Votes_pre.sql | 2 +- 3 files changed, 240 insertions(+), 80 deletions(-) diff --git a/README.md b/README.md index 26ccbf4..36e9b2b 100644 --- a/README.md +++ b/README.md @@ -7,7 +7,8 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch ## Dependencies - [`lxml`](http://lxml.de/installation.html) - - [`psychopg2`](http://initd.org/psycopg/docs/install.html) + - [`psycopg2`](http://initd.org/psycopg/docs/install.html) + - [`libarchive-c`](https://pypi.org/project/libarchive-c/) ## Usage @@ -18,14 +19,14 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch `Badges.xml`, `Votes.xml`, `Posts.xml`, `Users.xml`, `Tags.xml`. - In some old dumps, the cases in the filenames are different. - Execute in the current folder (in parallel, if desired): - - `python load_into_pg.py Badges` - - `python load_into_pg.py Posts` - - `python load_into_pg.py Tags` (not present in earliest dumps) - - `python load_into_pg.py Users` - - `python load_into_pg.py Votes` - - `python load_into_pg.py PostLinks` - - `python load_into_pg.py PostHistory` - - `python load_into_pg.py Comments` + - `python load_into_pg.py -t Badges` + - `python load_into_pg.py -t Posts` + - `python load_into_pg.py -t Tags` (not present in earliest dumps) + - `python load_into_pg.py -t Users` + - `python load_into_pg.py -t Votes` + - `python load_into_pg.py -t PostLinks` + - `python load_into_pg.py -t PostHistory` + - `python load_into_pg.py -t Comments` - Finally, after all the initial tables have been created: - `psql stackoverflow < ./sql/final_post.sql` - If you used a different database name, make sure to use that instead of @@ -34,7 +35,25 @@ Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexch - `psql stackoverflow < ./sql/optional_post.sql` - Again, remember to user the correct database name here, if not `stackoverflow`. -## Caveats +## Loading a complete stackexchange project + +You can use the script to download a given stackexchange compressed file from +[archive.org](https://ia800107.us.archive.org/27/items/stackexchange/) and load +all the tables at once, using the `-s` switch. + +You will need the `urllib` and `libarchive` modules. + +If you give a schema name using the `-n` switch, all the tables will be moved +to the given schema. This schema will be created in the script. + +To load the _dba.stackexchange.com_ project in the `dba` schema, you would execute: +`./load_into_pg.py -s dba -n dba` + +The paths are not changed in the final scripts `sql/final_post.sql` and +`sql/optional_post.sql`. To run them, first set the _search_path_ to your +schema name: `SET search_path TO ;` + +## Caveats and TODOs - It prepares some indexes and views which may not be necessary for your analysis. - The `Body` field in `Posts` table is NOT populated by default. 
You have to use `--with-post-body` argument to include it. diff --git a/load_into_pg.py b/load_into_pg.py index 66b651d..d7c3158 100755 --- a/load_into_pg.py +++ b/load_into_pg.py @@ -1,8 +1,10 @@ #!/usr/bin/env python + import sys import time import argparse import psycopg2 as pg +import os import row_processor as Processor import six import json @@ -12,20 +14,71 @@ ('Posts', 'ViewCount'): "NULLIF(%(ViewCount)s, '')::int" } +# part of the file already downloaded +file_part = None + + +def show_progress(block_num, block_size, total_size): + """Display the total size of the file to download and the progress in percent""" + global file_part + if file_part is None: + suffixes = ['B', 'KB', 'MB', 'GB', 'TB'] + suffixIndex = 0 + pp_size = total_size + while pp_size > 1024: + suffixIndex += 1 # Increment the index of the suffix + pp_size = pp_size / 1024.0 # Apply the division + six.print_('Total file size is: {0:.1f} {1}' + .format(pp_size, suffixes[suffixIndex])) + six.print_("0 % of the file downloaded ...\r", end="", flush=True) + file_part = 0 + + downloaded = block_num * block_size + if downloaded < total_size: + percent = 100 * downloaded / total_size + if percent - file_part > 1: + file_part = percent + six.print_("{0} % of the file downloaded ...\r".format(int(percent)), end="", flush=True) + else: + file_part = None + six.print_("") + + +def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword): + dbConnectionParam = "dbname={}".format(dbname) + + if mbPort is not None: + dbConnectionParam += ' port={}'.format(mbPort) + + if mbHost is not None: + dbConnectionParam += ' host={}'.format(mbHost) + + # TODO Is the escaping done here correct? + if mbUsername is not None: + dbConnectionParam += ' user={}'.format(mbUsername) + + # TODO Is the escaping done here correct? + if mbPassword is not None: + dbConnectionParam += ' password={}'.format(mbPassword) + return dbConnectionParam + + def _makeDefValues(keys): """Returns a dictionary containing None for all keys.""" - return dict(( (k, None) for k in keys )) + return dict(((k, None) for k in keys)) + def _createMogrificationTemplate(table, keys, insertJson): """Return the template string for mogrification for the given keys.""" - table_keys = ', '.join( [ '%(' + k + ')s' if (table, k) not in specialRules - else specialRules[table, k] - for k in keys ]) + table_keys = ', '.join(['%(' + k + ')s' if (table, k) not in specialRules + else specialRules[table, k] + for k in keys]) if insertJson: return ('(' + table_keys + ', %(jsonfield)s' + ')') else: return ('(' + table_keys + ')') + def _createCmdTuple(cursor, keys, templ, attribs, insertJson): """Use the cursor to mogrify a tuple of data. 
The passed data in `attribs` is augmented with default data (NULLs) and the @@ -37,14 +90,14 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson): defs.update(attribs) if insertJson: - dict_attribs = { } + dict_attribs = {} for name, value in attribs.items(): dict_attribs[name] = value defs['jsonfield'] = json.dumps(dict_attribs) - values_to_insert = cursor.mogrify(templ, defs) return cursor.mogrify(templ, defs) + def _getTableKeys(table): """Return an array of the keys for a given table""" keys = None @@ -131,26 +184,27 @@ def _getTableKeys(table): ] elif table == 'PostHistory': keys = [ - 'Id', - 'PostHistoryTypeId', - 'PostId', - 'RevisionGUID', - 'CreationDate', - 'UserId', - 'Text' + 'Id' + , 'PostHistoryTypeId' + , 'PostId' + , 'RevisionGUID' + , 'CreationDate' + , 'UserId' + , 'Text' ] elif table == 'Comments': keys = [ - 'Id', - 'PostId', - 'Score', - 'Text', - 'CreationDate', - 'UserId', + 'Id' + , 'PostId' + , 'Score' + , 'Text' + , 'CreationDate' + , 'UserId' ] return keys -def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, mbUsername, mbPassword): + +def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam): """Handle the table including the post/pre processing.""" keys = _getTableKeys(table) dbFile = mbDbFile if mbDbFile is not None else table + '.xml' @@ -165,23 +219,6 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m six.print_("Could not load pre/post/fk sql. Are you running from the correct path?", file=sys.stderr) sys.exit(-1) - dbConnectionParam = "dbname={}".format(dbname) - - if mbPort is not None: - dbConnectionParam += ' port={}'.format(mbPort) - - if mbHost is not None: - dbConnectionParam += ' host={}'.format(mbHost) - - # TODO Is the escaping done here correct? - if mbUsername is not None: - dbConnectionParam += ' user={}'.format(mbUsername) - - # TODO Is the escaping done here correct? 
- if mbPassword is not None: - dbConnectionParam += ' password={}'.format(mbPassword) - - try: with pg.connect(dbConnectionParam) as conn: with conn.cursor() as cur: @@ -199,16 +236,16 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m six.print_('Processing data ...') for rows in Processor.batch(Processor.parse(xml), 500): valuesStr = ',\n'.join( - [ _createCmdTuple(cur, keys, tmpl, row_attribs, insertJson).decode('utf-8') - for row_attribs in rows - ] - ) + [_createCmdTuple(cur, keys, tmpl, row_attribs, insertJson).decode('utf-8') + for row_attribs in rows + ] + ) if len(valuesStr) > 0: cmd = 'INSERT INTO ' + table + \ ' VALUES\n' + valuesStr + ';' cur.execute(cmd) conn.commit() - six.print_('Table {0} processing took {1:.1f} seconds'.format(table, time.time() - start_time)) + six.print_('Table \'{0}\' processing took {1:.1f} seconds'.format(table, time.time() - start_time)) # Post-processing (creation of indexes) start_time = time.time() @@ -216,15 +253,15 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m if post != '': cur.execute(post) conn.commit() - six.print_('Post processing took {} seconds'.format(time.time() - start_time)) + six.print_('Post processing took {0:.1f} seconds'.format(time.time() - start_time)) if createFk: # fk-processing (creation of foreign keys) start_time = time.time() - six.print_('fk processing ...') + six.print_('Foreign Key processing ...') if post != '': cur.execute(fk) conn.commit() - six.print_('fk processing took {} seconds'.format(time.time() - start_time)) + six.print_('Foreign Key processing took {0:.1f} seconds'.format(time.time() - start_time)) except IOError as e: six.print_("Could not read from file {}.".format(dbFile), file=sys.stderr) @@ -237,80 +274,184 @@ def handleTable(table, insertJson, createFk, dbname, mbDbFile, mbHost, mbPort, m six.print_("Warning from the database.", file=sys.stderr) six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr) + +def moveTableToSchema(table, schemaName, dbConnectionParam): + try: + with pg.connect(dbConnectionParam) as conn: + with conn.cursor() as cur: + # create the schema + cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';') + conn.commit() + # move the table to the right schema + cur.execute('ALTER TABLE ' + table + ' SET SCHEMA ' + schemaName + ';') + conn.commit() + except pg.Error as e: + six.print_("Error in dealing with the database.", file=sys.stderr) + six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr) + six.print_(str(e), file=sys.stderr) + except pg.Warning as w: + six.print_("Warning from the database.", file=sys.stderr) + six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr) + ############################################################# + parser = argparse.ArgumentParser() -parser.add_argument( 'table' +parser.add_argument('-t', '--table' , help = 'The table to work on.' , choices = ['Users', 'Badges', 'Posts', 'Tags', 'Votes', 'PostLinks', 'PostHistory', 'Comments'] + , default = None ) -parser.add_argument( '-d', '--dbname' +parser.add_argument('-d', '--dbname' , help = 'Name of database to create the table in. The database must exist.' , default = 'stackoverflow' ) -parser.add_argument( '-f', '--file' +parser.add_argument('-f', '--file' , help = 'Name of the file to extract data from.' , default = None ) -parser.add_argument( '-u', '--username' +parser.add_argument('-s', '--so-project' + , help = 'StackExchange project to load.' 
+ , default = None + ) + +parser.add_argument('--archive-url' + , help = 'URL of the archive directory to retrieve.' + , default = 'https://ia800107.us.archive.org/27/items/stackexchange' + ) + +parser.add_argument('-k', '--keep-archive' + , help = 'Will preserve the downloaded archive instead of deleting it.' + , action = 'store_true' + , default = False + ) + +parser.add_argument('-u', '--username' , help = 'Username for the database.' , default = None ) -parser.add_argument( '-p', '--password' +parser.add_argument('-p', '--password' , help = 'Password for the database.' , default = None ) -parser.add_argument( '-P', '--port' +parser.add_argument('-P', '--port' , help = 'Port to connect with the database on.' , default = None ) -parser.add_argument( '-H', '--host' +parser.add_argument('-H', '--host' , help = 'Hostname for the database.' , default = None ) -parser.add_argument( '--with-post-body' - , help = 'Import the posts with the post body. Only used if importing Posts.xml' - , action = 'store_true' +parser.add_argument('--with-post-body' + , help = 'Import the posts with the post body. Only used if importing Posts.xml' + , action = 'store_true' , default = False ) -parser.add_argument( '-j', '--insert-json' +parser.add_argument('-j', '--insert-json' , help = 'Insert raw data as JSON.' - , action = 'store_true' + , action = 'store_true' , default = False ) -parser.add_argument( '--foreign-keys' +parser.add_argument('-n', '--schema-name' + , help = 'Use specific schema.' + , default = 'public' + ) + +parser.add_argument('--foreign-keys' , help = 'Create foreign keys.' - , action = 'store_true' + , action = 'store_true' , default = False ) args = parser.parse_args() -table = args.table - try: # Python 2/3 compatibility input = raw_input except NameError: pass +dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password) + +# load given file in table +if args.file and args.table: + table = args.table -if table == 'Posts': - # If the user has not explicitly asked for loading the body, we replace it with NULL - if not args.with_post_body: - specialRules[('Posts', 'Body')] = 'NULL' + if table == 'Posts': + # If the user has not explicitly asked for loading the body, we replace it with NULL + if not args.with_post_body: + specialRules[('Posts', 'Body')] = 'NULL' + + choice = input('This will drop the {} table. 
Are you sure [y/n]?'.format(table)) + if len(choice) > 0 and choice[0].lower() == 'y': + handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam) + else: + six.print_("Cancelled.") + if args.schema_name != 'public': + moveTableToSchema(table, args.schema_name, dbConnectionParam) + exit(0) + +# load a project +elif args.so_project: + import libarchive + import tempfile + + filepath = None + temp_dir = None + if args.file: + filepath = args.file + url = filepath + else: + # download the 7z archive in tempdir + file_name = args.so_project + '.stackexchange.com.7z' + url = '{0}/{1}'.format(args.archive_url, file_name) + temp_dir = tempfile.mkdtemp(prefix='so_') + filepath = os.path.join(temp_dir, file_name) + six.print_('Downloading the archive in {0}'.format(filepath)) + six.print_('please be patient ...') + try: + six.moves.urllib.request.urlretrieve(url, filepath, show_progress) + except Exception as e: + six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e)) + exit(1) + + try: + libarchive.extract_file(filepath) + except Exception as e: + six.print_('Error: impossible to extract the {0} archive ({1})'.format(url, e)) + exit(1) + + tables = ['Tags', 'Users', 'Badges', 'Posts', 'Comments', + 'Votes', 'PostLinks', 'PostHistory'] + + for table in tables: + six.print_('Load {0}.xml file'.format(table)) + handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam) + # remove file + os.remove(table + '.xml') + + if not args.keep_archive: + os.remove(filepath) + if temp_dir: + # remove the archive and the temporary directory + os.rmdir(temp_dir) + else: + six.print_("Archive '{0}' deleted".format(filepath)) + + if args.schema_name != 'public': + for table in tables: + moveTableToSchema(table, args.schema_name, dbConnectionParam) + exit(0) -choice = input('This will drop the {} table. Are you sure [y/n]? 
'.format(table)) -if len(choice) > 0 and choice[0].lower() == 'y': - handleTable(table, args.insert_json, args.foreign_keys, args.dbname, args.file, args.host, args.port, args.username, args.password) else: - six.print_("Cancelled.") + six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.") + parser.print_help() diff --git a/sql/Votes_pre.sql b/sql/Votes_pre.sql index 29aebe0..3ed0b53 100644 --- a/sql/Votes_pre.sql +++ b/sql/Votes_pre.sql @@ -1,7 +1,7 @@ DROP TABLE IF EXISTS Votes CASCADE; CREATE TABLE Votes ( Id int PRIMARY KEY , - PostId int not NULL , + PostId int , -- not NULL , VoteTypeId int not NULL , UserId int , CreationDate timestamp not NULL , From 491e552cfcd735d2f4483482fec6933e85f07e30 Mon Sep 17 00:00:00 2001 From: Youri Wijnands Date: Tue, 11 Aug 2020 13:36:35 +0200 Subject: [PATCH 06/12] Update row_processor.py (#11) --- row_processor.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/row_processor.py b/row_processor.py index 7aeb09c..195d2de 100644 --- a/row_processor.py +++ b/row_processor.py @@ -32,5 +32,7 @@ def batch(iterable, size): sourceiter = iter(iterable) while True: batchiter = islice(sourceiter, size) - yield chain([six.next(batchiter)], batchiter) - + try: + yield chain([six.next(batchiter)], batchiter) + except StopIteration: + return From 6ae297f048ac54d55c587d03a9798008a59ae868 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C3=89tienne=20BERSAC?= <542613+bersace@users.noreply.github.com> Date: Sun, 30 Aug 2020 20:04:13 +0200 Subject: [PATCH 07/12] 2020 Update (#12) * Use psycopg2-binary * Update lxml to 4.5.2 Allows to use wheel. * Avoid confusion between libarchive and libarchive-c * Install libarchive-c for downloader * Drop distribute This project is merged with setuptools. * Review README Document a quickstart setup first and then describe advanced usage for custom tables. * Change the example to use a different DB name. Also, removed mention of unnecessary dependency which was installed for Python 2 support. * Update requirements.txt Remove wsgiref which was required for Python 2 support. Co-authored-by: Utkarsh Upadhyay <502876+musically-ut@users.noreply.github.com> --- README.md | 103 +++++++++++++++++++++++++++-------------------- requirements.txt | 7 ++-- 2 files changed, 63 insertions(+), 47 deletions(-) diff --git a/README.md b/README.md index 36e9b2b..f9026c4 100644 --- a/README.md +++ b/README.md @@ -1,58 +1,74 @@ # StackOverflow data to postgres -This is a quick script to move the Stackoverflow data from the [StackExchange data dump (Sept '14)](https://archive.org/details/stackexchange) to a Postgres SQL database. - -Schema hints are taken from [a post on Meta.StackExchange](http://meta.stackexchange.com/questions/2677/database-schema-documentation-for-the-public-data-dump-and-sede) and from [StackExchange Data Explorer](http://data.stackexchange.com). - -## Dependencies - - - [`lxml`](http://lxml.de/installation.html) - - [`psycopg2`](http://initd.org/psycopg/docs/install.html) - - [`libarchive-c`](https://pypi.org/project/libarchive-c/) - -## Usage - - - Create the database `stackoverflow` in your database: `CREATE DATABASE stackoverflow;` - - You can use a custom database name as well. Make sure to explicitly give - it while executing the script later. - - Move the following files to the folder from where the program is executed: - `Badges.xml`, `Votes.xml`, `Posts.xml`, `Users.xml`, `Tags.xml`. - - In some old dumps, the cases in the filenames are different. 
- - Execute in the current folder (in parallel, if desired): - - `python load_into_pg.py -t Badges` - - `python load_into_pg.py -t Posts` - - `python load_into_pg.py -t Tags` (not present in earliest dumps) - - `python load_into_pg.py -t Users` - - `python load_into_pg.py -t Votes` - - `python load_into_pg.py -t PostLinks` - - `python load_into_pg.py -t PostHistory` - - `python load_into_pg.py -t Comments` - - Finally, after all the initial tables have been created: - - `psql stackoverflow < ./sql/final_post.sql` - - If you used a different database name, make sure to use that instead of - `stackoverflow` while executing this step. - - For some additional indexes and tables, you can also execute the the following; - - `psql stackoverflow < ./sql/optional_post.sql` - - Again, remember to user the correct database name here, if not `stackoverflow`. - -## Loading a complete stackexchange project - -You can use the script to download a given stackexchange compressed file from +This is a quick script to move the Stackoverflow data from the [StackExchange +data dump (Sept '14)](https://archive.org/details/stackexchange) to a Postgres +SQL database. + +Schema hints are taken from [a post on +Meta.StackExchange](http://meta.stackexchange.com/questions/2677/database-schema-documentation-for-the-public-data-dump-and-sede) +and from [StackExchange Data Explorer](http://data.stackexchange.com). + +## Quickstart + +Install requirements, create a `stackoverflow` database, and use +`load_into_pg.py` script: + +``` console +$ pip install -r requirements.txt +... +Successfully installed argparse-1.2.1 libarchive-c-2.9 lxml-4.5.2 psycopg2-binary-2.8.4 six-1.10.0 +$ createdb beerSO +$ python load_into_pg.py -s beer -d beerSO +``` + +This will download compressed files from [archive.org](https://ia800107.us.archive.org/27/items/stackexchange/) and load -all the tables at once, using the `-s` switch. +all the tables at once. + + +## Advanced Usage + +You can use a custom database name as well. Make sure to explicitly give it +while executing the script later. + +Each table data is archived in an XML file. Available tables varies accross +history. `load_into_pg.py` knows how to handle the following tables: -You will need the `urllib` and `libarchive` modules. +- `Badges`. +- `Posts`. +- `Tags` (not present in earliest dumps). +- `Users`. +- `Votes`. +- `PostLinks`. +- `PostHistory`. +- `Comments`. + +You can download manually the files to the folder from where the program is +executed: `Badges.xml`, `Votes.xml`, `Posts.xml`, `Users.xml`, `Tags.xml`. In +some old dumps, the cases in the filenames are different. + +Then load each file with e.g. `python load_into_pg.py -t Badges`. + +After all the initial tables have been created: + +``` console +$ psql beerSO < ./sql/final_post.sql +``` + +For some additional indexes and tables, you can also execute the the following; + +``` console +$ psql beerSO < ./sql/optional_post.sql +``` If you give a schema name using the `-n` switch, all the tables will be moved to the given schema. This schema will be created in the script. -To load the _dba.stackexchange.com_ project in the `dba` schema, you would execute: -`./load_into_pg.py -s dba -n dba` - The paths are not changed in the final scripts `sql/final_post.sql` and `sql/optional_post.sql`. To run them, first set the _search_path_ to your schema name: `SET search_path TO ;` + ## Caveats and TODOs - It prepares some indexes and views which may not be necessary for your analysis. 
@@ -68,3 +84,4 @@ schema name: `SET search_path TO ;` ## Acknowledgement [@madtibo](https://github.com/madtibo) made significant contributions by adding `jsonb` and Foreign Key support. +[@bersace](https://github.com/bersace) brought the dependencies and the `README.md` instructions into 2020. diff --git a/requirements.txt b/requirements.txt index e1c997c..10665d2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,5 @@ argparse==1.2.1 -distribute==0.6.24 -lxml==3.4.1 -psycopg2==2.5.4 -wsgiref==0.1.2 +libarchive-c==2.9 +lxml==4.5.2 +psycopg2-binary==2.8.4 six==1.10.0 From 542ed6815be04cdf969e9e6d7b1bb76e9772f327 Mon Sep 17 00:00:00 2001 From: Utkarsh Upadhyay <502876+musically-ut@users.noreply.github.com> Date: Mon, 31 Aug 2020 11:46:49 +0200 Subject: [PATCH 08/12] Blackify the code. --- load_into_pg.py | 486 ++++++++++++++++++++++++----------------------- row_processor.py | 10 +- 2 files changed, 259 insertions(+), 237 deletions(-) diff --git a/load_into_pg.py b/load_into_pg.py index d7c3158..32c05aa 100755 --- a/load_into_pg.py +++ b/load_into_pg.py @@ -10,9 +10,7 @@ import json # Special rules needed for certain tables (esp. for old database dumps) -specialRules = { - ('Posts', 'ViewCount'): "NULLIF(%(ViewCount)s, '')::int" -} +specialRules = {("Posts", "ViewCount"): "NULLIF(%(ViewCount)s, '')::int"} # part of the file already downloaded file_part = None @@ -22,14 +20,15 @@ def show_progress(block_num, block_size, total_size): """Display the total size of the file to download and the progress in percent""" global file_part if file_part is None: - suffixes = ['B', 'KB', 'MB', 'GB', 'TB'] + suffixes = ["B", "KB", "MB", "GB", "TB"] suffixIndex = 0 pp_size = total_size while pp_size > 1024: suffixIndex += 1 # Increment the index of the suffix pp_size = pp_size / 1024.0 # Apply the division - six.print_('Total file size is: {0:.1f} {1}' - .format(pp_size, suffixes[suffixIndex])) + six.print_( + "Total file size is: {0:.1f} {1}".format(pp_size, suffixes[suffixIndex]) + ) six.print_("0 % of the file downloaded ...\r", end="", flush=True) file_part = 0 @@ -38,7 +37,11 @@ def show_progress(block_num, block_size, total_size): percent = 100 * downloaded / total_size if percent - file_part > 1: file_part = percent - six.print_("{0} % of the file downloaded ...\r".format(int(percent)), end="", flush=True) + six.print_( + "{0} % of the file downloaded ...\r".format(int(percent)), + end="", + flush=True, + ) else: file_part = None six.print_("") @@ -48,18 +51,18 @@ def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword): dbConnectionParam = "dbname={}".format(dbname) if mbPort is not None: - dbConnectionParam += ' port={}'.format(mbPort) + dbConnectionParam += " port={}".format(mbPort) if mbHost is not None: - dbConnectionParam += ' host={}'.format(mbHost) + dbConnectionParam += " host={}".format(mbHost) # TODO Is the escaping done here correct? if mbUsername is not None: - dbConnectionParam += ' user={}'.format(mbUsername) + dbConnectionParam += " user={}".format(mbUsername) # TODO Is the escaping done here correct? 
if mbPassword is not None: - dbConnectionParam += ' password={}'.format(mbPassword) + dbConnectionParam += " password={}".format(mbPassword) return dbConnectionParam @@ -70,13 +73,18 @@ def _makeDefValues(keys): def _createMogrificationTemplate(table, keys, insertJson): """Return the template string for mogrification for the given keys.""" - table_keys = ', '.join(['%(' + k + ')s' if (table, k) not in specialRules - else specialRules[table, k] - for k in keys]) + table_keys = ", ".join( + [ + "%(" + k + ")s" + if (table, k) not in specialRules + else specialRules[table, k] + for k in keys + ] + ) if insertJson: - return ('(' + table_keys + ', %(jsonfield)s' + ')') + return "(" + table_keys + ", %(jsonfield)s" + ")" else: - return ('(' + table_keys + ')') + return "(" + table_keys + ")" def _createCmdTuple(cursor, keys, templ, attribs, insertJson): @@ -93,7 +101,7 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson): dict_attribs = {} for name, value in attribs.items(): dict_attribs[name] = value - defs['jsonfield'] = json.dumps(dict_attribs) + defs["jsonfield"] = json.dumps(dict_attribs) return cursor.mogrify(templ, defs) @@ -101,170 +109,161 @@ def _createCmdTuple(cursor, keys, templ, attribs, insertJson): def _getTableKeys(table): """Return an array of the keys for a given table""" keys = None - if table == 'Users': - keys = [ - 'Id' - , 'Reputation' - , 'CreationDate' - , 'DisplayName' - , 'LastAccessDate' - , 'WebsiteUrl' - , 'Location' - , 'AboutMe' - , 'Views' - , 'UpVotes' - , 'DownVotes' - , 'ProfileImageUrl' - , 'Age' - , 'AccountId' - ] - elif table == 'Badges': - keys = [ - 'Id' - , 'UserId' - , 'Name' - , 'Date' - ] - elif table == 'PostLinks': - keys = [ - 'Id' - , 'CreationDate' - , 'PostId' - , 'RelatedPostId' - , 'LinkTypeId' - ] - elif table == 'Comments': - keys = [ - 'Id' - , 'PostId' - , 'Score' - , 'Text' - , 'CreationDate' - , 'UserId' - ] - elif table == 'Votes': + if table == "Users": keys = [ - 'Id' - , 'PostId' - , 'VoteTypeId' - , 'UserId' - , 'CreationDate' - , 'BountyAmount' + "Id", + "Reputation", + "CreationDate", + "DisplayName", + "LastAccessDate", + "WebsiteUrl", + "Location", + "AboutMe", + "Views", + "UpVotes", + "DownVotes", + "ProfileImageUrl", + "Age", + "AccountId", ] - elif table == 'Posts': + elif table == "Badges": + keys = ["Id", "UserId", "Name", "Date"] + elif table == "PostLinks": + keys = ["Id", "CreationDate", "PostId", "RelatedPostId", "LinkTypeId"] + elif table == "Comments": + keys = ["Id", "PostId", "Score", "Text", "CreationDate", "UserId"] + elif table == "Votes": + keys = ["Id", "PostId", "VoteTypeId", "UserId", "CreationDate", "BountyAmount"] + elif table == "Posts": keys = [ - 'Id' - , 'PostTypeId' - , 'AcceptedAnswerId' - , 'ParentId' - , 'CreationDate' - , 'Score' - , 'ViewCount' - , 'Body' - , 'OwnerUserId' - , 'LastEditorUserId' - , 'LastEditorDisplayName' - , 'LastEditDate' - , 'LastActivityDate' - , 'Title' - , 'Tags' - , 'AnswerCount' - , 'CommentCount' - , 'FavoriteCount' - , 'ClosedDate' - , 'CommunityOwnedDate' + "Id", + "PostTypeId", + "AcceptedAnswerId", + "ParentId", + "CreationDate", + "Score", + "ViewCount", + "Body", + "OwnerUserId", + "LastEditorUserId", + "LastEditorDisplayName", + "LastEditDate", + "LastActivityDate", + "Title", + "Tags", + "AnswerCount", + "CommentCount", + "FavoriteCount", + "ClosedDate", + "CommunityOwnedDate", ] - elif table == 'Tags': + elif table == "Tags": + keys = ["Id", "TagName", "Count", "ExcerptPostId", "WikiPostId"] + elif table == "PostHistory": keys = [ - 'Id' - , 
'TagName' - , 'Count' - , 'ExcerptPostId' - , 'WikiPostId' - ] - elif table == 'PostHistory': - keys = [ - 'Id' - , 'PostHistoryTypeId' - , 'PostId' - , 'RevisionGUID' - , 'CreationDate' - , 'UserId' - , 'Text' - ] - elif table == 'Comments': - keys = [ - 'Id' - , 'PostId' - , 'Score' - , 'Text' - , 'CreationDate' - , 'UserId' + "Id", + "PostHistoryTypeId", + "PostId", + "RevisionGUID", + "CreationDate", + "UserId", + "Text", ] + elif table == "Comments": + keys = ["Id", "PostId", "Score", "Text", "CreationDate", "UserId"] return keys def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam): """Handle the table including the post/pre processing.""" - keys = _getTableKeys(table) - dbFile = mbDbFile if mbDbFile is not None else table + '.xml' - tmpl = _createMogrificationTemplate(table, keys, insertJson) + keys = _getTableKeys(table) + dbFile = mbDbFile if mbDbFile is not None else table + ".xml" + tmpl = _createMogrificationTemplate(table, keys, insertJson) start_time = time.time() try: - pre = open('./sql/' + table + '_pre.sql').read() - post = open('./sql/' + table + '_post.sql').read() - fk = open('./sql/' + table + '_fk.sql').read() + pre = open("./sql/" + table + "_pre.sql").read() + post = open("./sql/" + table + "_post.sql").read() + fk = open("./sql/" + table + "_fk.sql").read() except IOError as e: - six.print_("Could not load pre/post/fk sql. Are you running from the correct path?", file=sys.stderr) + six.print_( + "Could not load pre/post/fk sql. Are you running from the correct path?", + file=sys.stderr, + ) sys.exit(-1) try: with pg.connect(dbConnectionParam) as conn: with conn.cursor() as cur: try: - with open(dbFile, 'rb') as xml: + with open(dbFile, "rb") as xml: # Pre-processing (dropping/creation of tables) - six.print_('Pre-processing ...') - if pre != '': + six.print_("Pre-processing ...") + if pre != "": cur.execute(pre) conn.commit() - six.print_('Pre-processing took {:.1f} seconds'.format(time.time() - start_time)) + six.print_( + "Pre-processing took {:.1f} seconds".format( + time.time() - start_time + ) + ) # Handle content of the table start_time = time.time() - six.print_('Processing data ...') + six.print_("Processing data ...") for rows in Processor.batch(Processor.parse(xml), 500): - valuesStr = ',\n'.join( - [_createCmdTuple(cur, keys, tmpl, row_attribs, insertJson).decode('utf-8') - for row_attribs in rows - ] + valuesStr = ",\n".join( + [ + _createCmdTuple( + cur, keys, tmpl, row_attribs, insertJson + ).decode("utf-8") + for row_attribs in rows + ] ) if len(valuesStr) > 0: - cmd = 'INSERT INTO ' + table + \ - ' VALUES\n' + valuesStr + ';' + cmd = ( + "INSERT INTO " + + table + + " VALUES\n" + + valuesStr + + ";" + ) cur.execute(cmd) conn.commit() - six.print_('Table \'{0}\' processing took {1:.1f} seconds'.format(table, time.time() - start_time)) + six.print_( + "Table '{0}' processing took {1:.1f} seconds".format( + table, time.time() - start_time + ) + ) # Post-processing (creation of indexes) start_time = time.time() - six.print_('Post processing ...') - if post != '': + six.print_("Post processing ...") + if post != "": cur.execute(post) conn.commit() - six.print_('Post processing took {0:.1f} seconds'.format(time.time() - start_time)) + six.print_( + "Post processing took {0:.1f} seconds".format( + time.time() - start_time + ) + ) if createFk: # fk-processing (creation of foreign keys) start_time = time.time() - six.print_('Foreign Key processing ...') - if post != '': + six.print_("Foreign Key processing ...") + if post != "": 
cur.execute(fk) conn.commit() - six.print_('Foreign Key processing took {0:.1f} seconds'.format(time.time() - start_time)) + six.print_( + "Foreign Key processing took {0:.1f} seconds".format( + time.time() - start_time + ) + ) except IOError as e: - six.print_("Could not read from file {}.".format(dbFile), file=sys.stderr) + six.print_( + "Could not read from file {}.".format(dbFile), file=sys.stderr + ) six.print_("IOError: {0}".format(e.strerror), file=sys.stderr) except pg.Error as e: six.print_("Error in dealing with the database.", file=sys.stderr) @@ -280,10 +279,10 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): with pg.connect(dbConnectionParam) as conn: with conn.cursor() as cur: # create the schema - cur.execute('CREATE SCHEMA IF NOT EXISTS ' + schemaName + ';') + cur.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName + ";") conn.commit() # move the table to the right schema - cur.execute('ALTER TABLE ' + table + ' SET SCHEMA ' + schemaName + ';') + cur.execute("ALTER TABLE " + table + " SET SCHEMA " + schemaName + ";") conn.commit() except pg.Error as e: six.print_("Error in dealing with the database.", file=sys.stderr) @@ -293,84 +292,89 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): six.print_("Warning from the database.", file=sys.stderr) six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr) + ############################################################# parser = argparse.ArgumentParser() -parser.add_argument('-t', '--table' - , help = 'The table to work on.' - , choices = ['Users', 'Badges', 'Posts', 'Tags', 'Votes', 'PostLinks', 'PostHistory', 'Comments'] - , default = None - ) - -parser.add_argument('-d', '--dbname' - , help = 'Name of database to create the table in. The database must exist.' - , default = 'stackoverflow' - ) - -parser.add_argument('-f', '--file' - , help = 'Name of the file to extract data from.' - , default = None - ) - -parser.add_argument('-s', '--so-project' - , help = 'StackExchange project to load.' - , default = None - ) - -parser.add_argument('--archive-url' - , help = 'URL of the archive directory to retrieve.' - , default = 'https://ia800107.us.archive.org/27/items/stackexchange' - ) - -parser.add_argument('-k', '--keep-archive' - , help = 'Will preserve the downloaded archive instead of deleting it.' - , action = 'store_true' - , default = False - ) - -parser.add_argument('-u', '--username' - , help = 'Username for the database.' - , default = None - ) - -parser.add_argument('-p', '--password' - , help = 'Password for the database.' - , default = None - ) - -parser.add_argument('-P', '--port' - , help = 'Port to connect with the database on.' - , default = None - ) - -parser.add_argument('-H', '--host' - , help = 'Hostname for the database.' - , default = None - ) - -parser.add_argument('--with-post-body' - , help = 'Import the posts with the post body. Only used if importing Posts.xml' - , action = 'store_true' - , default = False - ) - -parser.add_argument('-j', '--insert-json' - , help = 'Insert raw data as JSON.' - , action = 'store_true' - , default = False - ) - -parser.add_argument('-n', '--schema-name' - , help = 'Use specific schema.' - , default = 'public' - ) - -parser.add_argument('--foreign-keys' - , help = 'Create foreign keys.' 
- , action = 'store_true' - , default = False - ) +parser.add_argument( + "-t", + "--table", + help="The table to work on.", + choices=[ + "Users", + "Badges", + "Posts", + "Tags", + "Votes", + "PostLinks", + "PostHistory", + "Comments", + ], + default=None, +) + +parser.add_argument( + "-d", + "--dbname", + help="Name of database to create the table in. The database must exist.", + default="stackoverflow", +) + +parser.add_argument( + "-f", "--file", help="Name of the file to extract data from.", default=None +) + +parser.add_argument( + "-s", "--so-project", help="StackExchange project to load.", default=None +) + +parser.add_argument( + "--archive-url", + help="URL of the archive directory to retrieve.", + default="https://ia800107.us.archive.org/27/items/stackexchange", +) + +parser.add_argument( + "-k", + "--keep-archive", + help="Will preserve the downloaded archive instead of deleting it.", + action="store_true", + default=False, +) + +parser.add_argument("-u", "--username", help="Username for the database.", default=None) + +parser.add_argument("-p", "--password", help="Password for the database.", default=None) + +parser.add_argument( + "-P", "--port", help="Port to connect with the database on.", default=None +) + +parser.add_argument("-H", "--host", help="Hostname for the database.", default=None) + +parser.add_argument( + "--with-post-body", + help="Import the posts with the post body. Only used if importing Posts.xml", + action="store_true", + default=False, +) + +parser.add_argument( + "-j", + "--insert-json", + help="Insert raw data as JSON.", + action="store_true", + default=False, +) + +parser.add_argument( + "-n", "--schema-name", help="Use specific schema.", default="public" +) + +parser.add_argument( + "--foreign-keys", help="Create foreign keys.", action="store_true", default=False +) args = parser.parse_args() @@ -380,23 +384,27 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): except NameError: pass -dbConnectionParam = buildConnectionString(args.dbname, args.host, args.port, args.username, args.password) +dbConnectionParam = buildConnectionString( + args.dbname, args.host, args.port, args.username, args.password +) # load given file in table if args.file and args.table: table = args.table - if table == 'Posts': + if table == "Posts": # If the user has not explicitly asked for loading the body, we replace it with NULL if not args.with_post_body: - specialRules[('Posts', 'Body')] = 'NULL' + specialRules[("Posts", "Body")] = "NULL" - choice = input('This will drop the {} table. Are you sure [y/n]?'.format(table)) - if len(choice) > 0 and choice[0].lower() == 'y': - handleTable(table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam) + choice = input("This will drop the {} table. 
Are you sure [y/n]?".format(table)) + if len(choice) > 0 and choice[0].lower() == "y": + handleTable( + table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam + ) else: six.print_("Cancelled.") - if args.schema_name != 'public': + if args.schema_name != "public": moveTableToSchema(table, args.schema_name, dbConnectionParam) exit(0) @@ -412,32 +420,42 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): url = filepath else: # download the 7z archive in tempdir - file_name = args.so_project + '.stackexchange.com.7z' - url = '{0}/{1}'.format(args.archive_url, file_name) - temp_dir = tempfile.mkdtemp(prefix='so_') + file_name = args.so_project + ".stackexchange.com.7z" + url = "{0}/{1}".format(args.archive_url, file_name) + temp_dir = tempfile.mkdtemp(prefix="so_") filepath = os.path.join(temp_dir, file_name) - six.print_('Downloading the archive in {0}'.format(filepath)) - six.print_('please be patient ...') + six.print_("Downloading the archive in {0}".format(filepath)) + six.print_("please be patient ...") try: six.moves.urllib.request.urlretrieve(url, filepath, show_progress) except Exception as e: - six.print_('Error: impossible to download the {0} archive ({1})'.format(url, e)) + six.print_( + "Error: impossible to download the {0} archive ({1})".format(url, e) + ) exit(1) try: libarchive.extract_file(filepath) except Exception as e: - six.print_('Error: impossible to extract the {0} archive ({1})'.format(url, e)) + six.print_("Error: impossible to extract the {0} archive ({1})".format(url, e)) exit(1) - tables = ['Tags', 'Users', 'Badges', 'Posts', 'Comments', - 'Votes', 'PostLinks', 'PostHistory'] + tables = [ + "Tags", + "Users", + "Badges", + "Posts", + "Comments", + "Votes", + "PostLinks", + "PostHistory", + ] for table in tables: - six.print_('Load {0}.xml file'.format(table)) + six.print_("Load {0}.xml file".format(table)) handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam) # remove file - os.remove(table + '.xml') + os.remove(table + ".xml") if not args.keep_archive: os.remove(filepath) @@ -447,11 +465,13 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): else: six.print_("Archive '{0}' deleted".format(filepath)) - if args.schema_name != 'public': + if args.schema_name != "public": for table in tables: moveTableToSchema(table, args.schema_name, dbConnectionParam) exit(0) else: - six.print_("Error: you must either use '-f' and '-t' arguments or the '-s' argument.") + six.print_( + "Error: you must either use '-f' and '-t' arguments or the '-s' argument." + ) parser.print_help() diff --git a/row_processor.py b/row_processor.py index 195d2de..fec0814 100644 --- a/row_processor.py +++ b/row_processor.py @@ -2,6 +2,7 @@ from itertools import islice, chain import six + # Efficient parsing of large XML files from # http://stackoverflow.com/a/9814580/987185 def parse(fp): @@ -9,24 +10,25 @@ def parse(fp): returns a generator which yields one row at a time. """ - context = etree.iterparse(fp, events=('end',)) + context = etree.iterparse(fp, events=("end",)) for action, elem in context: - if elem.tag=='row': + if elem.tag == "row": # processing goes here assert elem.text is None, "The row wasn't empty" yield elem.attrib # cleanup # first empty children from current element - # This is not absolutely necessary if you are also deleting - # siblings, but it will allow you to free memory earlier. + # This is not absolutely necessary if you are also deleting + # siblings, but it will allow you to free memory earlier. 
             elem.clear()
             # second, delete previous siblings (records)
             while elem.getprevious() is not None:
                 del elem.getparent()[0]
             # make sure you have no references to Element objects outside the loop
 
+
 def batch(iterable, size):
     """Creates a batches of size `size` from the `iterable`."""
     sourceiter = iter(iterable)

From 43ec3368b351de09eaef4103221fa7f0c804249d Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Thu, 7 Jan 2021 22:21:52 +0000
Subject: [PATCH 09/12] Bump lxml from 4.5.2 to 4.6.2

Bumps [lxml](https://github.com/lxml/lxml) from 4.5.2 to 4.6.2.
- [Release notes](https://github.com/lxml/lxml/releases)
- [Changelog](https://github.com/lxml/lxml/blob/master/CHANGES.txt)
- [Commits](https://github.com/lxml/lxml/compare/lxml-4.5.2...lxml-4.6.2)

Signed-off-by: dependabot[bot]
---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 10665d2..4832127 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 argparse==1.2.1
 libarchive-c==2.9
-lxml==4.5.2
+lxml==4.6.2
 psycopg2-binary==2.8.4
 six==1.10.0

From 49d8358a83bb573c63c2715b122afb2fca98376c Mon Sep 17 00:00:00 2001
From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com>
Date: Sat, 12 Jun 2021 13:06:55 +0200
Subject: [PATCH 10/12] Bump lxml from 4.6.2 to 4.6.3 (#14)

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
 requirements.txt | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/requirements.txt b/requirements.txt
index 4832127..196607c 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,5 +1,5 @@
 argparse==1.2.1
 libarchive-c==2.9
-lxml==4.6.2
+lxml==4.6.3
 psycopg2-binary==2.8.4
 six==1.10.0

From 634c05739e99f1903b88e816939d98ccd2f1f834 Mon Sep 17 00:00:00 2001
From: Rodrigo Morales <74389646+rdrg109@users.noreply.github.com>
Date: Sat, 4 Sep 2021 03:31:35 -0500
Subject: [PATCH 11/12] Create tables in the specified schema to avoid moving the tables afterwards (#21)

Fixes #18.

* Execute "moveTableToSchema" if the table was successfully created

* Function for getting the connection parameters added

* Function buildConnectionString deleted

This is because the parameters are now obtained through the function
getConnectionParameters.

* dbConnectionParam and moveTableToSchema replaced by a single function

dbConnectionParam has been deleted because all parameters are now
obtained by a single function, which is called in handleTable.

moveTableToSchema has been deleted because tables are now created in
the specified schema. Therefore, there is no need to move the tables
after their creation.
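The mechanism is easy to check in isolation. The following is only an illustrative sketch, not part of the patch: it assumes psycopg2 is installed, a `stackoverflow` database exists, and the target schema `myschema` was created beforehand with `CREATE SCHEMA myschema;`.

```python
# Illustrative sketch only -- not part of the patch series.
# Assumes: psycopg2 installed, database "stackoverflow" exists, and the
# schema "myschema" was created beforehand (CREATE SCHEMA myschema;).
import psycopg2 as pg

parameters = {
    "dbname": "stackoverflow",
    # libpq "options": unqualified names now resolve against "myschema",
    # so CREATE TABLE puts the table there without ALTER TABLE ... SET SCHEMA.
    "options": "-c search_path=myschema",
}

conn = pg.connect(**parameters)
with conn:
    with conn.cursor() as cur:
        cur.execute("CREATE TABLE demo (id int PRIMARY KEY);")
        cur.execute(
            "SELECT table_schema FROM information_schema.tables"
            " WHERE table_name = 'demo';"
        )
        print(cur.fetchone())  # expected: ('myschema',)
conn.close()
```

Because `search_path` is set for the whole session, the loader's unqualified `CREATE TABLE` and `INSERT` statements land in the requested schema directly, which is what makes the old `moveTableToSchema` step unnecessary.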
Co-authored-by: rdrg109 <> --- load_into_pg.py | 73 +++++++++++++++++-------------------------------- 1 file changed, 25 insertions(+), 48 deletions(-) diff --git a/load_into_pg.py b/load_into_pg.py index 32c05aa..2d6cef5 100755 --- a/load_into_pg.py +++ b/load_into_pg.py @@ -46,24 +46,30 @@ def show_progress(block_num, block_size, total_size): file_part = None six.print_("") +def getConnectionParameters(): + """Get the parameters for the connection to the database.""" -def buildConnectionString(dbname, mbHost, mbPort, mbUsername, mbPassword): - dbConnectionParam = "dbname={}".format(dbname) + parameters = {} - if mbPort is not None: - dbConnectionParam += " port={}".format(mbPort) + if args.dbname: + parameters['dbname'] = args.dbname - if mbHost is not None: - dbConnectionParam += " host={}".format(mbHost) + if args.host: + parameters['host'] = args.host - # TODO Is the escaping done here correct? - if mbUsername is not None: - dbConnectionParam += " user={}".format(mbUsername) + if args.port: + parameters['port'] = args.port - # TODO Is the escaping done here correct? - if mbPassword is not None: - dbConnectionParam += " password={}".format(mbPassword) - return dbConnectionParam + if args.username: + parameters['user'] = args.username + + if args.password: + parameters['password'] = args.password + + if args.schema_name: + parameters['options'] = "-c search_path=" + args.schema_name + + return parameters def _makeDefValues(keys): @@ -174,7 +180,7 @@ def _getTableKeys(table): return keys -def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam): +def handleTable(table, insertJson, createFk, mbDbFile): """Handle the table including the post/pre processing.""" keys = _getTableKeys(table) dbFile = mbDbFile if mbDbFile is not None else table + ".xml" @@ -193,7 +199,7 @@ def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam): sys.exit(-1) try: - with pg.connect(dbConnectionParam) as conn: + with pg.connect(**getConnectionParameters()) as conn: with conn.cursor() as cur: try: with open(dbFile, "rb") as xml: @@ -273,29 +279,8 @@ def handleTable(table, insertJson, createFk, mbDbFile, dbConnectionParam): six.print_("Warning from the database.", file=sys.stderr) six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr) - -def moveTableToSchema(table, schemaName, dbConnectionParam): - try: - with pg.connect(dbConnectionParam) as conn: - with conn.cursor() as cur: - # create the schema - cur.execute("CREATE SCHEMA IF NOT EXISTS " + schemaName + ";") - conn.commit() - # move the table to the right schema - cur.execute("ALTER TABLE " + table + " SET SCHEMA " + schemaName + ";") - conn.commit() - except pg.Error as e: - six.print_("Error in dealing with the database.", file=sys.stderr) - six.print_("pg.Error ({0}): {1}".format(e.pgcode, e.pgerror), file=sys.stderr) - six.print_(str(e), file=sys.stderr) - except pg.Warning as w: - six.print_("Warning from the database.", file=sys.stderr) - six.print_("pg.Warning: {0}".format(str(w)), file=sys.stderr) - - ############################################################# - parser = argparse.ArgumentParser() parser.add_argument( "-t", @@ -384,10 +369,6 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): except NameError: pass -dbConnectionParam = buildConnectionString( - args.dbname, args.host, args.port, args.username, args.password -) - # load given file in table if args.file and args.table: table = args.table @@ -398,14 +379,13 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): 
specialRules[("Posts", "Body")] = "NULL" choice = input("This will drop the {} table. Are you sure [y/n]?".format(table)) + if len(choice) > 0 and choice[0].lower() == "y": handleTable( - table, args.insert_json, args.foreign_keys, args.file, dbConnectionParam - ) + table, args.insert_json, args.foreign_keys, args.file) else: six.print_("Cancelled.") - if args.schema_name != "public": - moveTableToSchema(table, args.schema_name, dbConnectionParam) + exit(0) # load a project @@ -453,7 +433,7 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): for table in tables: six.print_("Load {0}.xml file".format(table)) - handleTable(table, args.insert_json, args.foreign_keys, None, dbConnectionParam) + handleTable(table, args.insert_json, args.foreign_keys, None) # remove file os.remove(table + ".xml") @@ -465,9 +445,6 @@ def moveTableToSchema(table, schemaName, dbConnectionParam): else: six.print_("Archive '{0}' deleted".format(filepath)) - if args.schema_name != "public": - for table in tables: - moveTableToSchema(table, args.schema_name, dbConnectionParam) exit(0) else: From 8bcb72858963daec7ee6688caef7a5f260f5489e Mon Sep 17 00:00:00 2001 From: Utkarsh Upadhyay <502876+musically-ut@users.noreply.github.com> Date: Sat, 4 Sep 2021 10:34:34 +0200 Subject: [PATCH 12/12] Acknowledge @rdrg109's contributions. --- README.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index f9026c4..2781792 100644 --- a/README.md +++ b/README.md @@ -10,8 +10,7 @@ and from [StackExchange Data Explorer](http://data.stackexchange.com). ## Quickstart -Install requirements, create a `stackoverflow` database, and use -`load_into_pg.py` script: +Install requirements, create a new database (e.g. `beerSO` below), and use `load_into_pg.py` script: ``` console $ pip install -r requirements.txt @@ -83,5 +82,6 @@ schema name: `SET search_path TO ;` ## Acknowledgement -[@madtibo](https://github.com/madtibo) made significant contributions by adding `jsonb` and Foreign Key support. -[@bersace](https://github.com/bersace) brought the dependencies and the `README.md` instructions into 2020. + - [@madtibo](https://github.com/madtibo) made significant contributions by adding `jsonb` and Foreign Key support. + - [@bersace](https://github.com/bersace) brought the dependencies and the `README.md` instructions into 2020s. + - [@rdrg109](https://github.com/rdrg109) simplified handling of non-public schemas and fixed bugs associated with re-importing tables.