mirror of
https://github.com/osukey/osukey.git
synced 2025-08-03 06:36:31 +09:00
Merge branch 'master' into realm-migration-operation-blocking
This commit is contained in:
@ -22,7 +22,6 @@ using osu.Game.IO.Archives;
|
||||
using osu.Game.IPC;
|
||||
using osu.Game.Overlays.Notifications;
|
||||
using SharpCompress.Archives.Zip;
|
||||
using FileInfo = osu.Game.IO.FileInfo;
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
@ -38,6 +37,11 @@ namespace osu.Game.Database
|
||||
{
|
||||
private const int import_queue_request_concurrency = 1;
|
||||
|
||||
/// <summary>
|
||||
/// The size of a batch import operation before considering it a lower priority operation.
|
||||
/// </summary>
|
||||
private const int low_priority_import_batch_size = 1;
|
||||
|
||||
/// <summary>
|
||||
/// A singleton scheduler shared by all <see cref="ArchiveModelManager{TModel,TFileModel}"/>.
|
||||
/// </summary>
|
||||
@ -47,6 +51,13 @@ namespace osu.Game.Database
|
||||
/// </remarks>
|
||||
private static readonly ThreadedTaskScheduler import_scheduler = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager<TModel, TFileModel>));
|
||||
|
||||
/// <summary>
|
||||
/// A second scheduler for lower priority imports.
|
||||
/// For simplicity, these will just run in parallel with normal priority imports, but a future refactor would see this implemented via a custom scheduler/queue.
|
||||
/// See https://gist.github.com/peppy/f0e118a14751fc832ca30dd48ba3876b for an incomplete version of this.
|
||||
/// </summary>
|
||||
private static readonly ThreadedTaskScheduler import_scheduler_low_priority = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager<TModel, TFileModel>));
|
||||
|
||||
/// <summary>
|
||||
/// Set an endpoint for notifications to be posted to.
|
||||
/// </summary>
|
||||
@ -103,8 +114,11 @@ namespace osu.Game.Database
|
||||
|
||||
/// <summary>
|
||||
/// Import one or more <typeparamref name="TModel"/> items from filesystem <paramref name="paths"/>.
|
||||
/// This will post notifications tracking progress.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This will be treated as a low priority import if more than one path is specified; use <see cref="Import(ImportTask[])"/> to always import at standard priority.
|
||||
/// This will post notifications tracking progress.
|
||||
/// </remarks>
|
||||
/// <param name="paths">One or more archive locations on disk.</param>
|
||||
public Task Import(params string[] paths)
|
||||
{
|
||||
@ -126,6 +140,13 @@ namespace osu.Game.Database
|
||||
|
||||
protected async Task<IEnumerable<TModel>> Import(ProgressNotification notification, params ImportTask[] tasks)
|
||||
{
|
||||
if (tasks.Length == 0)
|
||||
{
|
||||
notification.CompletionText = $"No {HumanisedModelName}s were found to import!";
|
||||
notification.State = ProgressNotificationState.Completed;
|
||||
return Enumerable.Empty<TModel>();
|
||||
}
|
||||
|
||||
notification.Progress = 0;
|
||||
notification.Text = $"{HumanisedModelName.Humanize(LetterCasing.Title)} import is initialising...";
|
||||
|
||||
@ -133,13 +154,15 @@ namespace osu.Game.Database
|
||||
|
||||
var imported = new List<TModel>();
|
||||
|
||||
bool isLowPriorityImport = tasks.Length > low_priority_import_batch_size;
|
||||
|
||||
await Task.WhenAll(tasks.Select(async task =>
|
||||
{
|
||||
notification.CancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
try
|
||||
{
|
||||
var model = await Import(task, notification.CancellationToken);
|
||||
var model = await Import(task, isLowPriorityImport, notification.CancellationToken).ConfigureAwait(false);
|
||||
|
||||
lock (imported)
|
||||
{
|
||||
@ -159,7 +182,7 @@ namespace osu.Game.Database
|
||||
{
|
||||
Logger.Error(e, $@"Could not import ({task})", LoggingTarget.Database);
|
||||
}
|
||||
}));
|
||||
})).ConfigureAwait(false);
|
||||
|
||||
if (imported.Count == 0)
|
||||
{
|
||||
@ -193,15 +216,16 @@ namespace osu.Game.Database
|
||||
/// Note that this bypasses the UI flow and should only be used for special cases or testing.
|
||||
/// </summary>
|
||||
/// <param name="task">The <see cref="ImportTask"/> containing data about the <typeparamref name="TModel"/> to import.</param>
|
||||
/// <param name="lowPriority">Whether this is a low priority import.</param>
|
||||
/// <param name="cancellationToken">An optional cancellation token.</param>
|
||||
/// <returns>The imported model, if successful.</returns>
|
||||
internal async Task<TModel> Import(ImportTask task, CancellationToken cancellationToken = default)
|
||||
internal async Task<TModel> Import(ImportTask task, bool lowPriority = false, CancellationToken cancellationToken = default)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
TModel import;
|
||||
using (ArchiveReader reader = task.GetReader())
|
||||
import = await Import(reader, cancellationToken);
|
||||
import = await Import(reader, lowPriority, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
// We may or may not want to delete the file depending on where it is stored.
|
||||
// e.g. reconstructing/repairing database with items from default storage.
|
||||
@ -226,11 +250,12 @@ namespace osu.Game.Database
|
||||
public Action<IEnumerable<TModel>> PresentImport;
|
||||
|
||||
/// <summary>
|
||||
/// Import an item from an <see cref="ArchiveReader"/>.
|
||||
/// Silently import an item from an <see cref="ArchiveReader"/>.
|
||||
/// </summary>
|
||||
/// <param name="archive">The archive to be imported.</param>
|
||||
/// <param name="lowPriority">Whether this is a low priority import.</param>
|
||||
/// <param name="cancellationToken">An optional cancellation token.</param>
|
||||
public Task<TModel> Import(ArchiveReader archive, CancellationToken cancellationToken = default)
|
||||
public Task<TModel> Import(ArchiveReader archive, bool lowPriority = false, CancellationToken cancellationToken = default)
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
@ -253,7 +278,7 @@ namespace osu.Game.Database
|
||||
return null;
|
||||
}
|
||||
|
||||
return Import(model, archive, cancellationToken);
|
||||
return Import(model, archive, lowPriority, cancellationToken);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
@ -303,12 +328,13 @@ namespace osu.Game.Database
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Import an item from a <typeparamref name="TModel"/>.
|
||||
/// Silently import an item from a <typeparamref name="TModel"/>.
|
||||
/// </summary>
|
||||
/// <param name="item">The model to be imported.</param>
|
||||
/// <param name="archive">An optional archive to use for model population.</param>
|
||||
/// <param name="lowPriority">Whether this is a low priority import.</param>
|
||||
/// <param name="cancellationToken">An optional cancellation token.</param>
|
||||
public async Task<TModel> Import(TModel item, ArchiveReader archive = null, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
|
||||
public virtual async Task<TModel> Import(TModel item, ArchiveReader archive = null, bool lowPriority = false, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
|
||||
{
|
||||
cancellationToken.ThrowIfCancellationRequested();
|
||||
|
||||
@ -331,7 +357,7 @@ namespace osu.Game.Database
|
||||
item.Files = archive != null ? createFileInfos(archive, Files) : new List<TFileModel>();
|
||||
item.Hash = ComputeHash(item, archive);
|
||||
|
||||
await Populate(item, archive, cancellationToken);
|
||||
await Populate(item, archive, cancellationToken).ConfigureAwait(false);
|
||||
|
||||
using (var write = ContextFactory.GetForWrite()) // used to share a context for full import. keep in mind this will block all writes.
|
||||
{
|
||||
@ -383,7 +409,7 @@ namespace osu.Game.Database
|
||||
|
||||
flushEvents(true);
|
||||
return item;
|
||||
}, cancellationToken, TaskCreationOptions.HideScheduler, import_scheduler).Unwrap();
|
||||
}, cancellationToken, TaskCreationOptions.HideScheduler, lowPriority ? import_scheduler_low_priority : import_scheduler).Unwrap().ConfigureAwait(false);
|
||||
|
||||
/// <summary>
|
||||
/// Exports an item to a legacy (.zip based) package.
|
||||
@ -436,6 +462,8 @@ namespace osu.Game.Database
|
||||
// Dereference the existing file info, since the file model will be removed.
|
||||
if (file.FileInfo != null)
|
||||
{
|
||||
file.Requery(usage.Context);
|
||||
|
||||
Files.Dereference(file.FileInfo);
|
||||
|
||||
// This shouldn't be required, but here for safety in case the provided TModel is not being change tracked
|
||||
@ -594,7 +622,7 @@ namespace osu.Game.Database
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Create all required <see cref="FileInfo"/>s for the provided archive, adding them to the global file store.
|
||||
/// Create all required <see cref="IO.FileInfo"/>s for the provided archive, adding them to the global file store.
|
||||
/// </summary>
|
||||
private List<TFileModel> createFileInfos(ArchiveReader reader, FileStore files)
|
||||
{
|
||||
@ -609,10 +637,12 @@ namespace osu.Game.Database
|
||||
{
|
||||
using (Stream s = reader.GetStream(file))
|
||||
{
|
||||
var fileInfo = files.Add(s);
|
||||
fileInfos.Add(new TFileModel
|
||||
{
|
||||
Filename = file.Substring(prefix.Length).ToStandardisedPath(),
|
||||
FileInfo = files.Add(s)
|
||||
FileInfo = fileInfo,
|
||||
FileInfoID = fileInfo.ID // workaround for efcore 5 compatibility.
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -625,7 +655,7 @@ namespace osu.Game.Database
|
||||
/// <summary>
|
||||
/// Set a storage with access to an osu-stable install for import purposes.
|
||||
/// </summary>
|
||||
public Func<Storage> GetStableStorage { private get; set; }
|
||||
public Func<StableStorage> GetStableStorage { private get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Denotes whether an osu-stable installation is present to perform automated imports from.
|
||||
@ -638,9 +668,10 @@ namespace osu.Game.Database
|
||||
protected virtual string ImportFromStablePath => null;
|
||||
|
||||
/// <summary>
|
||||
/// Select paths to import from stable. Default implementation iterates all directories in <see cref="ImportFromStablePath"/>.
|
||||
/// Select paths to import from stable where all paths should be absolute. Default implementation iterates all directories in <see cref="ImportFromStablePath"/>.
|
||||
/// </summary>
|
||||
protected virtual IEnumerable<string> GetStableImportPaths(Storage stableStoage) => stableStoage.GetDirectories(ImportFromStablePath);
|
||||
protected virtual IEnumerable<string> GetStableImportPaths(Storage storage) => storage.GetDirectories(ImportFromStablePath)
|
||||
.Select(path => storage.GetFullPath(path));
|
||||
|
||||
/// <summary>
|
||||
/// Whether this specified path should be removed after successful import.
|
||||
@ -654,24 +685,33 @@ namespace osu.Game.Database
|
||||
/// </summary>
|
||||
public Task ImportFromStableAsync()
|
||||
{
|
||||
var stable = GetStableStorage?.Invoke();
|
||||
var stableStorage = GetStableStorage?.Invoke();
|
||||
|
||||
if (stable == null)
|
||||
if (stableStorage == null)
|
||||
{
|
||||
Logger.Log("No osu!stable installation available!", LoggingTarget.Information, LogLevel.Error);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
if (!stable.ExistsDirectory(ImportFromStablePath))
|
||||
var storage = PrepareStableStorage(stableStorage);
|
||||
|
||||
if (!storage.ExistsDirectory(ImportFromStablePath))
|
||||
{
|
||||
// This handles situations like when the user does not have a Skins folder
|
||||
Logger.Log($"No {ImportFromStablePath} folder available in osu!stable installation", LoggingTarget.Information, LogLevel.Error);
|
||||
return Task.CompletedTask;
|
||||
}
|
||||
|
||||
return Task.Run(async () => await Import(GetStableImportPaths(GetStableStorage()).Select(f => stable.GetFullPath(f)).ToArray()));
|
||||
return Task.Run(async () => await Import(GetStableImportPaths(storage).ToArray()).ConfigureAwait(false));
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Run any required traversal operations on the stable storage location before performing operations.
|
||||
/// </summary>
|
||||
/// <param name="stableStorage">The stable storage.</param>
|
||||
/// <returns>The usable storage. Return the unchanged <paramref name="stableStorage"/> if no traversal is required.</returns>
|
||||
protected virtual Storage PrepareStableStorage(StableStorage stableStorage) => stableStorage;
|
||||
|
||||
#endregion
|
||||
|
||||
/// <summary>
|
||||
|
71
osu.Game/Database/DatabaseWorkaroundExtensions.cs
Normal file
71
osu.Game/Database/DatabaseWorkaroundExtensions.cs
Normal file
@ -0,0 +1,71 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using osu.Game.Beatmaps;
|
||||
using osu.Game.Scoring;
|
||||
using osu.Game.Skinning;
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
/// <summary>
|
||||
/// Extension methods which contain workarounds to make EFcore 5.x work with our existing (incorrect) thread safety.
|
||||
/// The intention is to avoid blocking package updates while we consider the future of the database backend, with a potential backend switch imminent.
|
||||
/// </summary>
|
||||
public static class DatabaseWorkaroundExtensions
|
||||
{
|
||||
/// <summary>
|
||||
/// Re-query the provided model to ensure it is in a sane state. This method requires explicit implementation per model type.
|
||||
/// </summary>
|
||||
/// <param name="model"></param>
|
||||
/// <param name="contextFactory"></param>
|
||||
public static void Requery(this IHasPrimaryKey model, IDatabaseContextFactory contextFactory)
|
||||
{
|
||||
switch (model)
|
||||
{
|
||||
case SkinInfo skinInfo:
|
||||
requeryFiles(skinInfo.Files, contextFactory);
|
||||
break;
|
||||
|
||||
case ScoreInfo scoreInfo:
|
||||
requeryFiles(scoreInfo.Beatmap.BeatmapSet.Files, contextFactory);
|
||||
requeryFiles(scoreInfo.Files, contextFactory);
|
||||
break;
|
||||
|
||||
case BeatmapSetInfo beatmapSetInfo:
|
||||
var context = contextFactory.Get();
|
||||
|
||||
foreach (var beatmap in beatmapSetInfo.Beatmaps)
|
||||
{
|
||||
// Workaround System.InvalidOperationException
|
||||
// The instance of entity type 'RulesetInfo' cannot be tracked because another instance with the same key value for {'ID'} is already being tracked.
|
||||
beatmap.Ruleset = context.RulesetInfo.Find(beatmap.RulesetID);
|
||||
}
|
||||
|
||||
requeryFiles(beatmapSetInfo.Files, contextFactory);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new ArgumentException($"{nameof(Requery)} does not have support for the provided model type", nameof(model));
|
||||
}
|
||||
|
||||
void requeryFiles<T>(List<T> files, IDatabaseContextFactory databaseContextFactory) where T : class, INamedFileInfo
|
||||
{
|
||||
var dbContext = databaseContextFactory.Get();
|
||||
|
||||
foreach (var file in files)
|
||||
{
|
||||
Requery(file, dbContext);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void Requery(this INamedFileInfo file, OsuDbContext dbContext)
|
||||
{
|
||||
// Workaround System.InvalidOperationException
|
||||
// The instance of entity type 'FileInfo' cannot be tracked because another instance with the same key value for {'ID'} is already being tracked.
|
||||
file.FileInfo = dbContext.FileInfo.Find(file.FileInfoID);
|
||||
}
|
||||
}
|
||||
}
|
@ -54,10 +54,5 @@ namespace osu.Game.Database
|
||||
Dispose(true);
|
||||
GC.SuppressFinalize(this);
|
||||
}
|
||||
|
||||
~DatabaseWriteUsage()
|
||||
{
|
||||
Dispose(false);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -82,7 +82,7 @@ namespace osu.Game.Database
|
||||
Task.Factory.StartNew(async () =>
|
||||
{
|
||||
// This gets scheduled back to the update thread, but we want the import to run in the background.
|
||||
var imported = await Import(notification, new ImportTask(filename));
|
||||
var imported = await Import(notification, new ImportTask(filename)).ConfigureAwait(false);
|
||||
|
||||
// for now a failed import will be marked as a failed download for simplicity.
|
||||
if (!imported.Any())
|
||||
|
@ -29,7 +29,7 @@ namespace osu.Game.Database
|
||||
if (CheckExists(lookup, out TValue performance))
|
||||
return performance;
|
||||
|
||||
var computed = await ComputeValueAsync(lookup, token);
|
||||
var computed = await ComputeValueAsync(lookup, token).ConfigureAwait(false);
|
||||
|
||||
if (computed != null || CacheNullValues)
|
||||
cache[lookup] = computed;
|
||||
|
@ -3,7 +3,6 @@
|
||||
|
||||
using System;
|
||||
using Microsoft.EntityFrameworkCore;
|
||||
using Microsoft.EntityFrameworkCore.Diagnostics;
|
||||
using Microsoft.Extensions.Logging;
|
||||
using osu.Framework.Logging;
|
||||
using osu.Framework.Statistics;
|
||||
@ -113,10 +112,10 @@ namespace osu.Game.Database
|
||||
{
|
||||
base.OnConfiguring(optionsBuilder);
|
||||
optionsBuilder
|
||||
// this is required for the time being due to the way we are querying in places like BeatmapStore.
|
||||
// if we ever move to having consumers file their own .Includes, or get eager loading support, this could be re-enabled.
|
||||
.ConfigureWarnings(warnings => warnings.Ignore(CoreEventId.IncludeIgnoredWarning))
|
||||
.UseSqlite(connectionString, sqliteOptions => sqliteOptions.CommandTimeout(10))
|
||||
.UseSqlite(connectionString,
|
||||
sqliteOptions => sqliteOptions
|
||||
.CommandTimeout(10)
|
||||
.UseQuerySplittingBehavior(QuerySplittingBehavior.SingleQuery))
|
||||
.UseLoggerFactory(logger.Value);
|
||||
}
|
||||
|
||||
|
@ -28,7 +28,7 @@ namespace osu.Game.Database
|
||||
public Task<User> GetUserAsync(int userId, CancellationToken token = default) => GetAsync(userId, token);
|
||||
|
||||
protected override async Task<User> ComputeValueAsync(int lookup, CancellationToken token = default)
|
||||
=> await queryUser(lookup);
|
||||
=> await queryUser(lookup).ConfigureAwait(false);
|
||||
|
||||
private readonly Queue<(int id, TaskCompletionSource<User>)> pendingUserTasks = new Queue<(int, TaskCompletionSource<User>)>();
|
||||
private Task pendingRequestTask;
|
||||
|
Reference in New Issue
Block a user