using Cysharp.Threading.Tasks.Internal;
using System;
using System.Threading;
using Subscribes = Cysharp.Threading.Tasks.Linq.Subscribe;

namespace Cysharp.Threading.Tasks.Linq
{
    public static partial class UniTaskAsyncEnumerable
    {
        // Subscribe / SubscribeAwait overloads.
        //
        // Every overload below follows the same shape:
        //   1. each argument is checked through Error.ThrowArgumentNullException;
        //   2. the matching Subscribes.SubscribeCore / SubscribeAwaitCore loop is started and
        //      Forget() is called on it, so the caller never awaits the iteration;
        //   3. overloads returning IDisposable hand the loop the token of a fresh
        //      CancellationTokenDisposable and return it (disposing it cancels that token);
        //      overloads taking a CancellationToken return void and pass the caller's token through.
        // A callback that an overload does not accept is filled in with Subscribes.NopError /
        // Subscribes.NopCompleted.
        //
        // NOTE(review): the generic parameter lists in this file were reconstructed. The delegate
        // shapes follow from how the Core loops invoke them (onNext(e.Current) vs.
        // onNext(e.Current, cancellationToken), Forget() vs. await). The relative order of the two
        // Func variants inside a group is not recoverable and does not affect overload resolution.

        // OnNext

        /// <summary>Invokes <paramref name="action"/> for each element; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(action, nameof(action));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Invokes <paramref name="action"/> for each element without awaiting it; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> action)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(action, nameof(action));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Invokes <paramref name="action"/> with each element and the subscription token, without awaiting it; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> action)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(action, nameof(action));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Invokes <paramref name="action"/> for each element, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> action, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(action, nameof(action));

            Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        /// <summary>Invokes <paramref name="action"/> for each element without awaiting it, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> action, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(action, nameof(action));

            Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        /// <summary>Invokes <paramref name="action"/> with each element and <paramref name="cancellationToken"/>, without awaiting it.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> action, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(action, nameof(action));

