Merge remote-tracking branch 'upstream/master' into generic-download-model-manager

This commit is contained in:
naoey
2019-06-12 17:36:43 +05:30
41 changed files with 647 additions and 252 deletions

View File

@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
@ -13,6 +14,7 @@ using osu.Framework.Extensions;
using osu.Framework.IO.File;
using osu.Framework.Logging;
using osu.Framework.Platform;
using osu.Framework.Threading;
using osu.Game.IO;
using osu.Game.IO.Archives;
using osu.Game.IPC;
@ -29,7 +31,7 @@ namespace osu.Game.Database
/// </summary>
/// <typeparam name="TModel">The model type.</typeparam>
/// <typeparam name="TFileModel">The associated file join type.</typeparam>
public abstract class ArchiveModelManager<TModel, TFileModel> : ICanAcceptFiles, IModelManager<TModel>
public abstract class ArchiveModelManager<TModel, TFileModel> : ArchiveModelManager, ICanAcceptFiles, IModelManager<TModel>
where TModel : class, IHasFiles<TFileModel>, IHasPrimaryKey, ISoftDelete
where TFileModel : INamedFileInfo, new()
{
@ -130,56 +132,50 @@ namespace osu.Game.Database
/// This will post notifications tracking progress.
/// </summary>
/// <param name="paths">One or more archive locations on disk.</param>
public void Import(params string[] paths)
public Task Import(params string[] paths)
{
var notification = new ProgressNotification { State = ProgressNotificationState.Active };
PostNotification?.Invoke(notification);
Import(notification, paths);
return Import(notification, paths);
}
protected void Import(ProgressNotification notification, params string[] paths)
protected async Task Import(ProgressNotification notification, params string[] paths)
{
notification.Progress = 0;
notification.Text = "Import is initialising...";
var term = $"{typeof(TModel).Name.Replace("Info", "").ToLower()}";
List<TModel> imported = new List<TModel>();
int current = 0;
foreach (string path in paths)
var imported = new List<TModel>();
await Task.WhenAll(paths.Select(async path =>
{
if (notification.State == ProgressNotificationState.Cancelled)
// user requested abort
return;
notification.CancellationToken.ThrowIfCancellationRequested();
try
{
var text = "Importing ";
var model = await Import(path, notification.CancellationToken);
if (path.Length > 1)
text += $"{++current} of {paths.Length} {term}s..";
else
text += $"{term}..";
lock (imported)
{
imported.Add(model);
current++;
// only show the filename if it isn't a temporary one (as those look ugly).
if (!path.Contains(Path.GetTempPath()))
text += $"\n{Path.GetFileName(path)}";
notification.Text = text;
imported.Add(Import(path));
notification.Progress = (float)current / paths.Length;
notification.Text = $"Imported {current} of {paths.Length} {humanisedModelName}s";
notification.Progress = (float)current / paths.Length;
}
}
catch (TaskCanceledException)
{
throw;
}
catch (Exception e)
{
e = e.InnerException ?? e;
Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})");
Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})", LoggingTarget.Database);
}
}
}));
if (imported.Count == 0)
{
@ -190,7 +186,7 @@ namespace osu.Game.Database
{
notification.CompletionText = imported.Count == 1
? $"Imported {imported.First()}!"
: $"Imported {current} {term}s!";
: $"Imported {current} {humanisedModelName}s!";
if (imported.Count > 0 && PresentImport != null)
{
@ -210,12 +206,15 @@ namespace osu.Game.Database
/// Import one <see cref="TModel"/> from the filesystem and delete the file on success.
/// </summary>
/// <param name="path">The archive location on disk.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The imported model, if successful.</returns>
public TModel Import(string path)
public async Task<TModel> Import(string path, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
TModel import;
using (ArchiveReader reader = getReaderFrom(path))
import = Import(reader);
import = await Import(reader, cancellationToken);
// We may or may not want to delete the file depending on where it is stored.
// e.g. reconstructing/repairing database with items from default storage.
@ -228,7 +227,7 @@ namespace osu.Game.Database
}
catch (Exception e)
{
Logger.Error(e, $@"Could not delete original file after import ({Path.GetFileName(path)})");
LogForModel(import, $@"Could not delete original file after import ({Path.GetFileName(path)})", e);
}
return import;
@ -243,23 +242,32 @@ namespace osu.Game.Database
/// Import an item from an <see cref="ArchiveReader"/>.
/// </summary>
/// <param name="archive">The archive to be imported.</param>
public TModel Import(ArchiveReader archive)
/// <param name="cancellationToken">An optional cancellation token.</param>
public Task<TModel> Import(ArchiveReader archive, CancellationToken cancellationToken = default)
{
cancellationToken.ThrowIfCancellationRequested();
TModel model = null;
try
{
var model = CreateModel(archive);
model = CreateModel(archive);
if (model == null) return null;
model.Hash = computeHash(archive);
return Import(model, archive);
}
catch (TaskCanceledException)
{
throw;
}
catch (Exception e)
{
Logger.Error(e, $"Model creation of {archive.Name} failed.", LoggingTarget.Database);
LogForModel(model, $"Model creation of {archive.Name} failed.", e);
return null;
}
return Import(model, archive, cancellationToken);
}
/// <summary>
@ -269,6 +277,16 @@ namespace osu.Game.Database
/// </summary>
protected abstract string[] HashableFileTypes { get; }
protected static void LogForModel(TModel model, string message, Exception e = null)
{
string prefix = $"[{(model?.Hash ?? "?????").Substring(0, 5)}]";
if (e != null)
Logger.Error(e, $"{prefix} {message}", LoggingTarget.Database);
else
Logger.Log($"{prefix} {message}", LoggingTarget.Database);
}
/// <summary>
/// Create a SHA-2 hash from the provided archive based on file content of all files matching <see cref="HashableFileTypes"/>.
/// </summary>
@ -288,13 +306,30 @@ namespace osu.Game.Database
/// </summary>
/// <param name="item">The model to be imported.</param>
/// <param name="archive">An optional archive to use for model population.</param>
public TModel Import(TModel item, ArchiveReader archive = null)
/// <param name="cancellationToken">An optional cancellation token.</param>
public async Task<TModel> Import(TModel item, ArchiveReader archive = null, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
{
cancellationToken.ThrowIfCancellationRequested();
delayEvents();
void rollback()
{
if (!Delete(item))
{
// We may have not yet added the model to the underlying table, but should still clean up files.
LogForModel(item, "Dereferencing files for incomplete import.");
Files.Dereference(item.Files.Select(f => f.FileInfo).ToArray());
}
}
try
{
Logger.Log($"Importing {item}...", LoggingTarget.Database);
LogForModel(item, "Beginning import...");
item.Files = archive != null ? createFileInfos(archive, Files) : new List<TFileModel>();
await Populate(item, archive, cancellationToken);
using (var write = ContextFactory.GetForWrite()) // used to share a context for full import. keep in mind this will block all writes.
{
@ -302,11 +337,6 @@ namespace osu.Game.Database
{
if (!write.IsTransactionLeader) throw new InvalidOperationException($"Ensure there is no parent transaction so errors can correctly be handled by {this}");
if (archive != null)
item.Files = createFileInfos(archive, Files);
Populate(item, archive);
var existing = CheckForExisting(item);
if (existing != null)
@ -314,15 +344,17 @@ namespace osu.Game.Database
if (CanUndelete(existing, item))
{
Undelete(existing);
Logger.Log($"Found existing {typeof(TModel)} for {item} (ID {existing.ID}). Skipping import.", LoggingTarget.Database);
LogForModel(item, $"Found existing {humanisedModelName} for {item} (ID {existing.ID}) skipping import.");
handleEvent(() => ItemAdded?.Invoke(existing, true));
// existing item will be used; rollback new import and exit early.
rollback();
flushEvents(true);
return existing;
}
else
{
Delete(existing);
ModelStore.PurgeDeletable(s => s.ID == existing.ID);
}
Delete(existing);
ModelStore.PurgeDeletable(s => s.ID == existing.ID);
}
PreImport(item);
@ -337,21 +369,21 @@ namespace osu.Game.Database
}
}
Logger.Log($"Import of {item} successfully completed!", LoggingTarget.Database);
LogForModel(item, "Import successfully completed!");
}
catch (Exception e)
{
Logger.Error(e, $"Import of {item} failed and has been rolled back.", LoggingTarget.Database);
item = null;
}
finally
{
// we only want to flush events after we've confirmed the write context didn't have any errors.
flushEvents(item != null);
if (!(e is TaskCanceledException))
LogForModel(item, "Database import or population failed and has been rolled back.", e);
rollback();
flushEvents(false);
throw;
}
flushEvents(true);
return item;
}
}, cancellationToken, TaskCreationOptions.HideScheduler, IMPORT_SCHEDULER).Unwrap();
/// <summary>
/// Perform an update of the specified item.
@ -533,7 +565,7 @@ namespace osu.Game.Database
return Task.CompletedTask;
}
return Task.Factory.StartNew(() => Import(stable.GetDirectories(ImportFromStablePath).Select(f => stable.GetFullPath(f)).ToArray()), TaskCreationOptions.LongRunning);
return Task.Run(async () => await Import(stable.GetDirectories(ImportFromStablePath).Select(f => stable.GetFullPath(f)).ToArray()));
}
#endregion
@ -552,9 +584,8 @@ namespace osu.Game.Database
/// </summary>
/// <param name="model">The model to populate.</param>
/// <param name="archive">The archive to use as a reference for population. May be null.</param>
protected virtual void Populate(TModel model, [CanBeNull] ArchiveReader archive)
{
}
/// <param name="cancellationToken">An optional cancellation token.</param>
protected virtual Task Populate(TModel model, [CanBeNull] ArchiveReader archive, CancellationToken cancellationToken = default) => Task.CompletedTask;
/// <summary>
/// Perform any final actions before the import to database executes.
@ -582,6 +613,8 @@ namespace osu.Game.Database
private DbSet<TModel> queryModel() => ContextFactory.Get().Set<TModel>();
private string humanisedModelName => $"{typeof(TModel).Name.Replace("Info", "").ToLower()}";
/// <summary>
/// Creates an <see cref="ArchiveReader"/> from a valid storage path.
/// </summary>
@ -599,4 +632,18 @@ namespace osu.Game.Database
throw new InvalidFormatException($"{path} is not a valid archive");
}
}
public abstract class ArchiveModelManager
{
private const int import_queue_request_concurrency = 1;
/// <summary>
/// A singleton scheduler shared by all <see cref="ArchiveModelManager{TModel,TFileModel}"/>.
/// </summary>
/// <remarks>
/// This scheduler generally performs IO and CPU intensive work so concurrency is limited harshly.
/// It is mainly being used as a queue mechanism for large imports.
/// </remarks>
protected static readonly ThreadedTaskScheduler IMPORT_SCHEDULER = new ThreadedTaskScheduler(import_queue_request_concurrency, nameof(ArchiveModelManager));
}
}