Merge pull request #16547 from peppy/realm-stable-subscriptions

Fix realm subscriptions getting lost after a context recycle
This commit is contained in:
Dan Balasescu
2022-01-24 20:20:04 +09:00
committed by GitHub
18 changed files with 515 additions and 112 deletions

View File

@ -0,0 +1,46 @@
// Copyright (c) ppy Pty Ltd <contact@ppy.sh>. Licensed under the MIT Licence.
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections;
using System.Collections.Generic;
using System.Collections.Specialized;
using System.ComponentModel;
using Realms;
using Realms.Schema;
#nullable enable
namespace osu.Game.Database
{
public class EmptyRealmSet<T> : IRealmCollection<T>
{
private IList<T> emptySet => Array.Empty<T>();
public IEnumerator<T> GetEnumerator() => emptySet.GetEnumerator();
IEnumerator IEnumerable.GetEnumerator() => emptySet.GetEnumerator();
public int Count => emptySet.Count;
public T this[int index] => emptySet[index];
public int IndexOf(object item) => emptySet.IndexOf((T)item);
public bool Contains(object item) => emptySet.Contains((T)item);
public event NotifyCollectionChangedEventHandler? CollectionChanged
{
add => throw new NotImplementedException();
remove => throw new NotImplementedException();
}
public event PropertyChangedEventHandler? PropertyChanged
{
add => throw new NotImplementedException();
remove => throw new NotImplementedException();
}
public IRealmCollection<T> Freeze() => throw new NotImplementedException();
public IDisposable SubscribeForNotifications(NotificationCallbackDelegate<T> callback) => throw new NotImplementedException();
public bool IsValid => throw new NotImplementedException();
public Realm Realm => throw new NotImplementedException();
public ObjectSchema ObjectSchema => throw new NotImplementedException();
public bool IsFrozen => throw new NotImplementedException();
}
}

View File

