Merge "Parallelize repo creation by org"

This commit is contained in:
Zuul 2019-07-17 17:01:38 +00:00 committed by Gerrit Code Review
commit 24ce1f6f8e

View File

@ -28,7 +28,6 @@ LP_REPO = 'https://bugs.launchpad.net/{repo}'
LP_FORMAT = 'https://bugs.launchpad.net/{repo}/+bug/{{index}}' LP_FORMAT = 'https://bugs.launchpad.net/{repo}/+bug/{{index}}'
class Gitea(object): class Gitea(object):
def __init__(self, url, password, always_update, projects): def __init__(self, url, password, always_update, projects):
@ -36,7 +35,12 @@ class Gitea(object):
self.password = password self.password = password
self.always_update = always_update self.always_update = always_update
self.projects = projects self.projects = projects
self.orgs = { f['project'].split('/')[0] for f in self.projects } self.orgs = { f['project'].split('/')[0] for f in self.projects }
self.org_projects = {}
for org in self.orgs:
p = [ f for f in self.projects
if (f['project'].split('/')[0] == org) ]
self.org_projects[org] = p
self._log = [] self._log = []
self.session = requests.Session() self.session = requests.Session()
self.failed = False self.failed = False
@ -119,27 +123,33 @@ class Gitea(object):
else: else:
external_tracker_url = LP_REPO.format(repo=repo) external_tracker_url = LP_REPO.format(repo=repo)
tracker_url_format = LP_FORMAT.format(repo=repo) tracker_url_format = LP_FORMAT.format(repo=repo)
self.post( for count in range(0, 5):
'/{org}/{repo}/settings'.format(org=org, repo=repo), try:
data=dict( self.post(
_csrf=csrf_token, '/{org}/{repo}/settings'.format(org=org, repo=repo),
action='advanced', data=dict(
# enable_pulls is not provided, which disables it _csrf=csrf_token,
# enable_wiki is not provided, which disables it action='advanced',
enable_external_wiki=False, # enable_pulls is not provided, which disables it
external_wiki_url='', # enable_wiki is not provided, which disables it
# enable_issues is on so that issue links work enable_external_wiki=False,
enable_issues='on', external_wiki_url='',
enable_external_tracker=True, # enable_issues is on so that issue links work
external_tracker_url=external_tracker_url, enable_issues='on',
tracker_url_format=tracker_url_format, enable_external_tracker=True,
tracker_issue_style='numeric', external_tracker_url=external_tracker_url,
), tracker_url_format=tracker_url_format,
allow_redirects=False) tracker_issue_style='numeric',
# Set allow_redirects to false because gitea returns ),
# with a 302 on success, and we don't need to follow allow_redirects=False)
# that. # Set allow_redirects to false because gitea returns
self.log("Updated tracker url:", external_tracker_url) # with a 302 on success, and we don't need to follow
# that.
self.log("Updated tracker url:", external_tracker_url)
return
except requests.exceptions.HTTPError as e:
time.sleep(3)
raise Exception("Could not update tracker url")
def update_gitea_project_branches(self, project, csrf_token): def update_gitea_project_branches(self, project, csrf_token):
org, repo = project['project'].split('/', 1) org, repo = project['project'].split('/', 1)
@ -163,8 +173,26 @@ class Gitea(object):
time.sleep(3) time.sleep(3)
raise Exception("Could not update branch settings") raise Exception("Could not update branch settings")
def make_projects(self, projects, gitea_repos, csrf_token,
                  settings_thread_pool, branches_thread_pool, futures):
    """Create missing repos and queue their settings/branch updates.

    Walks ``projects`` in order.  A project whose ``'project'`` name is
    not found in ``gitea_repos`` is created synchronously with
    ``make_gitea_project``.  For each project that was just created --
    or for every project when ``self.always_update`` is truthy -- one
    ``update_gitea_project_settings`` job is submitted to
    ``settings_thread_pool`` and one ``update_gitea_project_branches``
    job to ``branches_thread_pool``.

    The futures returned by both submissions are appended to ``futures``
    (the caller's list is mutated in place); nothing is returned.
    """
    for entry in projects:
        is_new = entry['project'] not in gitea_repos
        if is_new:
            # TODO: use threadpool when we're running with
            # https://github.com/go-gitea/gitea/pull/7493
            self.make_gitea_project(entry, csrf_token)

        # Existing repos are only touched when updates are forced.
        if not (is_new or self.always_update):
            continue

        settings_job = settings_thread_pool.submit(
            self.update_gitea_project_settings, entry, csrf_token)
        futures.append(settings_job)

        branches_job = branches_thread_pool.submit(
            self.update_gitea_project_branches, entry, csrf_token)
        futures.append(branches_job)
def run(self): def run(self):
thread_pool = concurrent.futures.ThreadPoolExecutor()
futures = [] futures = []
gitea_orgs = self.get_gitea_orgs() gitea_orgs = self.get_gitea_orgs()
gitea_repos = [] gitea_repos = []
@ -175,29 +203,59 @@ class Gitea(object):
gitea_repos.extend(self.get_org_repo_list(org)) gitea_repos.extend(self.get_org_repo_list(org))
csrf_token = self.get_csrf_token() csrf_token = self.get_csrf_token()
for project in self.projects: # We can create repos in parallel, as long as all the repos
if project['project'] in gitea_repos: # for the same org are in series (due to database contention,
create = False # until https://github.com/go-gitea/gitea/pull/7493 is
else: # merged). It doesn't help to have more than 2 since
create = True # openstack is the largest and everything else combined is
if create: # less than that.
# TODO: use threadpool when we're running with org_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=2)
# https://github.com/go-gitea/gitea/pull/7493 settings_thread_pool = concurrent.futures.ThreadPoolExecutor()
self.make_gitea_project(project, csrf_token) branches_thread_pool = concurrent.futures.ThreadPoolExecutor()
if create or self.always_update:
futures.append(thread_pool.submit( # The very first update to the repo_unit table needs to happen
self.update_gitea_project_settings, # without any other actions in parallel, otherwise a lock will
project, csrf_token)) # be held for a significant amount of time causing requests to
futures.append(thread_pool.submit( # back up (and some to fail). Work through the project list
self.update_gitea_project_branches, # in series until we find the first that updates the project
project, csrf_token)) # settings (this will be the first with any significant work).
org_task_lists = []
for org, projects in self.org_projects.items():
org_task_lists.append(projects)
first_settings = False
for task_list in org_task_lists:
while task_list:
project = task_list.pop(0)
self.make_projects([project], gitea_repos, csrf_token,
settings_thread_pool, branches_thread_pool,
futures)
if len(futures) > 1:
first_settings = True
self.wait_for_futures(futures)
futures = []
if first_settings:
break
# Once that is done, we can parallelize the rest. Sort the
# org task lists by length so that we pack them into our two
# threads efficiently.
sorted_task_lists = sorted(
org_task_lists, key=lambda x: len(x), reverse=True)
for projects in sorted_task_lists:
futures.append(org_thread_pool.submit(
self.make_projects,
projects, gitea_repos, csrf_token, settings_thread_pool,
branches_thread_pool, futures))
self.wait_for_futures(futures)
def wait_for_futures(self, futures):
    """Block until every future in ``futures`` has finished.

    Each future's ``result()`` is awaited in list order and the value is
    discarded.  If a future raises an ``Exception``, its message is
    recorded through ``self.log`` and ``self.failed`` is set to True;
    the remaining futures are still waited on.  Returns nothing.
    """
    # Iterate the list itself rather than a snapshot: make_projects
    # appends to the same list it is handed, so entries added while we
    # are waiting are picked up as well.
    for pending in futures:
        try:
            pending.result()
        except Exception as exc:
            self.log(str(exc))
            self.failed = True
thread_pool.shutdown()
def ansible_main(): def ansible_main():