101 lines
4.1 KiB
Python
101 lines
4.1 KiB
Python
import requests
|
|
import argparse
|
|
import time
|
|
from requests.adapters import HTTPAdapter
|
|
from urllib3.util.retry import Retry
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# 使用方法: python clean-gitea-user.py --keep-users admin user1 --gitea-url http://gitea.example.com --api-token your_token
|
|
|
|
# 解析命令行参数
|
|
parser = argparse.ArgumentParser(description='清理 Gitea 用户和仓库,保留指定的用户及其仓库。')
|
|
parser.add_argument('--keep-users', nargs='+', required=True, help='要保留的用户列表,例如 --keep-users user1 user2')
|
|
parser.add_argument('--gitea-url', required=True, help='Gitea 服务器的 URL,例如 http://localhost:3000')
|
|
parser.add_argument('--api-token', required=True, help='Gitea API Token')
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Gitea服务器 URL 和 API 访问 token
|
|
gitea_url = args.gitea_url
|
|
api_token = args.api_token
|
|
|
|
# 获取所有用户的API请求
|
|
users_api = f'{gitea_url}/api/v1/admin/users' # 获取所有用户的 API URL
|
|
headers = {
|
|
'Authorization': f'token {api_token}' # 设置认证头,使用 API Token
|
|
}
|
|
|
|
# 创建带有重试机制的会话
|
|
session = requests.Session()
|
|
retry = Retry(total=5, backoff_factor=1, status_forcelist=[429, 500, 502, 503, 504])
|
|
adapter = HTTPAdapter(max_retries=retry)
|
|
session.mount('http://', adapter)
|
|
session.mount('https://', adapter)
|
|
|
|
# 删除指定用户的仓库(并发版本)
|
|
def delete_user_repositories(username):
|
|
"""
|
|
获取并删除指定用户的所有仓库。
|
|
:param username: 用户名
|
|
"""
|
|
# 获取用户的仓库
|
|
repos_api = f'{gitea_url}/api/v1/users/{username}/repos' # 用户仓库 API URL
|
|
response = session.get(repos_api, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
repos = response.json()
|
|
with ThreadPoolExecutor(max_workers=5) as executor: # 并发删除仓库
|
|
futures = []
|
|
for repo in repos:
|
|
repo_name = repo['name']
|
|
futures.append(executor.submit(delete_single_repo, username, repo_name))
|
|
for future in as_completed(futures):
|
|
try:
|
|
future.result()
|
|
except Exception as e:
|
|
print(f'Error deleting repo: {e}')
|
|
else:
|
|
print(f'Failed to fetch repositories for {username}. Status code: {response.status_code}')
|
|
|
|
def delete_single_repo(username, repo_name):
|
|
delete_repo_api = f'{gitea_url}/api/v1/repos/{username}/{repo_name}'
|
|
delete_response = session.delete(delete_repo_api, headers=headers)
|
|
if delete_response.status_code == 204:
|
|
print(f'Repository {repo_name} deleted successfully.')
|
|
else:
|
|
print(f'Failed to delete repository {repo_name}. Status: {delete_response.status_code}')
|
|
time.sleep(0.1) # 减少延迟
|
|
|
|
# 删除用户
|
|
def delete_user(username):
|
|
delete_user_api = f'{gitea_url}/api/v1/admin/users/{username}'
|
|
delete_response = session.delete(delete_user_api, headers=headers)
|
|
if delete_response.status_code == 204:
|
|
print(f'User {username} deleted successfully.')
|
|
else:
|
|
print(f'Failed to delete user {username}. Status: {delete_response.status_code}')
|
|
time.sleep(0.1)
|
|
|
|
# 获取所有用户并删除除指定用户外的用户及其仓库
|
|
response = session.get(users_api, headers=headers)
|
|
|
|
if response.status_code == 200:
|
|
users = response.json()
|
|
with ThreadPoolExecutor(max_workers=3) as executor: # 并发处理用户
|
|
futures = []
|
|
for user in users:
|
|
username = user['username']
|
|
if username not in args.keep_users:
|
|
print(f'Processing user: {username}')
|
|
# 先删除仓库,再删除用户
|
|
delete_user_repositories(username)
|
|
futures.append(executor.submit(delete_user, username))
|
|
else:
|
|
print(f'User {username} is being kept.')
|
|
for future in as_completed(futures):
|
|
try:
|
|
future.result()
|
|
except Exception as e:
|
|
print(f'Error deleting user: {e}')
|
|
else:
|
|
print('Failed to fetch users. Status code:', response.status_code) |