Move ArchiveModelManager import process to async flow

This commit is contained in:
smoogipoo
2019-05-28 18:59:21 +09:00
committed by Dean Herbert
parent 3182f88ea8
commit f090e292c9
14 changed files with 273 additions and 165 deletions

View File

@ -5,14 +5,17 @@ using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using JetBrains.Annotations;
using Microsoft.EntityFrameworkCore;
using osu.Framework;
using osu.Framework.Extensions;
using osu.Framework.Extensions.TypeExtensions;
using osu.Framework.IO.File;
using osu.Framework.Logging;
using osu.Framework.Platform;
using osu.Framework.Threading;
using osu.Game.IO;
using osu.Game.IO.Archives;
using osu.Game.IPC;
@ -109,8 +112,11 @@ namespace osu.Game.Database
a.Invoke();
}
private readonly ThreadedTaskScheduler importScheduler;
protected ArchiveModelManager(Storage storage, IDatabaseContextFactory contextFactory, MutableDatabaseBackedStoreWithFileIncludes<TModel, TFileModel> modelStore, IIpcHost importHost = null)
{
importScheduler = new ThreadedTaskScheduler(16, $"{GetType().ReadableName()}.Import");
ContextFactory = contextFactory;
ModelStore = modelStore;
@ -130,92 +136,84 @@ namespace osu.Game.Database
/// This will post notifications tracking progress.
/// </summary>
/// <param name="paths">One or more archive locations on disk.</param>
public void Import(params string[] paths)
public async Task Import(params string[] paths)
{
var notification = new ProgressNotification { State = ProgressNotificationState.Active };
PostNotification?.Invoke(notification);
Import(notification, paths);
await Import(notification, paths);
}
protected void Import(ProgressNotification notification, params string[] paths)
protected async Task Import(ProgressNotification notification, params string[] paths)
{
notification.Progress = 0;
notification.Text = "Import is initialising...";
var term = $"{typeof(TModel).Name.Replace("Info", "").ToLower()}";
List<TModel> imported = new List<TModel>();
var tasks = new List<Task>();
int current = 0;
foreach (string path in paths)
{
if (notification.State == ProgressNotificationState.Cancelled)
// user requested abort
return;
try
tasks.Add(Import(path, notification.CancellationToken).ContinueWith(t =>
{
var text = "Importing ";
if (path.Length > 1)
text += $"{++current} of {paths.Length} {term}s..";
else
text += $"{term}..";
// only show the filename if it isn't a temporary one (as those look ugly).
if (!path.Contains(Path.GetTempPath()))
text += $"\n{Path.GetFileName(path)}";
notification.Text = text;
imported.Add(Import(path));
notification.Progress = (float)current / paths.Length;
}
catch (Exception e)
{
e = e.InnerException ?? e;
Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})");
}
}
if (imported.Count == 0)
{
notification.Text = "Import failed!";
notification.State = ProgressNotificationState.Cancelled;
}
else
{
notification.CompletionText = imported.Count == 1
? $"Imported {imported.First()}!"
: $"Imported {current} {term}s!";
if (imported.Count > 0 && PresentImport != null)
{
notification.CompletionText += " Click to view.";
notification.CompletionClickAction = () =>
lock (notification)
{
PresentImport?.Invoke(imported);
return true;
};
}
current++;
notification.State = ProgressNotificationState.Completed;
notification.Text = $"Imported {current} of {paths.Length} {term}s";
notification.Progress = (float)current / paths.Length;
}
if (t.Exception != null)
{
var e = t.Exception.InnerException ?? t.Exception;
Logger.Error(e, $@"Could not import ({Path.GetFileName(path)})");
}
}));
}
await Task.WhenAll(tasks);
// if (imported.Count == 0)
// {
// notification.Text = "Import failed!";
// notification.State = ProgressNotificationState.Cancelled;
// }
// else
// {
// notification.CompletionText = imported.Count == 1
// ? $"Imported {imported.First()}!"
// : $"Imported {current} {term}s!";
//
// if (imported.Count > 0 && PresentImport != null)
// {
// notification.CompletionText += " Click to view.";
// notification.CompletionClickAction = () =>
// {
// PresentImport?.Invoke(imported);
// return true;
// };
// }
//
// notification.State = ProgressNotificationState.Completed;
// }
}
/// <summary>
/// Import one <see cref="TModel"/> from the filesystem and delete the file on success.
/// </summary>
/// <param name="path">The archive location on disk.</param>
/// <param name="cancellationToken">An optional cancellation token.</param>
/// <returns>The imported model, if successful.</returns>
public TModel Import(string path)
public async Task<TModel> Import(string path, CancellationToken cancellationToken = default)
{
TModel import;
using (ArchiveReader reader = getReaderFrom(path))
import = Import(reader);
import = await Import(reader, cancellationToken);
// We may or may not want to delete the file depending on where it is stored.
// e.g. reconstructing/repairing database with items from default storage.
@ -243,7 +241,8 @@ namespace osu.Game.Database
/// Import an item from an <see cref="ArchiveReader"/>.
/// </summary>
/// <param name="archive">The archive to be imported.</param>
public TModel Import(ArchiveReader archive)
/// <param name="cancellationToken">An optional cancellation token.</param>
public async Task<TModel> Import(ArchiveReader archive, CancellationToken cancellationToken = default)
{
try
{
@ -253,7 +252,7 @@ namespace osu.Game.Database
model.Hash = computeHash(archive);
return Import(model, archive);
return await Import(model, archive, cancellationToken);
}
catch (Exception e)
{
@ -288,7 +287,8 @@ namespace osu.Game.Database
/// </summary>
/// <param name="item">The model to be imported.</param>
/// <param name="archive">An optional archive to use for model population.</param>
public TModel Import(TModel item, ArchiveReader archive = null)
/// <param name="cancellationToken">An optional cancellation token.</param>
public async Task<TModel> Import(TModel item, ArchiveReader archive = null, CancellationToken cancellationToken = default) => await Task.Factory.StartNew(async () =>
{
delayEvents();
@ -296,17 +296,31 @@ namespace osu.Game.Database
{
Logger.Log($"Importing {item}...", LoggingTarget.Database);
if (archive != null)
item.Files = createFileInfos(archive, Files);
var localItem = item;
try
{
await Populate(item, archive, cancellationToken);
}
catch (TaskCanceledException)
{
return item = null;
}
finally
{
if (!Delete(localItem))
Files.Dereference(localItem.Files.Select(f => f.FileInfo).ToArray());
}
using (var write = ContextFactory.GetForWrite()) // used to share a context for full import. keep in mind this will block all writes.
{
try
{
if (!write.IsTransactionLeader) throw new InvalidOperationException($"Ensure there is no parent transaction so errors can correctly be handled by {this}");
if (archive != null)
item.Files = createFileInfos(archive, Files);
Populate(item, archive);
var existing = CheckForExisting(item);
if (existing != null)
@ -332,6 +346,9 @@ namespace osu.Game.Database
}
catch (Exception e)
{
if (!Delete(item))
Files.Dereference(item.Files.Select(f => f.FileInfo).ToArray());
write.Errors.Add(e);
throw;
}
@ -351,7 +368,7 @@ namespace osu.Game.Database
}
return item;
}
}, CancellationToken.None, TaskCreationOptions.None, importScheduler).Unwrap();
/// <summary>
/// Perform an update of the specified item.
@ -516,24 +533,24 @@ namespace osu.Game.Database
/// <summary>
/// This is a temporary method and will likely be replaced by a full-fledged (and more correctly placed) migration process in the future.
/// </summary>
public Task ImportFromStableAsync()
public async Task ImportFromStableAsync()
{
var stable = GetStableStorage?.Invoke();
if (stable == null)
{
Logger.Log("No osu!stable installation available!", LoggingTarget.Information, LogLevel.Error);
return Task.CompletedTask;
return;
}
if (!stable.ExistsDirectory(ImportFromStablePath))
{
// This handles situations like when the user does not have a Skins folder
Logger.Log($"No {ImportFromStablePath} folder available in osu!stable installation", LoggingTarget.Information, LogLevel.Error);
return Task.CompletedTask;
return;
}
return Task.Factory.StartNew(() => Import(stable.GetDirectories(ImportFromStablePath).Select(f => stable.GetFullPath(f)).ToArray()), TaskCreationOptions.LongRunning);
await Task.Run(async () => await Import(stable.GetDirectories(ImportFromStablePath).Select(f => stable.GetFullPath(f)).ToArray()));
}
#endregion
@ -552,9 +569,8 @@ namespace osu.Game.Database
/// </summary>
/// <param name="model">The model to populate.</param>
/// <param name="archive">The archive to use as a reference for population. May be null.</param>
protected virtual void Populate(TModel model, [CanBeNull] ArchiveReader archive)
{
}
/// <param name="cancellationToken">An optional cancellation token.</param>
protected virtual async Task Populate(TModel model, [CanBeNull] ArchiveReader archive, CancellationToken cancellationToken = default) => await Task.CompletedTask;
/// <summary>
/// Perform any final actions before the import to database executes.