179 lines
5.4 KiB
Python
Executable File
179 lines
5.4 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import datetime
|
|
import logging
|
|
import os
|
|
import pathlib
|
|
import sys
|
|
|
|
import requests
|
|
import yaml
|
|
|
|
|
|
# Load config file if it exists, or fall back to an empty dict
config_file = pathlib.Path(__file__).parent.joinpath("config.yml")

if config_file.exists():
    # Read through a context manager so the file handle is always closed.
    with config_file.open(encoding="utf-8") as config_fh:
        # safe_load returns None for an empty document; fall back to a dict
        # so the config.get(...) lookups that follow keep working.
        config = yaml.safe_load(config_fh) or dict()
else:
    config = dict()

# set variables from environment or config
def _setting(env_name, key, default=None):
    """Return env var *env_name* if set, else config[key], else *default*."""
    return os.environ.get(env_name, config.get(key, default))


# Environment variables take precedence over config.yml, which in turn takes
# precedence over the built-in defaults given here.
log_level = _setting("MOD_LOG_LEVEL", "log_level", "INFO")
base_url = _setting("MOD_BASE_URL", "base_url")
api_token = _setting("MOD_API_TOKEN", "api_token")
orgs = _setting("MOD_ORGS", "orgs", [])
repos = _setting("MOD_REPOS", "repos", [])
block_users = _setting("MOD_BLOCK_USERS", "block_users", [])
history = _setting("MOD_HISTORY", "history", {"days": 1})

# Configure logging. The logging module matches level names case-sensitively
# ("debug" raises ValueError), so normalise a textual level to upper case.
if isinstance(log_level, str):
    log_level = log_level.strip().upper()
logging.basicConfig(format="[%(levelname)s] %(message)s", level=log_level)
logging.debug(f"log_level = {log_level}")

# Run checks on variables and make necessary changes
def _split_csv(value):
    """Normalise a comma-separated string, a list, or None into a list.

    Strings have all spaces removed and are split on commas; empty items
    (from an empty string or a trailing comma) are dropped so they never
    turn into requests for a nameless org, repo or user.
    """
    if value is None:
        return list()
    if isinstance(value, str):
        return [item for item in value.replace(" ", "").split(",") if item]
    return list(value)


# An empty or blank base_url would otherwise become the bogus URL "/api/v1".
if isinstance(base_url, str) and base_url.strip():
    base_url = f"{base_url.strip().strip('/')}/api/v1"
else:
    logging.critical("base_url not set!")
    sys.exit(1)
logging.debug(f"base_url = {base_url}")

# Treat an empty token the same as a missing one.
if not api_token:
    logging.critical("api_token not set!")
    sys.exit(1)

orgs = _split_csv(orgs)
logging.debug(f"orgs = {orgs}")

repos = _split_csv(repos)
logging.debug(f"repos = {repos}")

# Each block entry is "name" or "name:expiry"; only the first colon separates
# the two, so the expiry may itself contain colons (e.g. a time of day).
parsed_block_users = list()
for entry in _split_csv(block_users):
    name, _, expiry = str(entry).partition(":")
    parsed_block_users.append([name, expiry] if expiry else [name])
block_users = parsed_block_users
logging.debug(f"block_users = {block_users}")

# A textual history looks like "days:1,hours:12" and becomes timedelta kwargs.
if isinstance(history, str):
    history = {
        unit.strip(): int(amount.strip())
        for unit, amount in (
            item.split(":") for item in history.split(",") if item.strip()
        )
    }
# An empty history would mean a zero-length look-back window; use the default.
if not history:
    history = dict(days=1)
logging.debug(f"history = {history}")

# One shared HTTP session; every request sent through it carries the API token.
s = requests.Session()
s.headers.update(Authorization=f"token {api_token}")

# Cut-off timestamp used as the "since" filter when searching issues and
# comments. datetime.utcnow() is deprecated, so take the aware UTC time and
# drop the tzinfo: the resulting string is the same naive ISO form plus "Z".
now_utc = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
dt_hist = now_utc - datetime.timedelta(**history)
dt_hist = f"{dt_hist.isoformat()}Z"

# get list of repos to moderate
repolist = list()

