Get owners for espresso and oracle, and fix a bug for teradata
Na Zhang committed Oct 19, 2016
commit 043dc25e89a1e681cb1438aaa65be4e1e8fbf123
15 changes: 15 additions & 0 deletions data-model/DDL/ETL_DDL/git_metadata.sql
@@ -94,3 +94,18 @@ CREATE TABLE `stg_repo_owner` (
`paths` TEXT CHAR SET utf8 DEFAULT NULL COMMENT 'covered paths by this acl',
PRIMARY KEY (`scm_repo_fullname`, `scm_type`, `owner_type`, `owner_name`, `app_id`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;



CREATE TABLE `stg_database_scm_map` (
`database_name` VARCHAR(100) COMMENT 'database name',
`database_type` VARCHAR(50) COMMENT 'database type',
`app_name` VARCHAR(127) COMMENT 'the name of application',
`scm_type` VARCHAR(50) COMMENT 'scm type',
`scm_url` VARCHAR(127) COMMENT 'scm url',
`committers` VARCHAR(500) COMMENT 'committers',
`filepath` VARCHAR(200) COMMENT 'filepath',
  `app_id` INT COMMENT 'application id of the namespace',
`wh_etl_exec_id` BIGINT COMMENT 'wherehows etl execution id that modified this record',
PRIMARY KEY (`database_type`,`database_name`,`scm_type`,`app_name`)
) ENGINE = InnoDB DEFAULT CHARSET = latin1;
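
Note: for orientation, here is a hypothetical example of one entry this ETL stages into stg_database_scm_map, expressed as the Python dict shape built in CodeSearchExtract.py below. Values are illustrative only, assembled from the sample paths and committers that appear in that script; the svn host is made up.

example_row = {
  'database_name': 'Identity',      # from database.properties: database.name
  'database_type': 'espresso',      # from database.properties: database.type
  'app_name': 'identity-mt',        # multiproduct name reported by codesearch
  'scm_type': 'SVN',
  'scm_url': 'http://svn.example.com/identity-mt/database/Identity/database.properties?view=markup',
  'committers': 'htang,llu',        # recent committers/approvers from svn log
  'filepath': 'identity-mt/database/Identity/database.properties',
}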
56 changes: 56 additions & 0 deletions metadata-etl/src/main/java/metadata/etl/git/CodeSearchMetadataEtl.java
@@ -0,0 +1,56 @@
/**
* Copyright 2015 LinkedIn Corp. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*/
package metadata.etl.git;

import java.io.InputStream;
import java.util.Properties;
import metadata.etl.EtlJob;


public class CodeSearchMetadataEtl extends EtlJob {

@Deprecated
public CodeSearchMetadataEtl(int appId, long whExecId) {
super(appId, null, whExecId);
}

public CodeSearchMetadataEtl(int appId, long whExecId, Properties prop) {
super(appId, null, whExecId, prop);
}


@Override
public void extract()
throws Exception {
logger.info("In Code Search metadata ETL, launch extract jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/CodeSearchExtract.py");
// logger.info("call scripts with args: " + interpreter.getSystemState().argv);
interpreter.execfile(inputStream);
inputStream.close();
}

@Override
public void transform()
throws Exception {
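    // nothing to transform: the extract step already writes files that CodeSearchLoad.py consumes directly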
}

@Override
public void load()
throws Exception {
logger.info("In Code Search metadata ETL, launch load jython scripts");
InputStream inputStream = classLoader.getResourceAsStream("jython/CodeSearchLoad.py");
interpreter.execfile(inputStream);
inputStream.close();
}
}
metadata-etl/src/main/java/metadata/etl/EtlJobFactory.java
@@ -28,6 +28,7 @@
import metadata.etl.scheduler.azkaban.AzkabanExecEtl;
import metadata.etl.scheduler.oozie.OozieExecEtl;
import metadata.etl.models.EtlJobName;
import metadata.etl.git.CodeSearchMetadataEtl;


/**
@@ -63,6 +64,8 @@ public static EtlJob getEtlJob(EtlJobName etlJobName, Integer refId, Long whExecId
return new OracleMetadataEtl(refId, whExecId, properties);
case PRODUCT_REPO_METADATA_ETL:
return new MultiproductMetadataEtl(refId, whExecId, properties);
case DATABASE_SCM_METADATA_ETL:
return new CodeSearchMetadataEtl(refId, whExecId, properties);
default:
throw new UnsupportedOperationException("Unsupported job type: " + etlJobName);
}
metadata-etl/src/main/java/metadata/etl/models/EtlJobName.java
@@ -31,6 +31,7 @@ public enum EtlJobName {
ORACLE_DATASET_METADATA_ETL(EtlType.DATASET, RefIdType.DB),
PRODUCT_REPO_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
KAFKA_CONSUMER_ETL(EtlType.OPERATION, RefIdType.DB),
DATABASE_SCM_METADATA_ETL(EtlType.OPERATION, RefIdType.APP),
;

EtlType etlType;
190 changes: 190 additions & 0 deletions metadata-etl/src/main/resources/jython/CodeSearchExtract.py
@@ -0,0 +1,190 @@
#
# Copyright 2015 LinkedIn Corp. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#

import sys, re
import requests
import subprocess
from wherehows.common import Constant
from wherehows.common.schemas import SCMOwnerRecord
from wherehows.common.writers import FileWriter
from org.slf4j import LoggerFactory


class CodeSearchExtract:
"""
  Lists all repos for Oracle & Espresso databases. Since this information is not
  exposed through a UI, we use http://go/codesearch to discover
  the multiproduct repos that use the 'li-db' plugin.
"""

# verbose = False
limit_search_result = 500
# limit_multiproduct = None
# limit_plugin = None

def __init__(self):
self.logger = LoggerFactory.getLogger('jython script : ' + self.__class__.__name__)
self.base_url = args[Constant.BASE_URL_KEY]
self.code_search_committer_writer = FileWriter(args[Constant.DATABASE_SCM_REPO_OUTPUT_KEY])

def run(self):
offset_min = 1
offset_max = 100
databases = []
search_request = \
{"request":
{
"other":{"CurrentResult":str(offset_min),"requestTimeout":"200000000"},
"queryContext":{"numToScore":1000,"docDataSet":"results","rawQuery":"type:gradle plugin:*'li-db'"},
"paginationContext":{"numToReturn":offset_max}
}
}
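    # 'CurrentResult' is the offset of the first hit to return and 'numToReturn'
    # caps the page size; the loop below advances both to page through all hits.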
while True:
resp = requests.post(self.base_url + '/galene-codesearch?action=search',
json=search_request,
verify=False)
if resp.status_code != 200:
# This means something went wrong.
d = resp.json()
self.logger.info("Request Error! Stack trace {}".format(d['stackTrace']))
# raise Exception('Request Error', 'POST /galene-codesearch?action=search %s' % (resp.status_code))
break

result = resp.json()['value']
self.logger.debug("Pagination offset = {}".format(result['total']))
for element in result['elements']:
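        # each hit is a gradle build file; the database.properties we want lives in the same directory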
fpath = element['docData']['filepath']
ri = fpath.rindex('/')
prop_file = fpath[:ri] + '/database.properties'
# e.g. identity-mt/database/Identity/database.properties
# network/database/externmembermap/database.properties
# cap-backend/database/campaigns-db/database.properties
        databases.append({'filepath': prop_file, 'app_name': element['docData']['mp']})

if result['total'] < 100:
break
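      # more pages remain: advance the window by the number of hits just returned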
offset_min += int(result['total'])
offset_max += 100 # if result['total'] < 100 else result['total']
search_request['request']['other']['CurrentResult'] = str(offset_min)
search_request['request']['paginationContext']['numToReturn'] = offset_max
self.logger.debug("Property file path {}".format(search_request))

self.logger.debug(" length of databases is {}".format(len(databases)))

owner_count = 0
committers_count = 0
for db in databases:
prop_file = db['filepath']
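      # second query: fetch the document body and repo metadata for this database.properties file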
file_request = \
{"request":{
"other":{"filepath":prop_file,
"TextTokenize":"True",
"CurrentResult":"1",
"requestTimeout":"2000000000"
},
"queryContext":{"numToScore":10,"docDataSet":"result"},
"paginationContext":{"numToReturn":1}
}
}
resp = requests.post(self.base_url + '/galene-codesearch?action=search',
json=file_request,
verify=False)
if resp.status_code != 200:
# This means something went wrong.
d = resp.json()
self.logger.info("Request Error! Stack trace {}".format(d['stackTrace']))
continue
result = resp.json()['value']
if result['total'] < 1:
self.logger.info("Nothing found for {}".format(prop_file))
continue
if "repoUrl" in result['elements'][0]['docData']:
db['scm_url'] = result['elements'][0]['docData']['repoUrl']
db['scm_type'] = result['elements'][0]['docData']['repotype']
db['committers'] = ''

        if db['scm_type'] == 'SVN':
          # rewrite the viewvc file URL into the schema directory on the svn host
          schema_in_repo = re.sub(r"http://(\w+)\.([\w\.\-/].*)database.properties\?view=markup",
                                  "http://svn." + r"\2" + "schema", db['scm_url'])
          db['committers'] = self.get_svn_committers(schema_in_repo)
          committers_count += 1
          self.logger.info("Committers for {} => {}".format(schema_in_repo, db['committers']))

      else:
        # without a repoUrl we cannot build an owner record; skip this entry
        self.logger.info("No repoUrl found for {}, skipping".format(prop_file))
        continue

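      # the 'code' field holds the raw database.properties content as key=value lines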
code = result['elements'][0]['docData']['code']
code_dict = dict(line.split("=", 1) for line in code.strip().splitlines())
if "database.name" in code_dict:
db['database_name'] = code_dict['database.name']
if "database.type" in code_dict:
db['database_type'] = code_dict['database.type']

      owner_record = SCMOwnerRecord(
        db['scm_url'],
        db.get('database_name'),  # may be absent from database.properties
        db.get('database_type'),
        db['app_name'],
        db['filepath'],
        db['committers'],
        db['scm_type']
      )
owner_count += 1
self.code_search_committer_writer.append(owner_record)

self.code_search_committer_writer.close()
    self.logger.info('Finished fetching committers: {} entries in total'.format(committers_count))
    self.logger.info('Finished fetching SCM owner records: {} in total'.format(owner_count))


def get_svn_committers(self, svn_repo_path):
"""Collect recent committers from the cmd
svn log %s | grep '^\(A=\|r[0-9]* \)' | head -10
e.g.
r1617887 | htang | 2016-09-21 14:27:40 -0700 (Wed, 21 Sep 2016) | 12 lines
A=shanda,pravi
r1600397 | llu | 2016-08-08 17:14:22 -0700 (Mon, 08 Aug 2016) | 3 lines
A=rramakri,htang
"""
#svn_cmd = """svn log %s | grep '^\(A=\|r[0-9]* \)' | head -10"""
committers = []
possible_svn_paths = [svn_repo_path, svn_repo_path + "ta"]
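    # the schema directory may be named 'schema' or 'schemata' (hence the "ta" suffix); try both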
for svn_repo_path in possible_svn_paths:
p = subprocess.Popen('svn log ' + svn_repo_path + " |grep '^\(A=\|r[0-9]* \)' |head -10",
shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
svn_log_output, svn_log_err = p.communicate()
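      # svn error E160013 means the URL does not exist; fall through to the next candidate path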
if svn_log_err[:12] == 'svn: E160013':
continue # try the next possible path

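      # 'rNNNN | user | ...' header lines name the committer; 'A=' lines list the approvers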
for line in svn_log_output.split('\n'):
if re.match(r"r[0-9]+", line):
committer = line.split('|')[1].strip()
if committer not in committers:
committers.append(committer)
elif line[:2] == 'A=':
for apvr in line[2:].split(','):
if apvr not in committers:
committers.append(apvr)


      if committers:
self.logger.debug(" {}, ' => ', {}".format(svn_repo_path,committers))
break

return ','.join(committers)

if __name__ == "__main__":
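  # sys.argv[1] is expected to be the dict of job properties injected by the calling Java ETL job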
args = sys.argv[1]
e = CodeSearchExtract()
e.run()