using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;

namespace Cysharp.Threading.Tasks.Linq
{
    public static partial class UniTaskAsyncEnumerable
    {
        /// <summary>
        /// Wraps <paramref name="source"/> in a connectable sequence. The wrapper only asks the
        /// source for an enumerator once <c>Connect()</c> is called on the returned object.
        /// </summary>
        /// <param name="source">Sequence to publish; checked by <c>Error.ThrowArgumentNullException</c>.</param>
        /// <returns>A new connectable wrapper around <paramref name="source"/>.</returns>
        public static IConnectableUniTaskAsyncEnumerable Publish(this IUniTaskAsyncEnumerable source)
        {
            Error.ThrowArgumentNullException(source, nameof(source));

            return new Publish(source);
        }
    }

    // NOTE(review): TSource is used below but no type parameter is declared anywhere in this
    // listing; generic argument lists look to have been lost - confirm against the original file.

    /// <summary>
    /// Connectable sequence: every enumerator handed out by <see cref="GetAsyncEnumerator"/>
    /// registers itself on a shared trigger, and a single consumer loop started by
    /// <see cref="Connect"/> forwards the source's items to that trigger.
    /// </summary>
    internal sealed class Publish : IConnectableUniTaskAsyncEnumerable
    {
        readonly IUniTaskAsyncEnumerable source;
        readonly CancellationTokenSource cancellationTokenSource;

        TriggerEvent trigger;
        IUniTaskAsyncEnumerator enumerator;
        IDisposable connectedDisposable;
        bool isCompleted;

        public Publish(IUniTaskAsyncEnumerable source)
        {
            this.source = source;
            this.cancellationTokenSource = new CancellationTokenSource();
        }

        /// <summary>
        /// Starts consuming the source. Repeated calls return the disposable created by the
        /// first call without starting a second consumer.
        /// </summary>
        /// <returns>A disposable whose <c>Dispose</c> cancels the token given to the source enumerator.</returns>
        public IDisposable Connect()
        {
            if (connectedDisposable != null) return connectedDisposable;

            if (enumerator == null)
            {
                enumerator = source.GetAsyncEnumerator(cancellationTokenSource.Token);
            }

            // Fire-and-forget: the loop reports its outcome through the trigger, not a returned task.
            ConsumeEnumerator().Forget();

            connectedDisposable = new ConnectDisposable(cancellationTokenSource);
            return connectedDisposable;
        }

        /// <summary>
        /// Pumps the source enumerator: each item goes to <c>trigger.SetResult</c>, normal end
        /// to <c>trigger.SetCompleted</c>, and any exception to <c>trigger.SetError</c>.
        /// </summary>
        async UniTaskVoid ConsumeEnumerator()
        {
            try
            {
                try
                {
                    while (await enumerator.MoveNextAsync())
                    {
                        trigger.SetResult(enumerator.Current);
                    }
                    trigger.SetCompleted();
                }
                catch (Exception ex)
                {
                    trigger.SetError(ex);
                }
            }
            finally
            {
                // Flag completion before disposing so later MoveNextAsync calls on subscribers
                // return false instead of waiting for a value that will never arrive.
                isCompleted = true;
                await enumerator.DisposeAsync();
            }
        }

        /// <summary>Creates a subscriber enumerator bound to this instance and <paramref name="cancellationToken"/>.</summary>
        public IUniTaskAsyncEnumerator GetAsyncEnumerator(CancellationToken cancellationToken = default)
        {
            return new _Publish(this, cancellationToken);
        }

        /// <summary>Disposable returned by <see cref="Connect"/>; disposing it cancels the shared token source.</summary>
        sealed class ConnectDisposable : IDisposable
        {
            readonly CancellationTokenSource cancellationTokenSource;

            public ConnectDisposable(CancellationTokenSource cancellationTokenSource)
            {
                this.cancellationTokenSource = cancellationTokenSource;
            }