# Every repository of each configured organisation.
# NOTE(review): only a single response page is read here; if the server
# paginates this listing, large organisations may be truncated - confirm.
for org in orgs:
    resp = s.get(f"{base_url}/orgs/{org}/repos")
    if not resp.ok:
        logging.warning(f"Error getting org {org} ({resp.status_code} {resp.reason})")
        continue
    for repo in resp.json():
        repolist.append(repo)
        logging.info(f"Watching repo {repo.get('full_name')}")

# Individually configured repositories, given as "owner/name".
for repo in repos:
    # Use the same "/repos/" prefix as the issue and comment requests made
    # later in this script; the former "/repo/" path did not match it.
    resp = s.get(f"{base_url}/repos/{repo}")
    if not resp.ok:
        logging.warning(f"Error getting repo {repo} ({resp.status_code} {resp.reason})")
        continue
    repo_info = resp.json()  # parse the body once, reuse for both uses
    repolist.append(repo_info)
    logging.info(f"Watching repo {repo_info.get('full_name')}")

if not repolist:
    logging.info("No repos to check. Exiting!")
    sys.exit(0)

# list of users to moderate
userlist = list()

# Aware "now" so expiries with or without a UTC offset can both be compared.
now_utc = datetime.datetime.now(datetime.timezone.utc)

for user in block_users:
    # A second element, when present, is the ISO-format expiry of the block.
    if len(user) > 1:
        try:
            expires = datetime.datetime.fromisoformat(user[1])
        except ValueError:
            # Content deletion is destructive, so a block whose expiry cannot
            # be read is skipped rather than enforced or crashing the run.
            logging.warning(f"Invalid expiry {user[1]!r} for {user[0]}, skipping.")
            continue
        if expires.tzinfo is None:
            # Expiries without an offset are interpreted as UTC.
            expires = expires.replace(tzinfo=datetime.timezone.utc)
        if expires < now_utc:
            logging.warning(f"Block has expired for {user[0]}, skipping.")
            continue

    resp = s.get(f"{base_url}/users/{user[0]}")
    if resp.ok:
        userlist.append(resp.json())
        logging.info(f"Blocking user {user[0]}.")
    else:
        logging.warning(f"Error getting user {user[0]} ({resp.status_code} {resp.reason})")

if not userlist:
    logging.info("No users to block. Exiting!")
    sys.exit(0)

# remove issues created by blocked users
logging.info("Searching for issues and PRs created by blocked users.")

for repo in repolist:
    repostr = f"/repos/{repo.get('full_name')}/issues"

    for user in userlist:
        username = user.get("username")
        if not username:
            # requests omits parameters whose value is None, which would
            # remove the created_by filter and match every issue.
            logging.warning("Skipping user record without a username.")
            continue

        # Let requests build and URL-encode the query string.
        query = dict(state="all", created_by=username, since=dt_hist)
        resp = s.get(f"{base_url}{repostr}", params=query)

        if not resp.ok:
            logging.warning(f"{resp.url} {resp.status_code} {resp.reason}")
            continue

        for issue in resp.json():
            logging.info(f"Found issue created by {username}!")
            logging.info(f"Deleting issue {issue.get('html_url')}")
            dresp = s.delete(issue.get("url"))
            if not dresp.ok:
                logging.error(f"Error deleting {issue.get('html_url')} ({dresp.status_code} {dresp.reason})")

# remove comments added by moderated users
logging.info("Searching for comments created by blocked users.")

# Build the lookup of blocked usernames once instead of once per comment.
blocked_names = {user.get("username") for user in userlist}
blocked_names.discard(None)

for repo in repolist:
    commentstr = f"/repos/{repo.get('full_name')}/issues/comments"

    # The comment listing does not depend on a particular user, so it is
    # fetched once per repository rather than once per blocked user.
    resp = s.get(f"{base_url}{commentstr}", params=dict(since=dt_hist))

    if not resp.ok:
        logging.warning(f"{resp.url} {resp.status_code} {resp.reason}")
        continue

    for comment in resp.json():
        # Guard against a comment whose "user" field is missing or null.
        commentuser = (comment.get("user") or {}).get("username")
        if commentuser not in blocked_names:
            continue
        logging.info(f"Found comment from {commentuser}!")
        logging.info(f"Deleting comment {comment.get('html_url')}")
        dresp = s.delete(f"{base_url}{commentstr}/{comment.get('id')}")
        if not dresp.ok:
            logging.error(f"Error deleting {comment.get('html_url')} ({dresp.status_code} {dresp.reason})")