import os
import sys
import re
import argparse
import subprocess
import logging
def add2dict(diff_dict: dict, path: str, line_num: str, content):
key = path
value = [line_num, content]
if key in diff_dict:
value_list = diff_dict.pop(key)
value_list.append(value)
else:
value_list = [value]
diff_dict.update({key: value_list})
def __diff_match_file_start(control_block: dict, line: str):
pattern = "diff --git"
if not line.startswith(pattern):
return False
control_block["line_num"] = 0
control_block["fullpath"] = ""
control_block["match_flag"] = False
control_block["curr_key"] = ""
control_block["is_new_file"] = False
return True
def __diff_match_is_newfile(control_block: dict, line: str):
pattern = "new file"
if not line.startswith(pattern):
return False
control_block["is_new_file"] = True
return True
def __diff_match_filename_with_minus(control_block: dict, line: str):
pattern = "---\ (a/)?.*"
if re.match(pattern, line) is None:
return False
control_block["match_flag"] = False
return True
def __diff_match_filename_with_plus(control_block: dict, line: str):
pattern = "\+\+\+\ b/(.*)"
if re.match(pattern, line) is None:
return False
for key in control_block["diff_dict"]:
if re.search(key, line) is not None:
control_block["curr_key"] = key
res = re.match(pattern, line)
control_block["fullpath"] = (
"{}, {}".format(control_block["pull_request_url"], res.group(1).strip())
)
if control_block['is_new_file'] is True:
control_block["fullpath"] = "%s%s" % (
control_block["fullpath"], "(new file)")
control_block["match_flag"] = True
return True
def __diff_match_start_linenum(control_block: dict, line: str):
pattern = "@@\ -[0-9]+,[0-9]+\ \+([0-9]+)(,[0-9]+)?\ @@.*"
if control_block["match_flag"] is False or re.match(pattern, line) is None:
return False
res = re.match(pattern, line)
control_block["line_num"] = int(res.group(1))
return True
def __diff_match_code_line(control_block: dict, line: str):
diff_dict = control_block["diff_dict"]
pattern1 = "[\ +-](.*)"
pattern2 = "([\ +-])?(.*)"
if control_block["match_flag"] is False or re.match(pattern1, line) is None:
return False
res = re.match(pattern2, line)
if res.group(1) == "+":
add2dict(
diff_dict[control_block["curr_key"]],
control_block["fullpath"],
control_block["line_num"],
res.group(2),
)
if res.group(1) != "-":
control_block["line_num"] = control_block["line_num"] + 1
return True
def strip_diff(diff_dict: dict, pull_request_url: str, gitee_pr_diff: str):
control_block = {
"line_num": 0,
"fullpath": "",
"match_flag": False,
"curr_key": "",
"diff_dict": diff_dict,
"pull_request_url": pull_request_url,
"is_new_file": False,
}
strip_diff_handlers = [
__diff_match_file_start,
__diff_match_is_newfile,
__diff_match_filename_with_minus,
__diff_match_filename_with_plus,
__diff_match_start_linenum,
__diff_match_code_line,
]
for line in gitee_pr_diff.splitlines():
for handler in strip_diff_handlers:
if handler(control_block, line) is True:
break
def get_diff_by_repo_pr_num(repo_url: str, pr_num: int):
diff_url = "%spulls/%s.diff" % (repo_url, str(pr_num))
cmd = "curl -L -s " + diff_url
gitee_pr_diff = ""
try:
ret = subprocess.Popen(
["/usr/bin/curl", "-L", "-s", diff_url],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
errors="replace",
)
gitee_pr_diff, errorout = ret.communicate()
if len(errorout) != 0:
logging.error("Popen error: ", errorout)
except Exception as err:
logging.error("error %s", err, cmd)
return gitee_pr_diff
class GiteeCsctPrehandler:
def __init__(self, pr_list: str, *patterns):
self.diff_dict = {}
for pattern in patterns:
pattern_dict = {pattern: {}}
self.diff_dict.update(pattern_dict)
repo_pr_num_list = pr_list.split(";")
for pr_item in repo_pr_num_list:
pr_split_group = pr_item.split("pulls/")
repo_url = pr_split_group[0].strip()
pr_num = pr_split_group[1].strip("/")
gitee_pr_diff = get_diff_by_repo_pr_num(repo_url, pr_num)
strip_diff(self.diff_dict, pr_item, gitee_pr_diff)
def clear_repo_num_file(self):
self.diff_dict.clear()
def get_diff_dict(self, pattern):
ret_diff = {}
if pattern in self.diff_dict.keys():
ret_diff = self.diff_dict[pattern]
return ret_diff
def test():
if len(sys.argv) == 1:
sys.stderr.write("test error: pr_list is empty.\n")
return
pr_list = sys.argv[1]
csct_prehandler = GiteeCsctPrehandler(
pr_list, "BUILD.gn", "bundle.json", ".gni")
print("==================start get diff====================")
print(csct_prehandler.get_diff_dict("BUILD.gn"))
print("==================start get diff====================")
print(csct_prehandler.get_diff_dict("bundle.json"))
print("========================end=========================")
if __name__ == "__main__":
sys.exit(test())