Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
fix: Resolve "global name 'schema' is not defined" error (#457)
  • Loading branch information
Alda committed Jun 13, 2017
commit 4effeca6fc263b4f082d251bc5f2094ac7afe6d7
50 changes: 42 additions & 8 deletions metadata-etl/src/main/resources/jython/OracleExtract.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from wherehows.common.schemas import SampleDataRecord
from wherehows.common import Constant
from org.slf4j import LoggerFactory
from wherehows.common.writers import FileWriter


class OracleExtract:
Expand Down Expand Up @@ -190,7 +191,7 @@ def format_view_metadata(self, rows, schema):
self.logger.info("")


def format_table_metadata(self, rows):
def format_table_metadata(self, rows,schema):
'''
add table info from rows into schema
:param rows: input. each row is a database with all it's tables
Expand All @@ -201,10 +202,38 @@ def format_table_metadata(self, rows):
table_record = {}
table_idx = 0
field_idx = 0
db_dict = {}

db_output_dict = {}
db_idx = len(schema) - 1
db_output_idx = -1

for row in rows:
table_name_key = "%s.%s" % (row[0], row[1])
table_urn = "oracle:///%s/%s" % (row[0], row[1])
if row[0] not in db_dict:
schema.append({'database': row[0], 'type': 'Oracle', 'tables': []})
db_idx += 1
db_dict[row[0]] = db_idx
full_name = ''
if row[0]:
full_name = row[0]
if row[1]:
full_name += '.' + row[1]
elif row[1]:
full_name = row[1]
original_name = row[0] + '.' + row[1]
if full_name not in db_output_dict:
schema[db_idx]['tables'].append(
{'name': row[1], 'type': 'Oracle', 'columns': [],'original_name': original_name})
db_output_idx += 1
db_output_dict[full_name] = db_output_idx
schema[db_idx]['tables'][db_output_idx]['columns'].append(
{'name': row[4], 'nullable': row[6], 'dataType': row[5] ,
'maxByteLength': row[7] , 'precision': row[8] ,
'scale': row[9] ,'default': row[12]})
column_idx = len(schema[db_idx]['tables'][db_output_idx]['columns']) - 1
datetime.datetime.now(), db_output_idx + 1, len(rows), row[0]))

if 'urn' not in table_record or table_urn != table_record['urn']:
# This is a new table. Let's push the previous table record into output_list
Expand Down Expand Up @@ -265,6 +294,7 @@ def get_sample_data(self, database_name, table_name):
fullname = ''
columns = []
rows_data = []
fullname = database_name + '."' + table_name + '"'

sql = 'SELECT * FROM %s WHERE ROWNUM<=10' % fullname
curs_td = self.conn_db.cursor()
Expand Down Expand Up @@ -332,12 +362,13 @@ def run(self, database_name, table_name, table_output_file, field_output_file, s
:param collect_sample:
:return:
"""
schema = []
if database_name is None and table_name is None: # default route: process everything
begin = datetime.datetime.now().strftime("%H:%M:%S")
# table info
rows = self.get_table_info(None, None)
self.get_extra_table_info()
self.format_table_metadata(rows)
self.format_table_metadata(rows,schema)
end = datetime.datetime.now().strftime("%H:%M:%S")
self.logger.info("Collecting table info [%s -> %s]" % (str(begin), str(end)))

Expand All @@ -348,12 +379,13 @@ def run(self, database_name, table_name, table_output_file, field_output_file, s
csv_columns = ['dataset_urn', 'sort_id', 'name', 'data_type', 'nullable',
'size', 'precision', 'scale', 'default_value', 'doc']
self.write_csv(field_output_file, csv_columns, self.field_output_list)

scaned_dict = {}
if sample:
csvfile = open(sample_output_file, 'wb')
open(sample_output_file, 'wb')
os.chmod(sample_output_file, 0666)
writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
sample_file_writer = FileWriter(sample_output_file)
##writer = csv.DictWriter(csvfile, fieldnames=csv_columns, delimiter='\x1A', lineterminator='\n',
## quoting=csv.QUOTE_NONE, quotechar='\1', escapechar='\0')
self.logger.info("Writing to CSV file {}".format(sample_output_file))

# collect sample data
Expand All @@ -373,8 +405,10 @@ def run(self, database_name, table_name, table_output_file, field_output_file, s
(ref_urn, sample_data) = self.get_sample_data(database_name, table_name)
sample_record = SampleDataRecord('oracle', '/' + database_name + '/' + table_name, '', sample_data)
scaned_dict[table_name] = {'ref_urn': ref_urn, 'data': sample_data}
writer.writerow(sample_record)
csvfile.close()
sample_file_writer.append(sample_record)
sample_file_writer.close()
#writer.writerow(sample_record)
#csvfile.close()


if __name__ == "__main__":
Expand Down
2 changes: 1 addition & 1 deletion metadata-etl/src/main/resources/jython/OracleLoad.py
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ def run(self):
begin = datetime.datetime.now().strftime("%H:%M:%S")
self.load_tables()
self.load_fields()
# self.load_sample()
self.load_sample()
end = datetime.datetime.now().strftime("%H:%M:%S")
self.logger.info("Load Oracle metadata [%s -> %s]" % (str(begin), str(end)))
finally:
Expand Down