@ -2,6 +2,8 @@
// See the LICENCE file in the repository root for full licence text.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.IO;
using System.Linq;
using System.Reflection;
@ -61,33 +63,60 @@ namespace osu.Game.Database
private readonly ThreadLocal<bool> currentThreadCanCreateContexts = new ThreadLocal<bool>();
/// <summary>
/// Holds a map of functions registered via <see cref="RegisterCustomSubscription"/> and <see cref="RegisterForNotifications{T}"/> and a coinciding action which when triggered,
/// will unregister the subscription from realm.
///
/// Put another way, the key is an action which registers the subscription with realm. The returned <see cref="IDisposable"/> from the action is stored as the value and only
/// used internally.
///
/// Entries in this dictionary are only removed when a consumer signals that the subscription should be permanently ceased (via their own <see cref="IDisposable"/>).
/// </summary>
private readonly Dictionary<Func<Realm, IDisposable?>, IDisposable?> customSubscriptionsResetMap = new Dictionary<Func<Realm, IDisposable?>, IDisposable?>();
/// <summary>
/// Holds a map of functions registered via <see cref="RegisterForNotifications{T}"/> and a coinciding action which when triggered,
/// fires a change set event with an empty collection. This is used to inform subscribers when a realm context goes away, and ensure they don't use invalidated
/// managed realm objects from a previous firing.
/// </summary>
private readonly Dictionary<Func<Realm, IDisposable?>, Action> notificationsResetMap = new Dictionary<Func<Realm, IDisposable?>, Action>();
private static readonly GlobalStatistic<int> contexts_created = GlobalStatistics.Get<int>(@"Realm", @"Contexts (Created)");
private readonly object contextLock = new object();
private Realm? context;
public Realm Context
public Realm Context => ensureUpdateContext();
private Realm ensureUpdateContext()
{
get
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"Use {nameof(createContext)} when performing realm operations from a non-update thread");
lock (contextLock)
{
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"Use {nameof(Run)}/{nameof(Write)} when performing realm operations from a non-update thread");
lock (contextLock)
if (context == null)
{
if (context == null)
{
context = createContext();
Logger.Log(@$"Opened realm ""{context.Config.DatabasePath}"" at version {context.Config.SchemaVersion}");
}
context = createContext();
Logger.Log(@$"Opened realm ""{context.Config.DatabasePath}"" at version {context.Config.SchemaVersion}");
// creating a context will ensure our schema is up-to-date and migrated.
return context;
// Resubscribe any subscriptions
foreach (var action in customSubscriptionsResetMap.Keys)
registerSubscription(action);
}
Debug.Assert(context != null);
// creating a context will ensure our schema is up-to-date and migrated.
return context;
}
}
internal static bool CurrentThreadSubscriptionsAllowed => current_thread_subscriptions_allowed.Value;
private static readonly ThreadLocal<bool> current_thread_subscriptions_allowed = new ThreadLocal<bool>();
/// <summary>
/// Construct a new instance of a realm context factory.
/// </summary>
@ -222,6 +251,117 @@ namespace osu.Game.Database
}
}
/// <summary>
/// Subscribe to a realm collection and begin watching for asynchronous changes.
/// </summary>
/// <remarks>
/// This adds osu! specific thread and managed state safety checks on top of <see cref="IRealmCollection{T}.SubscribeForNotifications"/>.
///
/// In addition to the documented realm behaviour, we have the additional requirement of handling subscriptions over potential context loss.
/// When this happens, callback events will be automatically fired:
/// - On context loss, a callback with an empty collection and <c>null</c> <see cref="ChangeSet"/> will be invoked.
/// - On context revival, a standard initial realm callback will arrive, with <c>null</c> <see cref="ChangeSet"/> and an up-to-date collection.
/// </remarks>
/// <param name="query">The <see cref="IQueryable{T}"/> to observe for changes.</param>
/// <typeparam name="T">Type of the elements in the list.</typeparam>
/// <param name="callback">The callback to be invoked with the updated <see cref="IRealmCollection{T}"/>.</param>
/// <returns>
/// A subscription token. It must be kept alive for as long as you want to receive change notifications.
/// To stop receiving notifications, call <see cref="IDisposable.Dispose"/>.
/// </returns>
/// <seealso cref="IRealmCollection{T}.SubscribeForNotifications"/>
public IDisposable RegisterForNotifications<T>(Func<Realm, IQueryable<T>> query, NotificationCallbackDelegate<T> callback)
where T : RealmObjectBase
{
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(RegisterForNotifications)} must be called from the update thread.");
lock (contextLock)
{
Func<Realm, IDisposable?> action = realm => query(realm).QueryAsyncWithNotifications(callback);
// Store an action which is used when blocking to ensure consumers don't use results of a stale changeset firing.
notificationsResetMap.Add(action, () => callback(new EmptyRealmSet<T>(), null, null));
return RegisterCustomSubscription(action);
}
}
/// <summary>
/// Run work on realm that will be run every time the update thread realm context gets recycled.
/// </summary>
/// <param name="action">The work to run. Return value should be an <see cref="IDisposable"/> from QueryAsyncWithNotifications, or an <see cref="InvokeOnDisposal"/> to clean up any bindings.</param>
/// <returns>An <see cref="IDisposable"/> which should be disposed to unsubscribe any inner subscription.</returns>
public IDisposable RegisterCustomSubscription(Func<Realm, IDisposable?> action)
{
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(RegisterForNotifications)} must be called from the update thread.");
var syncContext = SynchronizationContext.Current;
registerSubscription(action);
// This token is returned to the consumer.
// When disposed, it will cause the registration to be permanently ceased (unsubscribed with realm and unregistered by this class).
return new InvokeOnDisposal(() =>
{
if (ThreadSafety.IsUpdateThread)
unsubscribe();
else
syncContext.Post(_ => unsubscribe(), null);
void unsubscribe()
{
lock (contextLock)
{
if (customSubscriptionsResetMap.TryGetValue(action, out var unsubscriptionAction))
{
unsubscriptionAction?.Dispose();
customSubscriptionsResetMap.Remove(action);
notificationsResetMap.Remove(action);
}
}
}
});
}
private void registerSubscription(Func<Realm, IDisposable?> action)
{
Debug.Assert(ThreadSafety.IsUpdateThread);
lock (contextLock)
{
// Retrieve context outside of flag update to ensure that the context is constructed,
// as attempting to access it inside the subscription if it's not constructed would lead to
// cyclic invocations of the subscription callback.
var realm = Context;
Debug.Assert(!customSubscriptionsResetMap.TryGetValue(action, out var found) || found == null);
current_thread_subscriptions_allowed.Value = true;
customSubscriptionsResetMap[action] = action(realm);
current_thread_subscriptions_allowed.Value = false;
}
}
/// <summary>
/// Unregister all subscriptions when the realm context is to be recycled.
/// Subscriptions will still remain and will be re-subscribed when the realm context returns.
/// </summary>
private void unregisterAllSubscriptions()
{
lock (contextLock)
{
foreach (var action in notificationsResetMap.Values)
action();
foreach (var action in customSubscriptionsResetMap)
{
action.Value?.Dispose();
customSubscriptionsResetMap[action.Key] = null;
}
}
}
private Realm createContext()
{
if (isDisposed)
@ -454,14 +594,29 @@ namespace osu.Game.Database
if (isDisposed)
throw new ObjectDisposedException(nameof(RealmContextFactory));
SynchronizationContext? syncContext = null;
try
{
contextCreationLock.Wait();
lock (contextLock)
{
if (!ThreadSafety.IsUpdateThread && context != null)
throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread.");
if (context == null)
{
// null context means the update thread has not yet retrieved its context.
// we don't need to worry about reviving the update context in this case, so don't bother with the SynchronizationContext.
Debug.Assert(!ThreadSafety.IsUpdateThread);
}
else
{
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException(@$"{nameof(BlockAllOperations)} must be called from the update thread.");
syncContext = SynchronizationContext.Current;
}
unregisterAllSubscriptions();
Logger.Log(@"Blocking realm operations.", LoggingTarget.Database);
@ -501,6 +656,9 @@ namespace osu.Game.Database
{
factory.contextCreationLock.Release();
Logger.Log(@"Restoring realm operations.", LoggingTarget.Database);
// Post back to the update thread to revive any subscriptions.
syncContext?.Post(_ => ensureUpdateContext(), null);
});
}

View File

@ -7,7 +7,6 @@ using System.Linq;
using System.Runtime.Serialization;
using AutoMapper;
using AutoMapper.Internal;
using osu.Framework.Development;
using osu.Game.Beatmaps;
using osu.Game.Input.Bindings;
using osu.Game.Models;
@ -272,9 +271,8 @@ namespace osu.Game.Database
public static IDisposable? QueryAsyncWithNotifications<T>(this IRealmCollection<T> collection, NotificationCallbackDelegate<T> callback)
where T : RealmObjectBase
{
// Subscriptions can only work on the main thread.
if (!ThreadSafety.IsUpdateThread)
throw new InvalidOperationException("Cannot subscribe for realm notifications from a non-update thread.");
if (!RealmContextFactory.CurrentThreadSubscriptionsAllowed)
throw new InvalidOperationException($"Make sure to call {nameof(RealmContextFactory)}.{nameof(RealmContextFactory.RegisterForNotifications)}");
return collection.SubscribeForNotifications(callback);
}