            Subscribes.SubscribeCore(source, action, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        /// <summary>Awaits <paramref name="onNext"/> for each element before moving to the next one; dispose the result to cancel.</summary>
        public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Awaits <paramref name="onNext"/> for each element before moving to the next one, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));

            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        /// <summary>Awaits <paramref name="onNext"/> (given the element and the subscription token) for each element; dispose the result to cancel.</summary>
        public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Awaits <paramref name="onNext"/> (given the element and <paramref name="cancellationToken"/>) for each element.</summary>
        public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));

            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        // OnNext, OnError

        /// <summary>Invokes <paramref name="onNext"/> for each element and <paramref name="onError"/> on failure; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        // NOTE(review): this section has a single Func-based Subscribe pair; Func<TSource, UniTaskVoid>
        // is assumed (rather than the CancellationToken-taking variant) - confirm.
        /// <summary>Invokes <paramref name="onNext"/> for each element without awaiting it and <paramref name="onError"/> on failure; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action<Exception> onError)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Invokes <paramref name="onNext"/> for each element and <paramref name="onError"/> on failure, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        // NOTE(review): Func<TSource, UniTaskVoid> assumed, see the IDisposable-returning sibling - confirm.
        /// <summary>Invokes <paramref name="onNext"/> for each element without awaiting it and <paramref name="onError"/> on failure, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action<Exception> onError, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            Subscribes.SubscribeCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        /// <summary>Awaits <paramref name="onNext"/> for each element and invokes <paramref name="onError"/> on failure; dispose the result to cancel.</summary>
        public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Awaits <paramref name="onNext"/> for each element and invokes <paramref name="onError"/> on failure, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        /// <summary>Awaits <paramref name="onNext"/> (given the element and the subscription token) and invokes <paramref name="onError"/> on failure; dispose the result to cancel.</summary>
        public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Awaits <paramref name="onNext"/> (given the element and <paramref name="cancellationToken"/>) and invokes <paramref name="onError"/> on failure.</summary>
        public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onError, nameof(onError));

            Subscribes.SubscribeAwaitCore(source, onNext, onError, Subscribes.NopCompleted, cancellationToken).Forget();
        }

        // OnNext, OnCompleted

        /// <summary>Invokes <paramref name="onNext"/> for each element and <paramref name="onCompleted"/> once enumeration ends; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
            return cts;
        }

        // NOTE(review): this section has a single Func-based Subscribe pair; Func<TSource, UniTaskVoid>
        // is assumed (rather than the CancellationToken-taking variant) - confirm.
        /// <summary>Invokes <paramref name="onNext"/> for each element without awaiting it and <paramref name="onCompleted"/> once enumeration ends; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action onCompleted)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Invokes <paramref name="onNext"/> for each element and <paramref name="onCompleted"/> once enumeration ends, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action onCompleted, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
        }

        // NOTE(review): Func<TSource, UniTaskVoid> assumed, see the IDisposable-returning sibling - confirm.
        /// <summary>Invokes <paramref name="onNext"/> for each element without awaiting it and <paramref name="onCompleted"/> once enumeration ends, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action onCompleted, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            Subscribes.SubscribeCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
        }

        /// <summary>Awaits <paramref name="onNext"/> for each element and invokes <paramref name="onCompleted"/> once enumeration ends; dispose the result to cancel.</summary>
        public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Awaits <paramref name="onNext"/> for each element and invokes <paramref name="onCompleted"/> once enumeration ends, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
        }

        /// <summary>Awaits <paramref name="onNext"/> (given the element and the subscription token) and invokes <paramref name="onCompleted"/> once enumeration ends; dispose the result to cancel.</summary>
        public static IDisposable SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cts.Token).Forget();
            return cts;
        }

        /// <summary>Awaits <paramref name="onNext"/> (given the element and <paramref name="cancellationToken"/>) and invokes <paramref name="onCompleted"/> once enumeration ends.</summary>
        public static void SubscribeAwait<TSource>(this IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action onCompleted, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(onNext, nameof(onNext));
            Error.ThrowArgumentNullException(onCompleted, nameof(onCompleted));

            Subscribes.SubscribeAwaitCore(source, onNext, Subscribes.NopError, onCompleted, cancellationToken).Forget();
        }

        // IObserver

        /// <summary>Forwards elements, completion and errors to <paramref name="observer"/>; dispose the result to cancel.</summary>
        public static IDisposable Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(observer, nameof(observer));

            var cts = new CancellationTokenDisposable();
            Subscribes.SubscribeCore(source, observer, cts.Token).Forget();
            return cts;
        }

        /// <summary>Forwards elements, completion and errors to <paramref name="observer"/>, enumerating with <paramref name="cancellationToken"/>.</summary>
        public static void Subscribe<TSource>(this IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer, CancellationToken cancellationToken)
        {
            Error.ThrowArgumentNullException(source, nameof(source));
            Error.ThrowArgumentNullException(observer, nameof(observer));

            Subscribes.SubscribeCore(source, observer, cancellationToken).Forget();
        }
    }

    /// <summary>
    /// <see cref="IDisposable"/> wrapper around a private <see cref="CancellationTokenSource"/>:
    /// <see cref="Dispose"/> requests cancellation of <see cref="Token"/>.
    /// </summary>
    internal sealed class CancellationTokenDisposable : IDisposable
    {
        readonly CancellationTokenSource cts = new CancellationTokenSource();

        /// <summary>Token that is cancelled when this instance is disposed.</summary>
        public CancellationToken Token => cts.Token;

        /// <summary>Cancels <see cref="Token"/>; does nothing if cancellation was already requested.</summary>
        public void Dispose()
        {
            // Only Cancel() is called here; the underlying CancellationTokenSource itself is not disposed.
            if (!cts.IsCancellationRequested)
            {
                cts.Cancel();
            }
        }
    }

    /// <summary>
    /// Iteration loops behind the public Subscribe / SubscribeAwait overloads.
    /// </summary>
    internal static class Subscribe
    {
        // Error handling shared by every delegate-based loop below:
        //   * an exception thrown by onNext is handed to UniTaskScheduler.PublishUnobservedTaskException
        //     and the loop keeps going;
        //   * any other exception escaping the outer try (from MoveNextAsync or from onCompleted) is
        //     published the same way when onError is the NopError instance; otherwise an
        //     OperationCanceledException is dropped and everything else goes to onError;
        //   * the enumerator is disposed in the finally block.

        /// <summary>Placeholder error callback; the loops compare against this instance to detect "no onError supplied".</summary>
        public static readonly Action<Exception> NopError = _ => { };

        /// <summary>Placeholder completion callback that does nothing.</summary>
        public static readonly Action NopCompleted = () => { };

        /// <summary>Enumerates <paramref name="source"/> and calls <paramref name="onNext"/> synchronously for each element.</summary>
        public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Action<TSource> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
        {
            var e = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await e.MoveNextAsync())
                {
                    try
                    {
                        onNext(e.Current);
                    }
                    catch (Exception ex)
                    {
                        UniTaskScheduler.PublishUnobservedTaskException(ex);
                    }
                }
                onCompleted();
            }
            catch (Exception ex)
            {
                if (onError == NopError)
                {
                    UniTaskScheduler.PublishUnobservedTaskException(ex);
                    return;
                }

                if (ex is OperationCanceledException) return;

                onError(ex);
            }
            finally
            {
                if (e != null)
                {
                    await e.DisposeAsync();
                }
            }
        }

        /// <summary>Enumerates <paramref name="source"/> and starts <paramref name="onNext"/> for each element without awaiting it.</summary>
        public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTaskVoid> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
        {
            var e = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await e.MoveNextAsync())
                {
                    try
                    {
                        // Forget(): the handler is not awaited, so only exceptions thrown by this call itself reach the catch below.
                        onNext(e.Current).Forget();
                    }
                    catch (Exception ex)
                    {
                        UniTaskScheduler.PublishUnobservedTaskException(ex);
                    }
                }
                onCompleted();
            }
            catch (Exception ex)
            {
                if (onError == NopError)
                {
                    UniTaskScheduler.PublishUnobservedTaskException(ex);
                    return;
                }

                if (ex is OperationCanceledException) return;

                onError(ex);
            }
            finally
            {
                if (e != null)
                {
                    await e.DisposeAsync();
                }
            }
        }

        /// <summary>Enumerates <paramref name="source"/> and starts <paramref name="onNext"/> with each element and <paramref name="cancellationToken"/>, without awaiting it.</summary>
        public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTaskVoid> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
        {
            var e = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await e.MoveNextAsync())
                {
                    try
                    {
                        // Forget(): the handler is not awaited, so only exceptions thrown by this call itself reach the catch below.
                        onNext(e.Current, cancellationToken).Forget();
                    }
                    catch (Exception ex)
                    {
                        UniTaskScheduler.PublishUnobservedTaskException(ex);
                    }
                }
                onCompleted();
            }
            catch (Exception ex)
            {
                if (onError == NopError)
                {
                    UniTaskScheduler.PublishUnobservedTaskException(ex);
                    return;
                }

                if (ex is OperationCanceledException) return;

                onError(ex);
            }
            finally
            {
                if (e != null)
                {
                    await e.DisposeAsync();
                }
            }
        }

        /// <summary>
        /// Enumerates <paramref name="source"/> and forwards elements to <paramref name="observer"/>.
        /// Exceptions thrown by OnNext are published as unobserved; an exception escaping the outer
        /// try is dropped when it is an <see cref="OperationCanceledException"/> and passed to OnError otherwise.
        /// </summary>
        public static async UniTaskVoid SubscribeCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, IObserver<TSource> observer, CancellationToken cancellationToken)
        {
            var e = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await e.MoveNextAsync())
                {
                    try
                    {
                        observer.OnNext(e.Current);
                    }
                    catch (Exception ex)
                    {
                        UniTaskScheduler.PublishUnobservedTaskException(ex);
                    }
                }
                observer.OnCompleted();
            }
            catch (Exception ex)
            {
                if (ex is OperationCanceledException) return;

                observer.OnError(ex);
            }
            finally
            {
                if (e != null)
                {
                    await e.DisposeAsync();
                }
            }
        }

        /// <summary>Enumerates <paramref name="source"/> and awaits <paramref name="onNext"/> for each element before requesting the next one.</summary>
        public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
        {
            var e = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await e.MoveNextAsync())
                {
                    try
                    {
                        await onNext(e.Current);
                    }
                    catch (Exception ex)
                    {
                        UniTaskScheduler.PublishUnobservedTaskException(ex);
                    }
                }
                onCompleted();
            }
            catch (Exception ex)
            {
                if (onError == NopError)
                {
                    UniTaskScheduler.PublishUnobservedTaskException(ex);
                    return;
                }

                if (ex is OperationCanceledException) return;

                onError(ex);
            }
            finally
            {
                if (e != null)
                {
                    await e.DisposeAsync();
                }
            }
        }

        /// <summary>Enumerates <paramref name="source"/> and awaits <paramref name="onNext"/> (given the element and <paramref name="cancellationToken"/>) before requesting the next one.</summary>
        public static async UniTaskVoid SubscribeAwaitCore<TSource>(IUniTaskAsyncEnumerable<TSource> source, Func<TSource, CancellationToken, UniTask> onNext, Action<Exception> onError, Action onCompleted, CancellationToken cancellationToken)
        {
            var e = source.GetAsyncEnumerator(cancellationToken);
            try
            {
                while (await e.MoveNextAsync())
                {
                    try
                    {
                        await onNext(e.Current, cancellationToken);
                    }
                    catch (Exception ex)
                    {
                        UniTaskScheduler.PublishUnobservedTaskException(ex);
                    }
                }
                onCompleted();
            }
            catch (Exception ex)
            {
                if (onError == NopError)
                {
                    UniTaskScheduler.PublishUnobservedTaskException(ex);
                    return;
                }

                if (ex is OperationCanceledException) return;

                onError(ex);
            }
            finally
            {
                if (e != null)
                {
                    await e.DisposeAsync();
                }
            }
        }
    }
}