mirror of
https://github.com/osukey/osukey.git
synced 2025-08-03 14:46:38 +09:00
Merge branch 'master' into realm-integration/skins-rebase
This commit is contained in:
149
osu.Game/Database/BeatmapLookupCache.cs
Normal file
149
osu.Game/Database/BeatmapLookupCache.cs
Normal file
@ -0,0 +1,149 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using System.Threading;
|
||||
using System.Threading.Tasks;
|
||||
using JetBrains.Annotations;
|
||||
using osu.Framework.Allocation;
|
||||
using osu.Game.Online.API;
|
||||
using osu.Game.Online.API.Requests;
|
||||
using osu.Game.Online.API.Requests.Responses;
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
// This class is based on `UserLookupCache` which is well tested.
|
||||
// If modifications are to be made here, a base abstract implementation should likely be created and shared between the two.
|
||||
public class BeatmapLookupCache : MemoryCachingComponent<int, APIBeatmap>
|
||||
{
|
||||
[Resolved]
|
||||
private IAPIProvider api { get; set; }
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified beatmap, populating a <see cref="APIBeatmap"/> model.
|
||||
/// </summary>
|
||||
/// <param name="beatmapId">The beatmap to lookup.</param>
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated beatmap, or null if the beatmap does not exist or the request could not be satisfied.</returns>
|
||||
[ItemCanBeNull]
|
||||
public Task<APIBeatmap> GetBeatmapAsync(int beatmapId, CancellationToken token = default) => GetAsync(beatmapId, token);
|
||||
|
||||
/// <summary>
|
||||
/// Perform an API lookup on the specified beatmaps, populating a <see cref="APIBeatmap"/> model.
|
||||
/// </summary>
|
||||
/// <param name="beatmapIds">The beatmaps to lookup.</param>
|
||||
/// <param name="token">An optional cancellation token.</param>
|
||||
/// <returns>The populated beatmaps. May include null results for failed retrievals.</returns>
|
||||
public Task<APIBeatmap[]> GetBeatmapsAsync(int[] beatmapIds, CancellationToken token = default)
|
||||
{
|
||||
var beatmapLookupTasks = new List<Task<APIBeatmap>>();
|
||||
|
||||
foreach (int u in beatmapIds)
|
||||
{
|
||||
beatmapLookupTasks.Add(GetBeatmapAsync(u, token).ContinueWith(task =>
|
||||
{
|
||||
if (!task.IsCompletedSuccessfully)
|
||||
return null;
|
||||
|
||||
return task.Result;
|
||||
}, token));
|
||||
}
|
||||
|
||||
return Task.WhenAll(beatmapLookupTasks);
|
||||
}
|
||||
|
||||
protected override async Task<APIBeatmap> ComputeValueAsync(int lookup, CancellationToken token = default)
|
||||
=> await queryBeatmap(lookup).ConfigureAwait(false);
|
||||
|
||||
private readonly Queue<(int id, TaskCompletionSource<APIBeatmap>)> pendingBeatmapTasks = new Queue<(int, TaskCompletionSource<APIBeatmap>)>();
|
||||
private Task pendingRequestTask;
|
||||
private readonly object taskAssignmentLock = new object();
|
||||
|
||||
private Task<APIBeatmap> queryBeatmap(int beatmapId)
|
||||
{
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
var tcs = new TaskCompletionSource<APIBeatmap>();
|
||||
|
||||
// Add to the queue.
|
||||
pendingBeatmapTasks.Enqueue((beatmapId, tcs));
|
||||
|
||||
// Create a request task if there's not already one.
|
||||
if (pendingRequestTask == null)
|
||||
createNewTask();
|
||||
|
||||
return tcs.Task;
|
||||
}
|
||||
}
|
||||
|
||||
private void performLookup()
|
||||
{
|
||||
// contains at most 50 unique beatmap IDs from beatmapTasks, which is used to perform the lookup.
|
||||
var beatmapTasks = new Dictionary<int, List<TaskCompletionSource<APIBeatmap>>>();
|
||||
|
||||
// Grab at most 50 unique beatmap IDs from the queue.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
while (pendingBeatmapTasks.Count > 0 && beatmapTasks.Count < 50)
|
||||
{
|
||||
(int id, TaskCompletionSource<APIBeatmap> task) next = pendingBeatmapTasks.Dequeue();
|
||||
|
||||
// Perform a secondary check for existence, in case the beatmap was queried in a previous batch.
|
||||
if (CheckExists(next.id, out var existing))
|
||||
next.task.SetResult(existing);
|
||||
else
|
||||
{
|
||||
if (beatmapTasks.TryGetValue(next.id, out var tasks))
|
||||
tasks.Add(next.task);
|
||||
else
|
||||
beatmapTasks[next.id] = new List<TaskCompletionSource<APIBeatmap>> { next.task };
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (beatmapTasks.Count == 0)
|
||||
return;
|
||||
|
||||
// Query the beatmaps.
|
||||
var request = new GetBeatmapsRequest(beatmapTasks.Keys.ToArray());
|
||||
|
||||
// rather than queueing, we maintain our own single-threaded request stream.
|
||||
// todo: we probably want retry logic here.
|
||||
api.Perform(request);
|
||||
|
||||
// Create a new request task if there's still more beatmaps to query.
|
||||
lock (taskAssignmentLock)
|
||||
{
|
||||
pendingRequestTask = null;
|
||||
if (pendingBeatmapTasks.Count > 0)
|
||||
createNewTask();
|
||||
}
|
||||
|
||||
List<APIBeatmap> foundBeatmaps = request.Response?.Beatmaps;
|
||||
|
||||
if (foundBeatmaps != null)
|
||||
{
|
||||
foreach (var beatmap in foundBeatmaps)
|
||||
{
|
||||
if (beatmapTasks.TryGetValue(beatmap.OnlineID, out var tasks))
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(beatmap);
|
||||
|
||||
beatmapTasks.Remove(beatmap.OnlineID);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// if any tasks remain which were not satisfied, return null.
|
||||
foreach (var tasks in beatmapTasks.Values)
|
||||
{
|
||||
foreach (var task in tasks)
|
||||
task.SetResult(null);
|
||||
}
|
||||
}
|
||||
|
||||
private void createNewTask() => pendingRequestTask = Task.Run(performLookup);
|
||||
}
|
||||
}
|
@ -1,12 +1,16 @@
|
||||
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
|
||||
// See the LICENCE file in the repository root for full licence text.
|
||||
|
||||
using System;
|
||||
using System.Collections.Generic;
|
||||
using System.Linq;
|
||||
using AutoMapper;
|
||||
using osu.Framework.Development;
|
||||
using osu.Game.Input.Bindings;
|
||||
using Realms;
|
||||
|
||||
#nullable enable
|
||||
|
||||
namespace osu.Game.Database
|
||||
{
|
||||
public static class RealmObjectExtensions
|
||||
@ -60,5 +64,109 @@ namespace osu.Game.Database
|
||||
{
|
||||
return new RealmLive<T>(realmObject);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// Register a callback to be invoked each time this <see cref="T:Realms.IRealmCollection`1" /> changes.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// <para>
|
||||
/// This adds osu! specific thread and managed state safety checks on top of <see cref="IRealmCollection{T}.SubscribeForNotifications"/>.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// The first callback will be invoked with the initial <see cref="T:Realms.IRealmCollection`1" /> after the asynchronous query completes,
|
||||
/// and then called again after each write transaction which changes either any of the objects in the collection, or
|
||||
/// which objects are in the collection. The <c>changes</c> parameter will
|
||||
/// be <c>null</c> the first time the callback is invoked with the initial results. For each call after that,
|
||||
/// it will contain information about which rows in the results were added, removed or modified.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// If a write transaction did not modify any objects in this <see cref="T:Realms.IRealmCollection`1" />, the callback is not invoked at all.
|
||||
/// If an error occurs the callback will be invoked with <c>null</c> for the <c>sender</c> parameter and a non-<c>null</c> <c>error</c>.
|
||||
/// Currently the only errors that can occur are when opening the <see cref="T:Realms.Realm" /> on the background worker thread.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// At the time when the block is called, the <see cref="T:Realms.IRealmCollection`1" /> object will be fully evaluated
|
||||
/// and up-to-date, and as long as you do not perform a write transaction on the same thread
|
||||
/// or explicitly call <see cref="M:Realms.Realm.Refresh" />, accessing it will never perform blocking work.
|
||||
/// </para>
|
||||
/// <para>
|
||||
/// Notifications are delivered via the standard event loop, and so can't be delivered while the event loop is blocked by other activity.
|
||||
/// When notifications can't be delivered instantly, multiple notifications may be coalesced into a single notification.
|
||||
/// This can include the notification with the initial collection.
|
||||
/// </para>
|
||||
/// </remarks>
|
||||
/// <param name="collection">The <see cref="IRealmCollection{T}"/> to observe for changes.</param>
|
||||
/// <param name="callback">The callback to be invoked with the updated <see cref="T:Realms.IRealmCollection`1" />.</param>
|
||||
/// <returns>
|
||||
/// A subscription token. It must be kept alive for as long as you want to receive change notifications.
|
||||
/// To stop receiving notifications, call <see cref="M:System.IDisposable.Dispose" />.
|
||||
///
|
||||
/// May be null in the case the provided collection is not managed.
|
||||
/// </returns>
|
||||
/// <seealso cref="M:Realms.CollectionExtensions.SubscribeForNotifications``1(System.Collections.Generic.IList{``0},Realms.NotificationCallbackDelegate{``0})" />
|
||||
/// <seealso cref="M:Realms.CollectionExtensions.SubscribeForNotifications``1(System.Linq.IQueryable{``0},Realms.NotificationCallbackDelegate{``0})" />
|
||||
public static IDisposable? QueryAsyncWithNotifications<T>(this IRealmCollection<T> collection, NotificationCallbackDelegate<T> callback)
|
||||
where T : RealmObjectBase
|
||||
{
|
||||
// Subscriptions can only work on the main thread.
|
||||
if (!ThreadSafety.IsUpdateThread)
|
||||
throw new InvalidOperationException("Cannot subscribe for realm notifications from a non-update thread.");
|
||||
|
||||
return collection.SubscribeForNotifications(callback);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A convenience method that casts <see cref="IQueryable{T}"/> to <see cref="IRealmCollection{T}"/> and subscribes for change notifications.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This adds osu! specific thread and managed state safety checks on top of <see cref="IRealmCollection{T}.SubscribeForNotifications"/>.
|
||||
/// </remarks>
|
||||
/// <param name="list">The <see cref="IQueryable{T}"/> to observe for changes.</param>
|
||||
/// <typeparam name="T">Type of the elements in the list.</typeparam>
|
||||
/// <seealso cref="IRealmCollection{T}.SubscribeForNotifications"/>
|
||||
/// <param name="callback">The callback to be invoked with the updated <see cref="IRealmCollection{T}"/>.</param>
|
||||
/// <returns>
|
||||
/// A subscription token. It must be kept alive for as long as you want to receive change notifications.
|
||||
/// To stop receiving notifications, call <see cref="IDisposable.Dispose"/>.
|
||||
///
|
||||
/// May be null in the case the provided collection is not managed.
|
||||
/// </returns>
|
||||
public static IDisposable? QueryAsyncWithNotifications<T>(this IQueryable<T> list, NotificationCallbackDelegate<T> callback)
|
||||
where T : RealmObjectBase
|
||||
{
|
||||
// Subscribing to non-managed instances doesn't work.
|
||||
// In this usage, the instance may be non-managed in tests.
|
||||
if (!(list is IRealmCollection<T> realmCollection))
|
||||
return null;
|
||||
|
||||
return QueryAsyncWithNotifications(realmCollection, callback);
|
||||
}
|
||||
|
||||
/// <summary>
|
||||
/// A convenience method that casts <see cref="IList{T}"/> to <see cref="IRealmCollection{T}"/> and subscribes for change notifications.
|
||||
/// </summary>
|
||||
/// <remarks>
|
||||
/// This adds osu! specific thread and managed state safety checks on top of <see cref="IRealmCollection{T}.SubscribeForNotifications"/>.
|
||||
/// </remarks>
|
||||
/// <param name="list">The <see cref="IList{T}"/> to observe for changes.</param>
|
||||
/// <typeparam name="T">Type of the elements in the list.</typeparam>
|
||||
/// <seealso cref="IRealmCollection{T}.SubscribeForNotifications"/>
|
||||
/// <param name="callback">The callback to be invoked with the updated <see cref="IRealmCollection{T}"/>.</param>
|
||||
/// <returns>
|
||||
/// A subscription token. It must be kept alive for as long as you want to receive change notifications.
|
||||
/// To stop receiving notifications, call <see cref="IDisposable.Dispose"/>.
|
||||
///
|
||||
/// May be null in the case the provided collection is not managed.
|
||||
/// </returns>
|
||||
public static IDisposable? QueryAsyncWithNotifications<T>(this IList<T> list, NotificationCallbackDelegate<T> callback)
|
||||
where T : RealmObjectBase
|
||||
{
|
||||
// Subscribing to non-managed instances doesn't work.
|
||||
// In this usage, the instance may be non-managed in tests.
|
||||
if (!(list is IRealmCollection<T> realmCollection))
|
||||
return null;
|
||||
|
||||
return QueryAsyncWithNotifications(realmCollection, callback);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -100,6 +100,9 @@ namespace osu.Game.Database
|
||||
}
|
||||
}
|
||||
|
||||
if (userTasks.Count == 0)
|
||||
return;
|
||||
|
||||
// Query the users.
|
||||
var request = new GetUsersRequest(userTasks.Keys.ToArray());
|
||||
|
||||
|
Reference in New Issue
Block a user