From b4d2d0bd0b245cba425d9e5d981e3831d9269931 Mon Sep 17 00:00:00 2001 From: Dean Herbert Date: Mon, 10 Jun 2019 13:19:58 +0900 Subject: [PATCH] Simplify and combine concurrency of ArchiveModelManager --- osu.Game/Beatmaps/BeatmapManager.cs | 77 ++----------------- osu.Game/Database/ArchiveModelManager.cs | 96 ++++++++++++------------ 2 files changed, 57 insertions(+), 116 deletions(-) diff --git a/osu.Game/Beatmaps/BeatmapManager.cs b/osu.Game/Beatmaps/BeatmapManager.cs index f5ef52a93b..9e7b6d7971 100644 --- a/osu.Game/Beatmaps/BeatmapManager.cs +++ b/osu.Game/Beatmaps/BeatmapManager.cs @@ -2,7 +2,6 @@ // See the LICENCE file in the repository root for full licence text. using System; -using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -111,7 +110,7 @@ namespace osu.Game.Beatmaps validateOnlineIds(beatmapSet); - await Task.WhenAll(beatmapSet.Beatmaps.Select(b => updateQueue.Enqueue(new UpdateItem(b, cancellationToken)).Task).ToArray()); + await Task.WhenAll(beatmapSet.Beatmaps.Select(b => updateQueue.Perform(b, cancellationToken)).ToArray()); } protected override void PreImport(BeatmapSetInfo beatmapSet) @@ -424,81 +423,24 @@ namespace osu.Game.Beatmaps private class BeatmapUpdateQueue { private readonly IAPIProvider api; - private readonly Queue queue = new Queue(); - private int activeThreads; + private readonly ThreadedTaskScheduler updateScheduler = new ThreadedTaskScheduler(4); public BeatmapUpdateQueue(IAPIProvider api) { this.api = api; } - public UpdateItem Enqueue(UpdateItem item) + public Task Perform(BeatmapInfo beatmap, CancellationToken cancellationToken) + => Task.Factory.StartNew(() => perform(beatmap, cancellationToken), cancellationToken, TaskCreationOptions.HideScheduler, updateScheduler); + + private void perform(BeatmapInfo beatmap, CancellationToken cancellation) { - lock (queue) - { - queue.Enqueue(item); - - if (activeThreads >= 16) - return item; - - new Thread(runWork) { IsBackground = true }.Start(); - activeThreads++; - } - - return item; - } - - private void runWork() - { - while (true) - { - UpdateItem toProcess; - - lock (queue) - { - if (queue.Count == 0) - break; - - toProcess = queue.Dequeue(); - } - - toProcess.PerformUpdate(api); - } - - lock (queue) - activeThreads--; - } - } - - private class UpdateItem - { - public Task Task => tcs.Task; - - private readonly BeatmapInfo beatmap; - private readonly CancellationToken cancellationToken; - - private readonly TaskCompletionSource tcs = new TaskCompletionSource(); - - public UpdateItem(BeatmapInfo beatmap, CancellationToken cancellationToken) - { - this.beatmap = beatmap; - this.cancellationToken = cancellationToken; - } - - public void PerformUpdate(IAPIProvider api) - { - if (cancellationToken.IsCancellationRequested) - { - tcs.SetCanceled(); + if (cancellation.IsCancellationRequested) return; - } if (api?.State != APIState.Online) - { - tcs.SetResult(false); return; - } Logger.Log("Attempting online lookup for the missing values...", LoggingTarget.Database); @@ -512,17 +454,14 @@ namespace osu.Game.Beatmaps beatmap.BeatmapSet.Status = res.BeatmapSet.Status; beatmap.BeatmapSet.OnlineBeatmapSetID = res.OnlineBeatmapSetID; beatmap.OnlineBeatmapID = res.OnlineBeatmapID; - - tcs.SetResult(true); }; req.Failure += e => { Logger.Log($"Failed ({e})", LoggingTarget.Database); - - tcs.SetResult(false); }; + // intentionally blocking to limit web request concurrency req.Perform(api); } } diff --git a/osu.Game/Database/ArchiveModelManager.cs b/osu.Game/Database/ArchiveModelManager.cs index afc614772e..e30e1cd3ee 100644 --- a/osu.Game/Database/ArchiveModelManager.cs +++ b/osu.Game/Database/ArchiveModelManager.cs @@ -1,4 +1,4 @@ -// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. // See the LICENCE file in the repository root for full licence text. using System; @@ -11,7 +11,6 @@ using JetBrains.Annotations; using Microsoft.EntityFrameworkCore; using osu.Framework; using osu.Framework.Extensions; -using osu.Framework.Extensions.TypeExtensions; using osu.Framework.IO.File; using osu.Framework.Logging; using osu.Framework.Platform; @@ -32,7 +31,7 @@ namespace osu.Game.Database /// /// The model type. /// The associated file join type. - public abstract class ArchiveModelManager : ICanAcceptFiles + public abstract class ArchiveModelManager : ArchiveModelManager, ICanAcceptFiles where TModel : class, IHasFiles, IHasPrimaryKey, ISoftDelete where TFileModel : INamedFileInfo, new() { @@ -112,11 +111,8 @@ namespace osu.Game.Database a.Invoke(); } - private readonly ThreadedTaskScheduler importScheduler; - protected ArchiveModelManager(Storage storage, IDatabaseContextFactory contextFactory, MutableDatabaseBackedStoreWithFileIncludes modelStore, IIpcHost importHost = null) { - importScheduler = new ThreadedTaskScheduler(16, $"{GetType().ReadableName()}.Import"); ContextFactory = contextFactory; ModelStore = modelStore; @@ -152,55 +148,55 @@ namespace osu.Game.Database var term = $"{typeof(TModel).Name.Replace("Info", "").ToLower()}"; - var tasks = new List(); - int current = 0; - foreach (string path in paths) + var imported = new List(); + + await Task.WhenAll(paths.Select(path => Import(path, notification.CancellationToken).ContinueWith(t => { - tasks.Add(Import(path, notification.CancellationToken).ContinueWith(t => + lock (notification) { - lock (notification) - { - current++; + current++; - notification.Text = $"Imported {current} of {paths.Length} {term}s"; - notification.Progress = (float)current / paths.Length; - } + notification.Text = $"Imported {current} of {paths.Length} {term}s"; + notification.Progress = (float)current / paths.Length; + } - if (t.Exception != null) - { - var e = t.Exception.InnerException ?? t.Exception; - Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})"); - } - })); + if (t.Exception == null) + { + lock (imported) + imported.Add(t.Result); + } + else + { + var e = t.Exception.InnerException ?? t.Exception; + Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})"); + } + }))); + + if (imported.Count == 0) + { + notification.Text = "Import failed!"; + notification.State = ProgressNotificationState.Cancelled; } + else + { + notification.CompletionText = imported.Count == 1 + ? $"Imported {imported.First()}!" + : $"Imported {current} {term}s!"; - await Task.WhenAll(tasks); + if (imported.Count > 0 && PresentImport != null) + { + notification.CompletionText += " Click to view."; + notification.CompletionClickAction = () => + { + PresentImport?.Invoke(imported); + return true; + }; + } - // if (imported.Count == 0) - // { - // notification.Text = "Import failed!"; - // notification.State = ProgressNotificationState.Cancelled; - // } - // else - // { - // notification.CompletionText = imported.Count == 1 - // ? $"Imported {imported.First()}!" - // : $"Imported {current} {term}s!"; - // - // if (imported.Count > 0 && PresentImport != null) - // { - // notification.CompletionText += " Click to view."; - // notification.CompletionClickAction = () => - // { - // PresentImport?.Invoke(imported); - // return true; - // }; - // } - // - // notification.State = ProgressNotificationState.Completed; - // } + notification.State = ProgressNotificationState.Completed; + } } /// @@ -368,7 +364,7 @@ namespace osu.Game.Database } return item; - }, CancellationToken.None, TaskCreationOptions.None, importScheduler).Unwrap(); + }, CancellationToken.None, TaskCreationOptions.HideScheduler, IMPORT_SCHEDULER).Unwrap(); /// /// Perform an update of the specified item. @@ -615,4 +611,10 @@ namespace osu.Game.Database throw new InvalidFormatException($"{path} is not a valid archive"); } } + + public abstract class ArchiveModelManager + { + // allow sharing static across all generic types + protected static readonly ThreadedTaskScheduler IMPORT_SCHEDULER = new ThreadedTaskScheduler(1); + } }