            public void Dispose()
            {
                this.cancellationTokenSource.Cancel();
            }
        }

        /// <summary>
        /// Subscriber enumerator. It adds itself to the parent's trigger and completes the pending
        /// <see cref="MoveNextAsync"/> when the trigger calls one of the handler callbacks.
        /// </summary>
        sealed class _Publish : MoveNextSource, IUniTaskAsyncEnumerator, ITriggerHandler
        {
            static readonly Action CancelDelegate = OnCanceled;

            readonly Publish parent;
            CancellationToken cancellationToken;
            CancellationTokenRegistration cancellationTokenRegistration;
            bool isDisposed;

            public _Publish(Publish parent, CancellationToken cancellationToken)
            {
                // Assign the fields before the cancellation check. Previously an already-cancelled
                // token left both unset, so MoveNextAsync and DisposeAsync dereferenced a null
                // parent instead of reporting the cancellation.
                this.parent = parent;
                this.cancellationToken = cancellationToken;

                if (cancellationToken.IsCancellationRequested)
                {
                    // Nothing was added to the trigger or the tracker, so DisposeAsync has
                    // nothing to undo; MoveNextAsync will throw from ThrowIfCancellationRequested.
                    isDisposed = true;
                    return;
                }

                if (cancellationToken.CanBeCanceled)
                {
                    this.cancellationTokenRegistration = cancellationToken.RegisterWithoutCaptureExecutionContext(CancelDelegate, this);
                }

                parent.trigger.Add(this);
                TaskTracker.TrackActiveTask(this, 3);
            }

            /// <summary>Last value delivered through <see cref="OnNext"/>.</summary>
            public TSource Current { get; private set; }
            ITriggerHandler ITriggerHandler.Prev { get; set; }
            ITriggerHandler ITriggerHandler.Next { get; set; }

            /// <summary>
            /// Returns a task that is completed by the next handler callback, or
            /// <c>CompletedTasks.False</c> when the parent's consumer loop has already finished.
            /// </summary>
            /// <exception cref="OperationCanceledException">The enumerator's token has been cancelled.</exception>
            public UniTask MoveNextAsync()
            {
                cancellationToken.ThrowIfCancellationRequested();

                if (parent.isCompleted) return CompletedTasks.False;

                completionSource.Reset();
                return new UniTask(this, completionSource.Version);
            }

            // Cancellation callback registered in the constructor; state is the _Publish instance.
            static void OnCanceled(object state)
            {
                var self = (_Publish)state;
                self.completionSource.TrySetCanceled(self.cancellationToken);
                self.DisposeAsync().Forget();
            }

            /// <summary>
            /// Stops tracking, disposes the cancellation registration and removes this handler
            /// from the parent's trigger. Only the first call does any work.
            /// </summary>
            public UniTask DisposeAsync()
            {
                if (!isDisposed)
                {
                    isDisposed = true;
                    TaskTracker.RemoveTracking(this);
                    cancellationTokenRegistration.Dispose();
                    parent.trigger.Remove(this);
                }

                return default;
            }

            /// <summary>Stores <paramref name="value"/> in <see cref="Current"/> and tries to complete the pending move with <c>true</c>.</summary>
            public void OnNext(TSource value)
            {
                Current = value;
                completionSource.TrySetResult(true);
            }

            /// <summary>Tries to complete the pending move as cancelled with <paramref name="cancellationToken"/>.</summary>
            public void OnCanceled(CancellationToken cancellationToken)
            {
                completionSource.TrySetCanceled(cancellationToken);
            }

            /// <summary>Tries to complete the pending move with <c>false</c> (end of sequence).</summary>
            public void OnCompleted()
            {
                completionSource.TrySetResult(false);
            }

            /// <summary>Tries to complete the pending move with the exception <paramref name="ex"/>.</summary>
            public void OnError(Exception ex)
            {
                completionSource.TrySetException(ex);
            }
        }
    }
}