From 4127aaa9882c3b2007849e982b9fcc1d0b551ec4 Mon Sep 17 00:00:00 2001 From: Dan Balasescu Date: Thu, 27 Oct 2022 14:34:24 +0900 Subject: [PATCH] Extract general elements from HubClientConnector into SocketClientConnector --- osu.Game/Online/HubClient.cs | 28 ++++ osu.Game/Online/HubClientConnector.cs | 161 +------------------- osu.Game/Online/SocketClient.cs | 24 +++ osu.Game/Online/SocketClientConnector.cs | 183 +++++++++++++++++++++++ 4 files changed, 242 insertions(+), 154 deletions(-) create mode 100644 osu.Game/Online/HubClient.cs create mode 100644 osu.Game/Online/SocketClient.cs create mode 100644 osu.Game/Online/SocketClientConnector.cs diff --git a/osu.Game/Online/HubClient.cs b/osu.Game/Online/HubClient.cs new file mode 100644 index 0000000000..262e298f34 --- /dev/null +++ b/osu.Game/Online/HubClient.cs @@ -0,0 +1,28 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System.Threading; +using System.Threading.Tasks; +using Microsoft.AspNetCore.SignalR.Client; + +namespace osu.Game.Online +{ + public class HubClient : SocketClient + { + public readonly HubConnection Connection; + + public HubClient(HubConnection connection) + { + Connection = connection; + Connection.Closed += InvokeClosed; + } + + public override Task StartAsync(CancellationToken cancellationToken) => Connection.StartAsync(cancellationToken); + + public override async ValueTask DisposeAsync() + { + await base.DisposeAsync().ConfigureAwait(false); + await Connection.DisposeAsync().ConfigureAwait(false); + } + } +} diff --git a/osu.Game/Online/HubClientConnector.cs b/osu.Game/Online/HubClientConnector.cs index 6bfe09e911..33e9f92817 100644 --- a/osu.Game/Online/HubClientConnector.cs +++ b/osu.Game/Online/HubClientConnector.cs @@ -10,13 +10,11 @@ using Microsoft.AspNetCore.SignalR.Client; using Microsoft.Extensions.DependencyInjection; using Newtonsoft.Json; using osu.Framework; -using osu.Framework.Bindables; -using osu.Framework.Logging; using osu.Game.Online.API; namespace osu.Game.Online { - public class HubClientConnector : IHubClientConnector + public class HubClientConnector : SocketClientConnector, IHubClientConnector { public const string SERVER_SHUTDOWN_MESSAGE = "Server is shutting down."; @@ -25,7 +23,6 @@ namespace osu.Game.Online /// public Action? ConfigureConnection { get; set; } - private readonly string clientName; private readonly string endpoint; private readonly string versionHash; private readonly bool preferMessagePack; @@ -34,18 +31,7 @@ namespace osu.Game.Online /// /// The current connection opened by this connector. /// - public HubConnection? CurrentConnection { get; private set; } - - /// - /// Whether this is connected to the hub, use to access the connection, if this is true. - /// - public IBindable IsConnected => isConnected; - - private readonly Bindable isConnected = new Bindable(); - private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1); - private CancellationTokenSource connectCancelSource = new CancellationTokenSource(); - - private readonly IBindable apiState = new Bindable(); + public new HubConnection? CurrentConnection => ((HubClient?)base.CurrentConnection)?.Connection; /// /// Constructs a new . @@ -56,99 +42,16 @@ namespace osu.Game.Online /// The hash representing the current game version, used for verification purposes. /// Whether to use MessagePack for serialisation if available on this platform. public HubClientConnector(string clientName, string endpoint, IAPIProvider api, string versionHash, bool preferMessagePack = true) + : base(api) { - this.clientName = clientName; + ClientName = clientName; this.endpoint = endpoint; this.api = api; this.versionHash = versionHash; this.preferMessagePack = preferMessagePack; - - apiState.BindTo(api.State); - apiState.BindValueChanged(_ => Task.Run(connectIfPossible), true); } - public Task Reconnect() - { - Logger.Log($"{clientName} reconnecting...", LoggingTarget.Network); - return Task.Run(connectIfPossible); - } - - private async Task connectIfPossible() - { - switch (apiState.Value) - { - case APIState.Failing: - case APIState.Offline: - await disconnect(true); - break; - - case APIState.Online: - await connect(); - break; - } - } - - private async Task connect() - { - cancelExistingConnect(); - - if (!await connectionLock.WaitAsync(10000).ConfigureAwait(false)) - throw new TimeoutException("Could not obtain a lock to connect. A previous attempt is likely stuck."); - - try - { - while (apiState.Value == APIState.Online) - { - // ensure any previous connection was disposed. - // this will also create a new cancellation token source. - await disconnect(false).ConfigureAwait(false); - - // this token will be valid for the scope of this connection. - // if cancelled, we can be sure that a disconnect or reconnect is handled elsewhere. - var cancellationToken = connectCancelSource.Token; - - cancellationToken.ThrowIfCancellationRequested(); - - Logger.Log($"{clientName} connecting...", LoggingTarget.Network); - - try - { - // importantly, rebuild the connection each attempt to get an updated access token. - CurrentConnection = buildConnection(cancellationToken); - - await CurrentConnection.StartAsync(cancellationToken).ConfigureAwait(false); - - Logger.Log($"{clientName} connected!", LoggingTarget.Network); - isConnected.Value = true; - return; - } - catch (OperationCanceledException) - { - //connection process was cancelled. - throw; - } - catch (Exception e) - { - await handleErrorAndDelay(e, cancellationToken).ConfigureAwait(false); - } - } - } - finally - { - connectionLock.Release(); - } - } - - /// - /// Handles an exception and delays an async flow. - /// - private async Task handleErrorAndDelay(Exception exception, CancellationToken cancellationToken) - { - Logger.Log($"{clientName} connect attempt failed: {exception.Message}", LoggingTarget.Network); - await Task.Delay(5000, cancellationToken).ConfigureAwait(false); - } - - private HubConnection buildConnection(CancellationToken cancellationToken) + protected override Task BuildConnectionAsync(CancellationToken cancellationToken) { var builder = new HubConnectionBuilder() .WithUrl(endpoint, options => @@ -188,59 +91,9 @@ namespace osu.Game.Online ConfigureConnection?.Invoke(newConnection); - newConnection.Closed += ex => onConnectionClosed(ex, cancellationToken); - return newConnection; + return Task.FromResult((SocketClient)new HubClient(newConnection)); } - private async Task onConnectionClosed(Exception? ex, CancellationToken cancellationToken) - { - isConnected.Value = false; - - if (ex != null) - await handleErrorAndDelay(ex, cancellationToken).ConfigureAwait(false); - else - Logger.Log($"{clientName} disconnected", LoggingTarget.Network); - - // make sure a disconnect wasn't triggered (and this is still the active connection). - if (!cancellationToken.IsCancellationRequested) - await Task.Run(connect, default).ConfigureAwait(false); - } - - private async Task disconnect(bool takeLock) - { - cancelExistingConnect(); - - if (takeLock) - { - if (!await connectionLock.WaitAsync(10000).ConfigureAwait(false)) - throw new TimeoutException("Could not obtain a lock to disconnect. A previous attempt is likely stuck."); - } - - try - { - if (CurrentConnection != null) - await CurrentConnection.DisposeAsync().ConfigureAwait(false); - } - finally - { - CurrentConnection = null; - if (takeLock) - connectionLock.Release(); - } - } - - private void cancelExistingConnect() - { - connectCancelSource.Cancel(); - connectCancelSource = new CancellationTokenSource(); - } - - public override string ToString() => $"Connector for {clientName} ({(IsConnected.Value ? "connected" : "not connected")}"; - - public void Dispose() - { - apiState.UnbindAll(); - cancelExistingConnect(); - } + protected override string ClientName { get; } } } diff --git a/osu.Game/Online/SocketClient.cs b/osu.Game/Online/SocketClient.cs new file mode 100644 index 0000000000..3b4aa1b49b --- /dev/null +++ b/osu.Game/Online/SocketClient.cs @@ -0,0 +1,24 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.Threading; +using System.Threading.Tasks; + +namespace osu.Game.Online +{ + public abstract class SocketClient : IAsyncDisposable + { + public event Func? Closed; + + protected Task InvokeClosed(Exception? exception) => Closed?.Invoke(exception) ?? Task.CompletedTask; + + public abstract Task StartAsync(CancellationToken cancellationToken); + + public virtual ValueTask DisposeAsync() + { + Closed = null; + return new ValueTask(Task.CompletedTask); + } + } +} diff --git a/osu.Game/Online/SocketClientConnector.cs b/osu.Game/Online/SocketClientConnector.cs new file mode 100644 index 0000000000..823e724ef9 --- /dev/null +++ b/osu.Game/Online/SocketClientConnector.cs @@ -0,0 +1,183 @@ +// Copyright (c) ppy Pty Ltd . Licensed under the MIT Licence. +// See the LICENCE file in the repository root for full licence text. + +using System; +using System.Threading; +using System.Threading.Tasks; +using osu.Framework.Bindables; +using osu.Framework.Extensions.TypeExtensions; +using osu.Framework.Graphics; +using osu.Framework.Logging; +using osu.Game.Online.API; + +namespace osu.Game.Online +{ + public abstract class SocketClientConnector : Component + { + /// + /// Whether this is connected to the hub, use to access the connection, if this is true. + /// + public IBindable IsConnected => isConnected; + + /// + /// The current connection opened by this connector. + /// + public SocketClient? CurrentConnection { get; private set; } + + private readonly Bindable isConnected = new Bindable(); + private readonly SemaphoreSlim connectionLock = new SemaphoreSlim(1); + private CancellationTokenSource connectCancelSource = new CancellationTokenSource(); + + private readonly IBindable apiState = new Bindable(); + + /// + /// Constructs a new . + /// + /// An API provider used to react to connection state changes. + protected SocketClientConnector(IAPIProvider api) + { + apiState.BindTo(api.State); + apiState.BindValueChanged(_ => Task.Run(connectIfPossible), true); + } + + public Task Reconnect() + { + Logger.Log($"{ClientName} reconnecting...", LoggingTarget.Network); + return Task.Run(connectIfPossible); + } + + private async Task connectIfPossible() + { + switch (apiState.Value) + { + case APIState.Failing: + case APIState.Offline: + await disconnect(true); + break; + + case APIState.Online: + await connect(); + break; + } + } + + private async Task connect() + { + cancelExistingConnect(); + + if (!await connectionLock.WaitAsync(10000).ConfigureAwait(false)) + throw new TimeoutException("Could not obtain a lock to connect. A previous attempt is likely stuck."); + + try + { + while (apiState.Value == APIState.Online) + { + // ensure any previous connection was disposed. + // this will also create a new cancellation token source. + await disconnect(false).ConfigureAwait(false); + + // this token will be valid for the scope of this connection. + // if cancelled, we can be sure that a disconnect or reconnect is handled elsewhere. + var cancellationToken = connectCancelSource.Token; + + cancellationToken.ThrowIfCancellationRequested(); + + Logger.Log($"{ClientName} connecting...", LoggingTarget.Network); + + try + { + // importantly, rebuild the connection each attempt to get an updated access token. + CurrentConnection = await BuildConnectionAsync(cancellationToken).ConfigureAwait(false); + CurrentConnection.Closed += ex => onConnectionClosed(ex, cancellationToken); + + cancellationToken.ThrowIfCancellationRequested(); + + await CurrentConnection.StartAsync(cancellationToken).ConfigureAwait(false); + + Logger.Log($"{ClientName} connected!", LoggingTarget.Network); + isConnected.Value = true; + return; + } + catch (OperationCanceledException) + { + //connection process was cancelled. + throw; + } + catch (Exception e) + { + await handleErrorAndDelay(e, cancellationToken).ConfigureAwait(false); + } + } + } + finally + { + connectionLock.Release(); + } + } + + /// + /// Handles an exception and delays an async flow. + /// + private async Task handleErrorAndDelay(Exception exception, CancellationToken cancellationToken) + { + Logger.Log($"{ClientName} connect attempt failed: {exception.Message}", LoggingTarget.Network); + await Task.Delay(5000, cancellationToken).ConfigureAwait(false); + } + + protected abstract Task BuildConnectionAsync(CancellationToken cancellationToken); + + private async Task onConnectionClosed(Exception? ex, CancellationToken cancellationToken) + { + isConnected.Value = false; + + if (ex != null) + await handleErrorAndDelay(ex, cancellationToken).ConfigureAwait(false); + else + Logger.Log($"{ClientName} disconnected", LoggingTarget.Network); + + // make sure a disconnect wasn't triggered (and this is still the active connection). + if (!cancellationToken.IsCancellationRequested) + await Task.Run(connect, default).ConfigureAwait(false); + } + + private async Task disconnect(bool takeLock) + { + cancelExistingConnect(); + + if (takeLock) + { + if (!await connectionLock.WaitAsync(10000).ConfigureAwait(false)) + throw new TimeoutException("Could not obtain a lock to disconnect. A previous attempt is likely stuck."); + } + + try + { + if (CurrentConnection != null) + await CurrentConnection.DisposeAsync().ConfigureAwait(false); + } + finally + { + CurrentConnection = null; + if (takeLock) + connectionLock.Release(); + } + } + + private void cancelExistingConnect() + { + connectCancelSource.Cancel(); + connectCancelSource = new CancellationTokenSource(); + } + + protected virtual string ClientName => GetType().ReadableName(); + + public override string ToString() => $"{ClientName} ({(IsConnected.Value ? "connected" : "not connected")})"; + + protected override void Dispose(bool isDisposing) + { + base.Dispose(isDisposing); + apiState.UnbindAll(); + cancelExistingConnect(); + } + } +}