使用 Python 集成 GitHub App 和 GitHub Check API,构建持续集成服务
这篇博客的起因是在做项目的过程中要求使用 Python 完成相应功能,现在将这部份代码按教程的流程发布出来。
原文《使用 Checks API 创建 CI 测试》中使用 Ruby,现使用 Python 完成文档示例。由于教程已经将大部分内容详细描述了,本文只列出与原来教程有不同的步骤,以及对应的 Python 代码。
项目地址:qiwihui/githubappcheckruns
基本要求
文档:https://docs.github.com/cn/developers/apps/setting-up-your-development-environment-to-create-a-github-app
- 使用本地测试,利用 smee 转发 github 回调到本地
访问 smee.io 并创建一个新的 channel,比如 https://smee.io/LgDQ8xrhy0q2GeET,然后使用 pysmee
命令运行如下命令:
# 安装
pip install pysmee
# 运行命令
pysmee forward https://smee.io/LgDQ8xrhy0q2GeET http://localhost:5000/events
或者使用项目目录 smee 中的 node 脚本运行
npm i
npm run smee
第 1 部分 创建检查 API 接口
步骤 1.1. 更新应用程序权限
主要为以下权限:
- Repository permissions
- Checks: Read & write
- Contents: Read & write
- Pull requests: Read & write
- Subscribe to events
- check suite
- check run
步骤 1.2. 添加事件处理
对应于 Ruby 中使用 Sinatra 作为 web 框架,我们使用 Flask
作为 web 框架,并结合 PyGithub
这个库提供的 github API 封装,由于 PyGithub 在发布的版本中还未集成 check run 对应的 API,所以使用其 master
分支上的代码,添加 git+https://github.com/PyGithub/PyGithub.git
到 requirements.txt 中。
app = Flask(__name__)
APP_NAME = "Octo PyLinter"
app.config["GITHUB_APP_ID"] = config.GITHUB_APP_ID
app.config["GITHUB_KEY_FILE"] = config.GITHUB_KEY_FILE
app.config["GITHUB_SECRET"] = config.GITHUB_SECRET
app.config["GITHUB_APP_ROUTE"] = config.GITHUB_APP_ROUTE
github_app = GithubAppFlask(app)
@github_app.on(
[
"check_suite.requested",
"check_suite.rerequested",
"check_run.rerequested",
]
)
def create_check_run():
pass
client = github_app.github_app_installation.get_github_client()
head_sha = (
github_app.payload["check_run"]
if "check_run" in github_app.payload
else github_app.payload["check_suite"]["head_sha"]
)
repo = client.get_repo(github_app.payload["repository"]["full_name"])
repo.create_check_run(name=APP_NAME, head_sha=head_sha)
其中,GithubAppFlask
提供三个功能:
- 提供 github_app 封装;
- 提供
on
装饰器,对于不同 github 动作分发处理; - github webhook 认证;
步骤 1.3. 创建 check run
使用 PyGithub 库的 create_check_run
处理
def create_check_run():
client = github_app.github_app_installation.get_github_client()
head_sha = (
github_app.payload["check_run"]
if "check_run" in github_app.payload
else github_app.payload["check_suite"]["head_sha"]
)
repo = client.get_repo(github_app.payload["repository"]["full_name"])
repo.create_check_run(name=APP_NAME, head_sha=head_sha)
步骤 1.4. 更新 check run
@github_app.on(["check_run.created"])
def initiate_check_run():
"""Start the CI process"""
# Check that the event is being sent to this app
if str(github_app.payload["check_run"]["app"]["id"]) == config.GITHUB_APP_ID:
client = github_app.github_app_installation.get_github_client()
repo = client.get_repo(github_app.payload["repository"]["full_name"])
check_run = repo.get_check_run(github_app.payload["check_run"]["id"])
# Mark the check run as in process
check_run.edit(
name=APP_NAME,
status="in_progress",
started_at=datetime.now(),
)
# ***** RUN A CI TEST *****
# 暂略
# Mark the check run as complete!
check_run.edit(
name=APP_NAME,
status="completed",
completed_at=datetime.now(),
conclusion=conclusion
)
第 2 部分 创建 Octo RuboCop CI 测试
原教程使用 RuboCop 作为 ruby 代码语法检查和格式化工具,相对应,我们使用 pylint
作为 python 代码语法检查,使用 autopep8
作为格式化工具。
同样,对于git项目的操作,我们使用 GitPython
简化操作。
步骤 2.1. 添加 Python 文件
添加要操作的 python 文件即可。
步骤 2.2. 克隆仓库
使用 GitPython 库处理,使用临时目录进行克隆。
def clone_repository(full_repo_name, repository, ref, installation_token, clean=False):
repo_dir = tempfile.mkdtemp()
git.Git(repo_dir).clone(f"https://x-access-token:{installation_token}@github.com/{full_repo_name}.git")
# pull and chekout
repo = git.Repo(f"{repo_dir}/{repository}")
repo.git.pull()
repo.git.checkout(ref)
if clean:
shutil.rmtree(tempdir, ignore_errors=True)
return repo_dir
运行 CI 测试:
# ***** RUN A CI TEST *****
full_repo_name = github_app.payload["repository"]["full_name"]
repository = github_app.payload["repository"]["name"]
head_sha = github_app.payload["check_run"]["head_sha"]
repo_dir = clone_repository(
full_repo_name,
repository,
head_sha,
installation_token=github_app.github_app_installation.token,
clean=True,
)
步骤 2.3. 运行 pylint
pylint 运行并输出json结果。
command = f"pylint {repo_dir}/{repository}/**/*.py -f json"
report = subprocess.getoutput(command)
shutil.rmtree(repo_dir)
output = json.loads(report)
步骤 2.4. 收集 pylint 错误
pylint结果与 rubocop
类似,收集并解析结果:
# lint
max_annotations = 50
annotations = []
# RuboCop reports the number of errors found in "offense_count"
if len(output) == 0:
conclusion = "success"
actions = None
else:
conclusion = "neutral"
for file in output:
file_path = re.sub(f"{repo_dir}/{repository}/", "", file["path"])
annotation_level = "notice"
# Parse each offense to get details and location
# Limit the number of annotations to 50
if max_annotations == 0:
break
max_annotations -= 1
start_line = file["line"]
end_line = file["line"]
start_column = file["column"]
end_column = file["column"]
message = file["message"]
# Create a new annotation for each error
annotation = {
"path": file_path,
"start_line": start_line,
"end_line": end_line,
"start_column": start_column,
"end_column": end_column,
"annotation_level": annotation_level,
"message": message,
}
# # Annotations only support start and end columns on the same line
# if start_line == end_line:
# annotation.merge({"start_column": start_column, "end_column": end_column})
annotations.append(annotation)
# Need fix action
actions = [
{
"label": "Fix this",
"description": "Automatically fix all linter notices.",
"identifier": "fix_rubocop_notices",
}
]
步骤 2.5. 使用 CI 测试结果更新检查运行
整理结果,并添加修复动作:
summary = (
f"Summary\n"
f"- Offense count: {len(output)}\n"
f"- File count: {len(set([file['path'] for file in output]))}\n"
)
text = "Octo Pylinter version: pylint"
# Mark the check run as complete!
check_run.edit(
name=APP_NAME,
status="completed",
completed_at=datetime.now(),
conclusion=conclusion,
output={
"title": "Octo Pylinter",
"summary": summary,
"text": text,
"annotations": annotations,
},
actions=actions,
)
步骤 2.6. 自动修复错误
沿用 fix_rubocop_notices
这个 ID,使用 autopep8
做 python 文件的修正,将结果以 PR 的方式提交。
@github_app.on(["check_run.requested_action"])
def take_requested_action():
full_repo_name = github_app.payload["repository"]["full_name"]
repository = github_app.payload["repository"]["name"]
head_branch = github_app.payload["check_run"]["check_suite"]["head_branch"]
check_run_id = github_app.payload["check_run"]["id"]
if github_app.payload["requested_action"]["identifier"] == "fix_rubocop_notices":
repo_dir = clone_repository(
full_repo_name,
repository,
head_branch,
installation_token=github_app.github_app_installation.token,
)
# Automatically correct style errors
# fix with autopep8
command = f"autopep8 -a -i {repo_dir}/{repository}/**/*.py"
report = subprocess.getoutput(command)
# create new branch
new_branch = f"fix_rubocop_notices_{check_run_id}"
pushed = False
try:
repo = git.Repo(f"{repo_dir}/{repository}")
if repo.index.diff(None) or repo.untracked_files:
current = repo.create_head(new_branch)
current.checkout()
repo.config_writer().set_value("user", "name", config.GITHUB_APP_USER_NAME).release()
repo.config_writer().set_value("user", "email", config.GITHUB_APP_USER_EMAIL).release()
repo.git.add(update=True)
repo.git.commit("-m", "Automatically fix Octo RuboCop notices.")
repo.git.push("--set-upstream", "origin", current)
pushed = True
else:
print("no changes")
except:
print("failed to commit and push")
# # Nothing to commit!
# print("Nothing to commit")
finally:
shutil.rmtree(repo_dir, ignore_errors=True)
if pushed:
# create pull request
client = github_app.github_app_installation.get_github_client()
repo = client.get_repo(full_repo_name)
body = """Automatically fix Octo RuboCop notices."""
pr = repo.create_pull(
title="Automatically fix Octo RuboCop notices.", body=body, head=new_branch, base="master"
)
print(f"Pull Request number: {pr.number}")
在以上步骤的基础上,可以构建更复杂的测试过程,完成不同的需求。
GitHub repo: qiwihui/blog
Follow me: @qiwihui
Site: QIWIHUI