using System; using System.Threading; namespace Cysharp.Threading.Tasks { public interface IReadOnlyAsyncReactiveProperty : IUniTaskAsyncEnumerable { T Value { get; } IUniTaskAsyncEnumerable WithoutCurrent(); UniTask WaitAsync(CancellationToken cancellationToken = default); } public interface IAsyncReactiveProperty : IReadOnlyAsyncReactiveProperty { new T Value { get; set; } } [Serializable] public class AsyncReactiveProperty : IAsyncReactiveProperty, IDisposable { TriggerEvent triggerEvent; #if UNITY_2018_3_OR_NEWER [UnityEngine.SerializeField] #endif T latestValue; public T Value { get { return latestValue; } set { this.latestValue = value; triggerEvent.SetResult(value); } } public AsyncReactiveProperty(T value) { this.latestValue = value; this.triggerEvent = default; } public IUniTaskAsyncEnumerable WithoutCurrent() { return new WithoutCurrentEnumerable(this); } public IUniTaskAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) { return new Enumerator(this, cancellationToken, true); } public void Dispose() { triggerEvent.SetCompleted(); } public static implicit operator T(AsyncReactiveProperty value) { return value.Value; } public override string ToString() { if (isValueType) return latestValue.ToString(); return latestValue?.ToString(); } public UniTask WaitAsync(CancellationToken cancellationToken = default) { return new UniTask(WaitAsyncSource.Create(this, cancellationToken, out var token), token); } static bool isValueType; static AsyncReactiveProperty() { isValueType = typeof(T).IsValueType; } sealed class WaitAsyncSource : IUniTaskSource, ITriggerHandler, ITaskPoolNode { static Action cancellationCallback = CancellationCallback; static TaskPool pool; WaitAsyncSource nextNode; ref WaitAsyncSource ITaskPoolNode.NextNode => ref nextNode; static WaitAsyncSource() { TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size); } AsyncReactiveProperty parent; CancellationToken cancellationToken; CancellationTokenRegistration cancellationTokenRegistration; UniTaskCompletionSourceCore core; WaitAsyncSource() { } public static IUniTaskSource Create(AsyncReactiveProperty parent, CancellationToken cancellationToken, out short token) { if (cancellationToken.IsCancellationRequested) { return AutoResetUniTaskCompletionSource.CreateFromCanceled(cancellationToken, out token); } if (!pool.TryPop(out var result)) { result = new WaitAsyncSource(); } result.parent = parent; result.cancellationToken = cancellationToken; if (cancellationToken.CanBeCanceled) { result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result); } result.parent.triggerEvent.Add(result); TaskTracker.TrackActiveTask(result, 3); token = result.core.Version; return result; } bool TryReturn() { TaskTracker.RemoveTracking(this); core.Reset(); cancellationTokenRegistration.Dispose(); cancellationTokenRegistration = default; parent.triggerEvent.Remove(this); parent = null; cancellationToken = default; return pool.TryPush(this); } static void CancellationCallback(object state) { var self = (WaitAsyncSource)state; self.OnCanceled(self.cancellationToken); } // IUniTaskSource public T GetResult(short token) { try { return core.GetResult(token); } finally { TryReturn(); } } void IUniTaskSource.GetResult(short token) { GetResult(token); } public void OnCompleted(Action continuation, object state, short token) { core.OnCompleted(continuation, state, token); } public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } public UniTaskStatus UnsafeGetStatus() { return core.UnsafeGetStatus(); } // ITriggerHandler ITriggerHandler ITriggerHandler.Prev { get; set; } ITriggerHandler ITriggerHandler.Next { get; set; } public void OnCanceled(CancellationToken cancellationToken) { core.TrySetCanceled(cancellationToken); } public void OnCompleted() { // Complete as Cancel. core.TrySetCanceled(CancellationToken.None); } public void OnError(Exception ex) { core.TrySetException(ex); } public void OnNext(T value) { core.TrySetResult(value); } } sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable { readonly AsyncReactiveProperty parent; public WithoutCurrentEnumerable(AsyncReactiveProperty parent) { this.parent = parent; } public IUniTaskAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { return new Enumerator(parent, cancellationToken, false); } } sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator, ITriggerHandler { static Action cancellationCallback = CancellationCallback; readonly AsyncReactiveProperty parent; readonly CancellationToken cancellationToken; readonly CancellationTokenRegistration cancellationTokenRegistration; T value; bool isDisposed; bool firstCall; public Enumerator(AsyncReactiveProperty parent, CancellationToken cancellationToken, bool publishCurrentValue) { this.parent = parent; this.cancellationToken = cancellationToken; this.firstCall = publishCurrentValue; parent.triggerEvent.Add(this); TaskTracker.TrackActiveTask(this, 3); if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this); } } public T Current => value; ITriggerHandler ITriggerHandler.Prev { get; set; } ITriggerHandler ITriggerHandler.Next { get; set; } public UniTask MoveNextAsync() { // raise latest value on first call. if (firstCall) { firstCall = false; value = parent.Value; return CompletedTasks.True; } completionSource.Reset(); return new UniTask(this, completionSource.Version); } public UniTask DisposeAsync() { if (!isDisposed) { isDisposed = true; TaskTracker.RemoveTracking(this); completionSource.TrySetCanceled(cancellationToken); parent.triggerEvent.Remove(this); } return default; } public void OnNext(T value) { this.value = value; completionSource.TrySetResult(true); } public void OnCanceled(CancellationToken cancellationToken) { DisposeAsync().Forget(); } public void OnCompleted() { completionSource.TrySetResult(false); } public void OnError(Exception ex) { completionSource.TrySetException(ex); } static void CancellationCallback(object state) { var self = (Enumerator)state; self.DisposeAsync().Forget(); } } } public class ReadOnlyAsyncReactiveProperty : IReadOnlyAsyncReactiveProperty, IDisposable { TriggerEvent triggerEvent; T latestValue; IUniTaskAsyncEnumerator enumerator; public T Value { get { return latestValue; } } public ReadOnlyAsyncReactiveProperty(T initialValue, IUniTaskAsyncEnumerable source, CancellationToken cancellationToken) { latestValue = initialValue; ConsumeEnumerator(source, cancellationToken).Forget(); } public ReadOnlyAsyncReactiveProperty(IUniTaskAsyncEnumerable source, CancellationToken cancellationToken) { ConsumeEnumerator(source, cancellationToken).Forget(); } async UniTaskVoid ConsumeEnumerator(IUniTaskAsyncEnumerable source, CancellationToken cancellationToken) { enumerator = source.GetAsyncEnumerator(cancellationToken); try { while (await enumerator.MoveNextAsync()) { var value = enumerator.Current; this.latestValue = value; triggerEvent.SetResult(value); } } finally { await enumerator.DisposeAsync(); enumerator = null; } } public IUniTaskAsyncEnumerable WithoutCurrent() { return new WithoutCurrentEnumerable(this); } public IUniTaskAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken) { return new Enumerator(this, cancellationToken, true); } public void Dispose() { if (enumerator != null) { enumerator.DisposeAsync().Forget(); } triggerEvent.SetCompleted(); } public static implicit operator T(ReadOnlyAsyncReactiveProperty value) { return value.Value; } public override string ToString() { if (isValueType) return latestValue.ToString(); return latestValue?.ToString(); } public UniTask WaitAsync(CancellationToken cancellationToken = default) { return new UniTask(WaitAsyncSource.Create(this, cancellationToken, out var token), token); } static bool isValueType; static ReadOnlyAsyncReactiveProperty() { isValueType = typeof(T).IsValueType; } sealed class WaitAsyncSource : IUniTaskSource, ITriggerHandler, ITaskPoolNode { static Action cancellationCallback = CancellationCallback; static TaskPool pool; WaitAsyncSource nextNode; ref WaitAsyncSource ITaskPoolNode.NextNode => ref nextNode; static WaitAsyncSource() { TaskPool.RegisterSizeGetter(typeof(WaitAsyncSource), () => pool.Size); } ReadOnlyAsyncReactiveProperty parent; CancellationToken cancellationToken; CancellationTokenRegistration cancellationTokenRegistration; UniTaskCompletionSourceCore core; WaitAsyncSource() { } public static IUniTaskSource Create(ReadOnlyAsyncReactiveProperty parent, CancellationToken cancellationToken, out short token) { if (cancellationToken.IsCancellationRequested) { return AutoResetUniTaskCompletionSource.CreateFromCanceled(cancellationToken, out token); } if (!pool.TryPop(out var result)) { result = new WaitAsyncSource(); } result.parent = parent; result.cancellationToken = cancellationToken; if (cancellationToken.CanBeCanceled) { result.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, result); } result.parent.triggerEvent.Add(result); TaskTracker.TrackActiveTask(result, 3); token = result.core.Version; return result; } bool TryReturn() { TaskTracker.RemoveTracking(this); core.Reset(); cancellationTokenRegistration.Dispose(); cancellationTokenRegistration = default; parent.triggerEvent.Remove(this); parent = null; cancellationToken = default; return pool.TryPush(this); } static void CancellationCallback(object state) { var self = (WaitAsyncSource)state; self.OnCanceled(self.cancellationToken); } // IUniTaskSource public T GetResult(short token) { try { return core.GetResult(token); } finally { TryReturn(); } } void IUniTaskSource.GetResult(short token) { GetResult(token); } public void OnCompleted(Action continuation, object state, short token) { core.OnCompleted(continuation, state, token); } public UniTaskStatus GetStatus(short token) { return core.GetStatus(token); } public UniTaskStatus UnsafeGetStatus() { return core.UnsafeGetStatus(); } // ITriggerHandler ITriggerHandler ITriggerHandler.Prev { get; set; } ITriggerHandler ITriggerHandler.Next { get; set; } public void OnCanceled(CancellationToken cancellationToken) { core.TrySetCanceled(cancellationToken); } public void OnCompleted() { // Complete as Cancel. core.TrySetCanceled(CancellationToken.None); } public void OnError(Exception ex) { core.TrySetException(ex); } public void OnNext(T value) { core.TrySetResult(value); } } sealed class WithoutCurrentEnumerable : IUniTaskAsyncEnumerable { readonly ReadOnlyAsyncReactiveProperty parent; public WithoutCurrentEnumerable(ReadOnlyAsyncReactiveProperty parent) { this.parent = parent; } public IUniTaskAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default) { return new Enumerator(parent, cancellationToken, false); } } sealed class Enumerator : MoveNextSource, IUniTaskAsyncEnumerator, ITriggerHandler { static Action cancellationCallback = CancellationCallback; readonly ReadOnlyAsyncReactiveProperty parent; readonly CancellationToken cancellationToken; readonly CancellationTokenRegistration cancellationTokenRegistration; T value; bool isDisposed; bool firstCall; public Enumerator(ReadOnlyAsyncReactiveProperty parent, CancellationToken cancellationToken, bool publishCurrentValue) { this.parent = parent; this.cancellationToken = cancellationToken; this.firstCall = publishCurrentValue; parent.triggerEvent.Add(this); TaskTracker.TrackActiveTask(this, 3); if (cancellationToken.CanBeCanceled) { cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(cancellationCallback, this); } } public T Current => value; ITriggerHandler ITriggerHandler.Prev { get; set; } ITriggerHandler ITriggerHandler.Next { get; set; } public UniTask MoveNextAsync() { // raise latest value on first call. if (firstCall) { firstCall = false; value = parent.Value; return CompletedTasks.True; } completionSource.Reset(); return new UniTask(this, completionSource.Version); } public UniTask DisposeAsync() { if (!isDisposed) { isDisposed = true; TaskTracker.RemoveTracking(this); completionSource.TrySetCanceled(cancellationToken); parent.triggerEvent.Remove(this); } return default; } public void OnNext(T value) { this.value = value; completionSource.TrySetResult(true); } public void OnCanceled(CancellationToken cancellationToken) { DisposeAsync().Forget(); } public void OnCompleted() { completionSource.TrySetResult(false); } public void OnError(Exception ex) { completionSource.TrySetException(ex); } static void CancellationCallback(object state) { var self = (Enumerator)state; self.DisposeAsync().Forget(); } } } public static class StateExtensions { public static ReadOnlyAsyncReactiveProperty ToReadOnlyAsyncReactiveProperty(this IUniTaskAsyncEnumerable source, CancellationToken cancellationToken) { return new ReadOnlyAsyncReactiveProperty(source, cancellationToken); } public static ReadOnlyAsyncReactiveProperty ToReadOnlyAsyncReactiveProperty(this IUniTaskAsyncEnumerable source, T initialValue, CancellationToken cancellationToken) { return new ReadOnlyAsyncReactiveProperty(initialValue, source, cancellationToken); } } }