using Cysharp.Threading.Tasks.Internal; using System; using System.Threading; namespace Cysharp.Threading.Tasks.Linq { public static partial class UniTaskAsyncEnumerable { public static IObservable ToObservable(this IUniTaskAsyncEnumerable source) { Error.ThrowArgumentNullException(source, nameof(source)); return new ToObservable(source); } } internal sealed class ToObservable : IObservable { readonly IUniTaskAsyncEnumerable source; public ToObservable(IUniTaskAsyncEnumerable source) { this.source = source; } public IDisposable Subscribe(IObserver observer) { var ctd = new CancellationTokenDisposable(); RunAsync(source, observer, ctd.Token).Forget(); return ctd; } static async UniTaskVoid RunAsync(IUniTaskAsyncEnumerable src, IObserver observer, CancellationToken cancellationToken) { // cancellationToken.IsCancellationRequested is called when Rx's Disposed. // when disposed, finish silently. var e = src.GetAsyncEnumerator(cancellationToken); try { bool hasNext; do { try { hasNext = await e.MoveNextAsync(); } catch (Exception ex) { if (cancellationToken.IsCancellationRequested) { return; } observer.OnError(ex); return; } if (hasNext) { observer.OnNext(e.Current); } else { observer.OnCompleted(); return; } } while (!cancellationToken.IsCancellationRequested); } finally { if (e != null) { await e.DisposeAsync(); } } } internal sealed class CancellationTokenDisposable : IDisposable { readonly CancellationTokenSource cts = new CancellationTokenSource(); public CancellationToken Token => cts.Token; public void Dispose() { if (!cts.IsCancellationRequested) { cts.Cancel(); } } } } }