
рд╕рднреА рдХреЛ рдирдорд╕реНрдХрд╛рд░!
рд╣рд╛рд▓ рдХреЗ рд╡рд░реНрд╖реЛрдВ рдореЗрдВ, рдореИрдВ рдХреЛрдЯрд▓рд┐рди рдкрд░ рдПрдВрдбреНрд░реЙрдЗрдб рдХреЗ рд▓рд┐рдП рд╡рд┐рдХрд╛рд╕ рдХрд░ рд░рд╣рд╛ рд╣реВрдВред рдмрд╣реБрдд рд╕рдордп рдкрд╣рд▓реЗ рдирд╣реАрдВ, рдХреЛрдЯрд▓рд┐рди рдорд▓реНрдЯреАрдкреНрд▓реЗрдЯ рд░рд┐рдХреЙрд░реНрдбрд░ рдкрд░ RxJava рдХреА рдХрдореА рдХреЗ рд▓рд┐рдП, рд╣рдордиреЗ рдХреЙрдЯрд▓рд┐рди рдФрд░ рдкреНрд░рд╡рд╛рд╣ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдирд╛ рд╢реБрд░реВ рдХрд░ рджрд┐рдпрд╛ - рдХреЛрдЯрд▓рд┐рди рдЖрдЙрдЯ рдмреЙрдХреНрд╕ рдХреЗ рд▓рд┐рдП рдардВрдбреА рдзрд╛рд░рд╛рдПрдВред рдПрдВрдбреНрд░реЙрдЗрдб рд╕реЗ рдкрд╣рд▓реЗ, рдореИрдВрдиреЗ рд╕реА # рдХреЗ рд╕рд╛рде рдХрдИ рд╕рд╛рд▓ рдмрд┐рддрд╛рдП, рдФрд░ рд╡рд╣рд╛рдВ рдмрд╣реБрдд рд▓рдВрдмреЗ рд╕рдордп рддрдХ рдореЗрд░реЗ рдХреЙрд░рдЖрдЙрдЯ рд░рд╣реЗ рд╣реИрдВ, рдХреЗрд╡рд▓ рдЙрдиреНрд╣реЗрдВ рдЗрд╕ рддрд░рд╣ рд╕реЗ рдирд╣реАрдВ рдмреБрд▓рд╛рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рд▓реЗрдХрд┐рди рдореИрдВрдиреЗ рдПрд╕рд┐рдВрдХреНрд╕ / рд╡реЗрдЯ рдкрд░ рдкреНрд░рд╡рд╛рд╣ рдХреЗ рдПрдирд╛рд▓реЙрдЧ рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдирд╣реАрдВ рд╕реБрдирд╛ред рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛рд╢реАрд▓ рдкреНрд░реЛрдЧреНрд░рд╛рдорд┐рдВрдЧ рдХрд╛ рдореБрдЦреНрдп рдЙрдкрдХрд░рдг рдЖрд░рдПрдХреНрд╕.рдиреЗрдЯ рд╣реИ (рд╡рд╛рд╕реНрддрд╡ рдореЗрдВ, рдЖрд░рдПрдХреНрд╕ рдХрд╛ рдЬрдиреНрдо рдпрд╣рд╛рдВ рд╣реБрдЖ рдерд╛)ред рдЗрд╕рд▓рд┐рдП рдореИрдВрдиреЗ рдЙрджрд╛рд╕реАрди рд╣реЛрдиреЗ рдХрд╛ рдлреИрд╕рд▓рд╛ рдХрд┐рдпрд╛ рдФрд░ рдПрдХ рдмрд╛рдЗрдХ рдХреЛ рджреЗрдЦрдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рдХреАред
рдпрд╣ рдЖрдЧреЗ рд╕рдордЭрд╛ рдЧрдпрд╛ рд╣реИ рдХрд┐ рдкрд╛рдардХ рдЙрди рдЪреАрдЬреЛрдВ рд╕реЗ рдЕрд╡рдЧрдд рд╣реИ рдЬреЛ рдкрд┐рдЫрд▓реЗ рдкреИрд░рд╛рдЧреНрд░рд╛рдл рдореЗрдВ рдЪрд░реНрдЪрд╛ рдХреА рдЧрдИ рдереАрдВред рдЕрдзреАрд░ рдХреЗ рд▓рд┐рдП - рддреБрд░рдВрдд рднрдВрдбрд╛рд░ рд╕реЗ рд▓рд┐рдВрдХ рдХрд░реЗрдВ ред
рдбрд┐рд╕реНрдХреНрд▓реЗрдорд░: рдпрд╣ рдХреЛрдб рдЙрддреНрдкрд╛рджрди рдореЗрдВ рдЙрдкрдпреЛрдЧ рдХрд┐рдП рдЬрд╛рдиреЗ рдХреЗ рд▓рд┐рдП рдирд╣реАрдВ рд╣реИред рдпрд╣ рдПрдХ рдЕрд╡рдзрд╛рд░рдгрд╛ рд╣реИ, рдЕрдзрд┐рдХ рдХреБрдЫ рдирд╣реАрдВред рдХреБрдЫ рдмрд┐рд▓реНрдХреБрд▓ рдХреЗ рд░реВрдк рдореЗрдВ рдХрд╛рдо рдирд╣реАрдВ рдХрд░ рд╕рдХрддреЗред
IFlow рдФрд░ IFlowCollector
рдареАрдХ рд╣реИ, рдЪрд▓реЛ рдорд╛рдереЗ рдкрд░ рд╕реА # рдореЗрдВ рдлреНрд▓реЛ рдФрд░ рдлреНрд▓реЛрдХреЛрд▓реЗрдХреНрдЯрд░ рдЗрдВрдЯрд░рдлреЗрд╕ рдХреЛ рдлрд┐рд░ рд╕реЗ рд▓рд┐рдЦрдирд╛ рд╢реБрд░реВ рдХрд░реЗрдВред
рдпрд╣ рдерд╛:
interface Flow<out T> { suspend fun collect(collector: FlowCollector<T>) } interface FlowCollector<in T> { suspend fun emit(value: T) }
рдпрд╣ рдмрди рдЧрдпрд╛:
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector); } public interface IFlowCollector<in T> { Task Emit(T item); }
рдореЗрд░рд╛ рдорд╛рдирдирд╛ тАЛтАЛрд╣реИ рдХрд┐ рдЕрдВрддрд░ рд╕рдордЭрдиреЗ рдФрд░ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рдХреЗ рдЕрд▓рдЧ-рдЕрд▓рдЧ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рджреНрд╡рд╛рд░рд╛ рд╕рдордЭрд╛рдпрд╛ рдЧрдпрд╛ рд╣реИред
рдЗрди рдЗрдВрдЯрд░рдлреЗрд╕ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдЙрдиреНрд╣реЗрдВ рд▓рд╛рдЧреВ рдХрд┐рдпрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдПред рдпрд╣рд╛рдБ рдХреНрдпрд╛ рд╣реБрдЖ:
internal class Flow<T> : IFlow<T> { private readonly Func<IFlowCollector<T>, Task> _emitter; public Flow(Func<IFlowCollector<T>, Task> emitter) { _emitter = emitter; } public Task Collect(IFlowCollector<T> collector) { return _emitter(collector); } } internal class FlowCollector<T> : IFlowCollector<T> { private readonly Func<T, Task> _handler; public FlowCollector(Func<T, Task> handler) { _handler = handler; } public Task Emit(T item) { return _handler(item); } }
рдкреНрд░рд╡рд╛рд╣ рдХреЗ рдирд┐рд░реНрдорд╛рддрд╛ рдореЗрдВ, рд╣рдо рдПрдХ рдлрд╝рдВрдХреНрд╢рди рдкрд╛рд╕ рдХрд░рддреЗ рд╣реИрдВ рдЬреЛ рдореВрд▓реНрдпреЛрдВ рдХрд╛ рдЙрддреНрд╕рд░реНрдЬрди рдХрд░реЗрдЧрд╛ред рдФрд░ рдХрд▓реЗрдХреНрдЯрд░ рдХреЗ рдирд┐рд░реНрдорд╛рдг рдХреЗ рд▓рд┐рдП, рдПрдХ рдлрд╝рдВрдХреНрд╢рди рдЬреЛ рдкреНрд░рддреНрдпреЗрдХ рдЙрддреНрд╕рд░реНрдЬрд┐рдд рдореВрд▓реНрдп рдХреЛ рд╕рдВрд╕рд╛рдзрд┐рдд рдХрд░реЗрдЧрд╛ред
рдЖрдк рдЗрд╕реЗ рдЗрд╕ рддрд░рд╣ рд╕реЗ рдЙрдкрдпреЛрдЧ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВ
var flow = new Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }); var collector = new FlowCollector<int>(async item => Console.WriteLine(item)); await flow.Collect(collector);
рдореБрдЭреЗ рд▓рдЧрддрд╛ рд╣реИ рдХрд┐ рдЙрдкрд░реЛрдХреНрдд рдХреЛрдб рдореЗрдВ рд╕рдм рдХреБрдЫ рд╕реНрдкрд╖реНрдЯ рд╣реИред рдкрд╣рд▓реЗ рд╣рдо рдПрдХ рдлреНрд▓реЛ рдмрдирд╛рддреЗ рд╣реИрдВ, рдлрд┐рд░ рдПрдХ рдХрд▓реЗрдХреНрдЯрд░ рдмрдирд╛рддреЗ рд╣реИрдВ (рдкреНрд░рддреНрдпреЗрдХ рддрддреНрд╡ рдХреЗ рд▓рд┐рдП рд╣реИрдВрдбрд▓рд░)ред рдлрд┐рд░ рд╣рдо рдлреНрд▓реЛ рд╢реБрд░реВ рдХрд░рддреЗ рд╣реИрдВ, рдЙрд╕ рдкрд░ рдПрдХ рдХрд▓реЗрдХреНрдЯрд░ "рд╣рд╕реНрддрд╛рдХреНрд╖рд░рд┐рдд" рдХрд░рддреЗ рд╣реИрдВред рдпрджрд┐ рдЖрдк рдереЛрдбрд╝реА рд╕реА рдЪреАрдиреА (рдЬреАрдереВрдм рджреЗрдЦреЗрдВ) рдЬреЛрдбрд╝рддреЗ рд╣реИрдВ, рддреЛ рд╣рдореЗрдВ рдХреБрдЫ рдРрд╕рд╛ рдорд┐рд▓рддрд╛ рд╣реИ:
await Flow<int>(async collector => { await collector.Emit(1); await Task.Delay(1000); await collector.Emit(2); await Task.Delay(1000); await collector.Emit(3); }) .Collect(Console.WriteLine);
рдХреЛрдЯрд▓рд┐рди рдкрд░ рдпрд╣ рдЗрд╕ рддрд░рд╣ рджрд┐рдЦрддрд╛ рд╣реИ:
scope.launch{ flow{ emit(1) delay(1000) тАж }.collect{ printl(it) } }
рд╡реНрдпрдХреНрддрд┐рдЧрдд рд░реВрдк рд╕реЗ, рдореИрдВ рд╕рднреА рдХреЛ рдПрдХ рдкреНрд░рд╡рд╛рд╣ рдмрдирд╛рддреЗ рд╕рдордп рд╕реНрдкрд╖реНрдЯ рд░реВрдк рд╕реЗ рддрддреНрд╡ рдХреЗ рдкреНрд░рдХрд╛рд░ рдХреЛ рдЗрдВрдЧрд┐рдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╢рд╛рд░реНрдк рдкрд░ рд╡рд┐рдХрд▓реНрдк рдкрд╕рдВрдж рдирд╣реАрдВ рдХрд░рддрд╛ рд╣реВрдВред рд▓реЗрдХрд┐рди рдпрд╣рд╛рдБ рдмрд╛рдд рдпрд╣ рдирд╣реАрдВ рд╣реИ рдХрд┐ рдХреЛрдЯрд▓рд┐рди рдореЗрдВ рдЯрд╛рдЗрдк рдЗрдВрдЯреНрд░реЗрдВрд╕ рдмрд╣реБрдд рдЕрдзрд┐рдХ рд╣реИред рдкреНрд░рд╡рд╛рд╣ рд╕рдорд╛рд░реЛрд╣ рдЗрд╕ рддрд░рд╣ рджрд┐рдЦрддрд╛ рд╣реИ:
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
рдЬреИрд╕рд╛ рдХрд┐ рд╣рдо рджреЗрдЦ рд╕рдХрддреЗ рд╣реИрдВ, рдмреНрд▓реЙрдХ рдкреИрд░рд╛рдореАрдЯрд░ рдХреЛ рдмрд┐рд▓реНрдбрд░рдЗрдВрдЯрд░рдиреЗрд╢рди рдПрдиреЛрдЯреЗрд╢рди рдХреЗ рд╕рд╛рде рдЪрд┐рд╣реНрдирд┐рдд рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ, рдЬреЛ рдХрдВрдкрд╛рдЗрд▓рд░ рдХреЛ рдмрддрд╛рддрд╛ рд╣реИ рдХрд┐ рдЗрд╕ рдкреИрд░рд╛рдореАрдЯрд░ рд╕реЗ рдЯрд╛рдЗрдк рд▓рд┐рдпрд╛ рдЬрд╛рдирд╛ рдЪрд╛рд╣рд┐рдПред рдХреНрдпрд╛ рдХрд┐рд╕реА рдХреЛ рдкрддрд╛ рд╣реИ рдХрд┐ рдХреНрдпрд╛ рд░реЛрдЬрд▓рд┐рди рдкрд░ C # рдХреЗ рд▓рд┐рдП рд╕рдорд╛рди рджрд░реНрдЬ рдХрд░рдирд╛ рд╕рдВрднрд╡ рд╣реИ?
CancellationToken
рдЖрд░рдПрдХреНрд╕ рдореЗрдВ рдПрдХ рд╕рджрд╕реНрдпрддрд╛ рд╣реИ рдЬрд┐рд╕рдореЗрдВ рд╕реЗ рдЖрдк рд╕рджрд╕реНрдпрддрд╛ рд╕рдорд╛рдкреНрдд рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред рдХреЛрдЯрд▓рд┐рди рдлреНрд▓реЛ рдореЗрдВ, рдиреМрдХрд░реА рд░рджреНрдж рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдЬрд╝рд┐рдореНрдореЗрджрд╛рд░ рд╣реИ, рдЬрд┐рд╕реЗ рдмрд┐рд▓реНрдбрд░, рдпрд╛ рдХреЙрд░рдЯреАрди рд╕реНрдХреЛрдк рджреНрд╡рд╛рд░рд╛ рд▓реМрдЯрд╛рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рд╣рдореЗрдВ рдирд┐рд╢реНрдЪрд┐рдд рд░реВрдк рд╕реЗ рдПрдХ рдЙрдкрдХрд░рдг рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ рдЬреЛ рдлреНрд▓реЛ рдХреЛ рдЬрд▓реНрджреА рдкреВрд░рд╛ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред C # рдореЗрдВ, рдПрд╕рд┐рдВрдХреНрд░реЛрдирд╕ рдСрдкрд░реЗрд╢рдВрд╕ рдХреЛ рд░рджреНрдж рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдореИрдВ рдЗрд╕ рд╢рдмреНрдж рд╕реЗ рдирд╣реАрдВ рдбрд░рддрд╛, рдХреИрдВрд╕рд┐рд▓реЗрд╢рди рдЯреЛрдХрди рдкреИрдЯрд░реНрди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИред рдХреИрдВрд╕реЗрд▓реЗрд╢рдирдЯреЛрдХрди рдПрдХ рдРрд╕рд╛ рд╡рд░реНрдЧ рд╣реИ, рдЬрд┐рд╕рдХрд╛ рдСрдмреНрдЬреЗрдХреНрдЯ рдПрд╕рд┐рдВрдХреНрд░реЛрдирд╕ рдСрдкрд░реЗрд╢рди рдХреЛ рдЬрд╛рдирдХрд╛рд░реА рдкреНрд░рджрд╛рди рдХрд░рддрд╛ рд╣реИ рдЬрд┐рд╕реЗ рдЗрд╕реЗ рд░рджреНрдж рдХрд░ рджрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИред рдпрд╣ рд╕реНрдЯрд╛рд░реНрдЯрдЕрдк рдкрд░ рдПрдХ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рдСрдкрд░реЗрд╢рди рдореЗрдВ рдЦреБрдж рдХреЛ рдлреЗрдВрдХрддрд╛ рд╣реИ, рдФрд░ рдпрд╣ рдСрдкрд░реЗрд╢рди рдЦреБрдж рдЕрдкрдиреА рд╕реНрдерд┐рддрд┐ рдХреЛ рджреЗрдЦрддрд╛ рд╣реИред рдФрд░ рд░рд╛рдЬреНрдп рдмрд╛рд╣рд░ рд╕реЗ рдмрджрд▓ рдЬрд╛рддрд╛ рд╣реИред
рд╕рдВрдХреНрд╖реЗрдк рдореЗрдВ, рд╣рдореЗрдВ рдЕрдкрдиреЗ рдкреНрд░рд╡рд╛рд╣ рдФрд░ рдлреНрд▓реЛрдХреЛрд▓реЗрдХреНрдЯрд░ рдореЗрдВ рдХреИрдВрд╕реЗрд▓реЗрд╢рдирдЯреЛрдХрди рдХреЛ рдлреЗрдВрдХрдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред
public interface IFlow<out T> { Task Collect(IFlowCollector<T> collector, CancellationToken cancellationToken = default); } public interface IFlowCollector<in T> { Task Emit(T item, CancellationToken cancellationToken = default); }
рдореИрдВ рдпрд╣рд╛рдВ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдХреЛ рдирд╣реАрдВ рдЪрд┐рдкрдХрд╛рдКрдВрдЧрд╛ - рдЬреАрдердм рджреЗрдЦреЗрдВред
рдкрд░реАрдХреНрд╖рдг рдЕрдм рдЗрд╕ рддрд░рд╣ рджрд┐рдЦреЗрдЧрд╛:
var cts = new CancellationTokenSource(); var flowTask = Flow<int>(async (collector, cancellationToken) => { await collector.Emit(1); await Task.Delay(2000, cancellationToken); await collector.Emit(2); await Task.Delay(2000, cancellationToken); await collector.Emit(3); }) .Collect(item => Log(item), cts.Token); var cancelationTask = Task.Run(async () => { await Task.Delay(3000); cts.Cancel(); }); await flowTask;
рдмрд╛рдд рдпрд╣ рд╣реИред рдлреНрд▓реЛ рдХреЗ рд╕рдорд╛рдирд╛рдВрддрд░, рд╣рдо рдПрдХ рдСрдкрд░реЗрд╢рди рд╢реБрд░реВ рдХрд░рддреЗ рд╣реИрдВ рдЬреЛ 3 рд╕реЗрдХрдВрдб рдХреЗ рдмрд╛рдж рдЗрд╕реЗ рд░рджреНрдж рдХрд░ рджреЗрдЧрд╛ред рдирддреАрдЬрддрди, рдлреНрд▓реЛ рддреАрд╕рд░реЗ рддрддреНрд╡ рдХрд╛ рдЙрддреНрд╕рд░реНрдЬрди рдХрд░рдиреЗ рдХрд╛ рдкреНрд░рдмрдВрдзрди рдирд╣реАрдВ рдХрд░рддрд╛ рд╣реИ рдФрд░ рдЯрд╛рд╕реНрдХрдХреИрдиреНрд▓реЗрдб тАЛтАЛрдЕрдкрд╡рд╛рдж рдХреЗ рд╕рд╛рде рд╕рдорд╛рдкреНрдд рд╣реЛрддрд╛ рд╣реИ, рдЬреЛ рдЖрд╡рд╢реНрдпрдХ рд╡реНрдпрд╡рд╣рд╛рд░ рд╣реИред
рдереЛрдбрд╝рд╛ рд╕рд╛ рдЕрднреНрдпрд╛рд╕
рдЖрдЗрдП рдкреНрд░рдпреЛрдЧ рдХрд░рдиреЗ рдХрд╛ рдкреНрд░рдпрд╛рд╕ рдХрд░реЗрдВ рдХрд┐ рд╡реНрдпрд╡рд╣рд╛рд░ рдореЗрдВ рдХреНрдпрд╛ рд╣реБрдЖред рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рд╣рдорд╛рд░реЗ рдлрд╝реНрд▓реЛ рдореЗрдВ рдХреБрдЫ рдИрд╡реЗрдВрдЯ рд▓рдкреЗрдЯреЗрдВред Rx.Net рдореЗрдВ рдЗрд╕рдХреЗ рд▓рд┐рдП рдПрдХ рдкреБрд╕реНрддрдХрд╛рд▓рдп рд╡рд┐рдзрд┐ FromEventPattern рднреА рд╣реИред
рд╡рд╛рд╕реНрддрд╡рд┐рдХ UI рдХреЗ рд╕рд╛рде рдЧрдбрд╝рдмрдбрд╝ рди рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, рдореИрдВрдиреЗ ClicksEmulator рд╡рд░реНрдЧ рд▓рд┐рдЦрд╛, рдЬреЛ рдпрд╛рджреГрдЪреНрдЫрд┐рдХ рдЕрдВрддрд░рд╛рд▓ рдкрд░ рд╕рд╢рд░реНрдд рдорд╛рдЙрд╕ рдХреНрд▓рд┐рдХ рдЙрддреНрдкрдиреНрди рдХрд░рддрд╛ рд╣реИред
public class ClicksEmulator { public enum Button { Left, Right } public class ClickEventArgs : EventArgs {
рдореИрдВрдиреЗ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рдХреЛ рдЫреЛрдбрд╝ рджрд┐рдпрд╛ рд╡рд╣ рдпрд╣рд╛рдВ рдмрд╣реБрдд рдорд╣рддреНрд╡рдкреВрд░реНрдг рдирд╣реАрдВ рд╣реИред рдореБрдЦреНрдп рдмрд╛рдд рдИрд╡реЗрдВрдЯ рдмрдЯрдирдХреНрд▓рд┐рдХ рд╣реИ, рдЬрд┐рд╕реЗ рд╣рдо рдлреНрд▓реЛ рдореЗрдВ рдмрджрд▓рдирд╛ рдЪрд╛рд╣рддреЗ рд╣реИрдВред рдЗрд╕рдХреЗ рд▓рд┐рдП рд╣рдо рдПрдХ рд╡рд┐рд╕реНрддрд╛рд░ рд╡рд┐рдзрд┐ рд▓рд┐рдЦрддреЗ рд╣реИрдВ
public static IFlow<ClicksEmulator.ClickEventArgs> Clicks(this ClicksEmulator emulator) { return FlowFactory.Flow<ClicksEmulator.ClickEventArgs>(async (collector, cancellationToken) => { void clickHandler(object sender, ClicksEmulator.ClickEventArgs args) => collector.Emit(args); emulator.ButtonClick += clickHandler; cancellationToken.Register(() => { emulator.ButtonClick -= clickHandler; }); await Task.Delay(-1, cancellationToken); }); }
рд╕рдмрд╕реЗ рдкрд╣рд▓реЗ, рд╣рдо рдПрдХ рдИрд╡реЗрдВрдЯ рд╣реИрдВрдбрд▓рд░ рдШреЛрд╖рд┐рдд рдХрд░рддреЗ рд╣реИрдВ, рдЬреЛ рдХрд▓реЗрдХреНрдЯрд░ рдХреЗ рдкрд╛рд╕ рдИрд╡реЗрдВрдЯ рдХреЗ рддрд░реНрдХ рдХреЗ рдЕрд▓рд╛рд╡рд╛ рдХреБрдЫ рдирд╣реАрдВ рдХрд░рддрд╛ рд╣реИред рддрдм рд╣рдо рдИрд╡реЗрдВрдЯ рд░рджреНрдж рдХрд░рдиреЗ рдФрд░ рдкреНрд░рд╡рд╛рд╣ рдХреЛ рд░рджреНрдж рдХрд░рдиреЗ (рдкреВрд░рд╛ рд╣реЛрдиреЗ) рдХреЗ рдорд╛рдорд▓реЗ рдореЗрдВ рдПрдХ рд╕рджрд╕реНрдпрддрд╛ рд╕рдорд╛рдкреНрдд рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддреЗ рд╣реИрдВред рдареАрдХ рд╣реИ, рддреЛ рд╣рдо рдЕрдВрддрд╣реАрди рдЗрдВрддрдЬрд╛рд░ рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рд░рджреНрдж рдХрд░рдиреЗ рддрдХ рдмрдЯрди рдХреНрд▓рд┐рдХ рдХрд░реЗрдВ рдШрдЯрдирд╛рдУрдВ рдХреЛ рд╕реБрдиреЗрдВред
рдпрджрд┐ рдЖрдкрдиреЗ рдХреЙрд▓рдмреИрдХрдлреНрд▓реЛ рдпрд╛ рдЪреИрдирд▓рдлреНрд▓реЛ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХреЛрдЯрд▓рд┐рди рдореЗрдВ рдХрд┐рдпрд╛ рд╣реИ рдпрд╛ рдЖрд░рдПрдХреНрд╕ рдореЗрдВ рд╢реНрд░реЛрддрд╛рдУрдВ рд╕реЗ рдардВрдбрд╛ рдСрдмреНрдЬрд░реНрд╡реЗрдмрд▓ рдмрдирд╛рдпрд╛ рд╣реИ, рддреЛ рдЖрдк рджреЗрдЦреЗрдВрдЧреЗ рдХрд┐ рдХреЛрдб рд╕рдВрд░рдЪрдирд╛ рд╕рднреА рдорд╛рдорд▓реЛрдВ рдореЗрдВ рдмрд╣реБрдд рд╕рдорд╛рди рд╣реИред рдпрд╣ рдареАрдХ рд╣реИ, рд▓реЗрдХрд┐рди рд╕рд╡рд╛рд▓ рдЙрдарддрд╛ рд╣реИ - рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ рдПрдХ рдХреНрд░реВрдб рдШрдЯрдирд╛ рдХреА рддреБрд▓рдирд╛ рдореЗрдВ рдлреНрд▓реЛ рд╕реЗ рдмреЗрд╣рддрд░ рдХреНрдпрд╛ рд╣реИ? рдЬреЗрдЯ рд╕реНрдЯреНрд░реАрдо рдХреА рд╕рд╛рд░реА рд╢рдХреНрддрд┐ рдЙрди рдСрдкрд░реЗрдЯрд░реЛрдВ рдореЗрдВ рдирд┐рд╣рд┐рдд рд╣реИ рдЬреЛ рдЙрди рдкрд░ рд╡рд┐рднрд┐рдиреНрди рдкрд░рд┐рд╡рд░реНрддрди рдХрд░рддреЗ рд╣реИрдВ: рдлрд╝рд┐рд▓реНрдЯрд░рд┐рдВрдЧ, рдореИрдкрд┐рдВрдЧ, рдФрд░ рдХрдИ рдЕрдиреНрдп, рдЕрдзрд┐рдХ рдЬрдЯрд┐рд▓ред рд▓реЗрдХрд┐рди рд╣рдорд╛рд░реЗ рдкрд╛рд╕ рдЙрдиреНрд╣реЗрдВ рдЕрднреА рддрдХ рдирд╣реАрдВ рд╣реИред рдЖрдЗрдП рдЗрд╕рдХреЗ рдмрд╛рд░реЗ рдореЗрдВ рдХреБрдЫ рдХрд░рдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рдХрд░реЗрдВред
рдлрд╝рд┐рд▓реНрдЯрд░, рдорд╛рдирдЪрд┐рддреНрд░, OnNext
рдЖрдЗрдП рд╕рдмрд╕реЗ рд╕рд░рд▓ рдСрдкрд░реЗрдЯрд░реЛрдВ рдореЗрдВ рд╕реЗ рдПрдХ рдХреЗ рд╕рд╛рде рд╢реБрд░реВ рдХрд░реЗрдВ - рдлрд╝рд┐рд▓реНрдЯрд░ред рдЬреИрд╕рд╛ рдХрд┐ рдирд╛рдо рд╕реЗ рд╣реА рд╕реНрдкрд╖реНрдЯ рд╣реИ рдХрд┐ рдпрд╣ рджрд┐рдП рдЧрдП рд╡рд┐рдзреЗрдп рдХреЗ рдЕрдиреБрд╕рд╛рд░ рдкреНрд░рд╡рд╛рд╣ рддрддреНрд╡реЛрдВ рдХреЛ рдлрд╝рд┐рд▓реНрдЯрд░ рдХрд░реЗрдЧрд╛ред рдпрд╣ рдореВрд▓ рдкреНрд░рд╡рд╛рд╣ рдФрд░ рдХреЗрд╡рд▓ рдлрд╝рд┐рд▓реНрдЯрд░реНрдб рддрддреНрд╡реЛрдВ рдХреЗ рд╕рд╛рде рд╡рд╛рдкрд╕реА рдкреНрд░рд╡рд╛рд╣ рдХреЗ рд▓рд┐рдП рд▓рд╛рдЧреВ рдПрдХ рд╡рд┐рд╕реНрддрд╛рд░ рд╡рд┐рдзрд┐ рд╣реЛрдЧреАред рдпрд╣ рдкрддрд╛ рдЪрд▓рд╛ рд╣реИ рдХрд┐ рд╣рдореЗрдВ рдореВрд▓ рдкреНрд░рд╡рд╛рд╣ рд╕реЗ рдкреНрд░рддреНрдпреЗрдХ рддрддреНрд╡ рдХреЛ рд▓реЗрдиреЗ рдХреА рдЬрд░реВрд░рдд рд╣реИ, рдЬрд╛рдВрдЪ рдХрд░реЗрдВ рдФрд░ рдЖрдЧреЗ рдирд┐рдХрд▓ рдЬрд╛рдПрдВ рдпрджрд┐ рд╡рд┐рдзреЗрдп рд╕рд╣реА рд╣реИред рддреЛ рдЪрд▓рд┐рдП рдЗрд╕реЗ рдХрд░рддреЗ рд╣реИрдВ:
public static IFlow<T> Filter<T>(this IFlow<T> source, Func<T, bool> predicate) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { if (predicate(item)) collector.Emit(item); }, cancellationToken) );
рдЕрдм, рдпрджрд┐ рд╣рдореЗрдВ рдХреЗрд╡рд▓ рдмрд╛рдИрдВ рдорд╛рдЙрд╕ рдмрдЯрди рдкрд░ рдХреНрд▓рд┐рдХ рдХрд░рдирд╛ рд╣реИ, рддреЛ рд╣рдо рдЗрд╕реЗ рд▓рд┐рдЦ рд╕рдХрддреЗ рд╣реИрдВ:
emulator .Clicks() .Filter(click => click.Button == ClicksEmulator.Button.Left) .Collect(item => Log($"{item.Button} {item.X} {item.Y}"), cts.Token);
рд╕рд╛рджреГрд╢реНрдп рд╕реЗ, рд╣рдо рдСрдкрд░реЗрдЯрд░реЛрдВ рдорд╛рдирдЪрд┐рддреНрд░ рдФрд░ OnNext рд▓рд┐рдЦрддреЗ рд╣реИрдВред рдкрд╣рд▓реЗ рдкрд╛рд╕ рдХрд┐рдП рдЧрдП рдореИрдкрд░ рдлрд╝рдВрдХреНрд╢рди рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ рдореВрд▓ рдкреНрд░рд╡рд╛рд╣ рдХреЗ рдкреНрд░рддреНрдпреЗрдХ рддрддреНрд╡ рдХреЛ рджреВрд╕рд░реЗ рдореЗрдВ рдкрд░рд┐рд╡рд░реНрддрд┐рдд рдХрд░рддрд╛ рд╣реИред рджреВрд╕рд░рд╛ рдореВрд▓ рдХреЗ рд╕рдорд╛рди рддрддреНрд╡реЛрдВ рдХреЗ рд╕рд╛рде рд╡рд╛рдкрд╕ рдЖ рдЬрд╛рдПрдЧрд╛, рд▓реЗрдХрд┐рди рдкреНрд░рддреНрдпреЗрдХ рдкрд░ рдПрдХ рдХрд╛рд░реНрд░рд╡рд╛рдИ рдХрд░ рд░рд╣рд╛ рд╣реИ (рдЖрдорддреМрд░ рдкрд░ рд▓реЙрдЧрд┐рдВрдЧ)ред
public static IFlow<R> Map<T, R>(this IFlow<T> source, Func<T, R> mapper) => FlowFactory.Flow<R>((collector, cancellationToken) => source.Collect( item => collector.Emit(mapper(item)), cancellationToken ) ); public static IFlow<T> OnNext<T>(this IFlow<T> source, Action<T> action) => FlowFactory.Flow<T>((collector, cancellationToken) => source.Collect(item => { action(item); collector.Emit(item); }, cancellationToken) );
рдФрд░ рдЙрдкрдпреЛрдЧ рдХрд╛ рдПрдХ рдЙрджрд╛рд╣рд░рдг:
emulator .Clicks() .OnNext(click => Log($"{click.Button} {click.X} {click.Y}")) .Map(click => click.Button == ClicksEmulator.Button.Left ? 0 : 1) .Collect(item => Log($"{item}"), cts.Token);
рд╕рд╛рдорд╛рдиреНрдп рддреМрд░ рдкрд░, рдЬреЗрдЯ рд╕реНрдЯреНрд░реАрдо рдХреЗ рд▓рд┐рдП рдмрд╣реБрдд рд╕рд╛рд░реЗ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХрд╛ рдЖрд╡рд┐рд╖реНрдХрд╛рд░ рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛, рдЙрдиреНрд╣реЗрдВ рдкрд╛рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ, рдЙрджрд╛рд╣рд░рдг рдХреЗ рд▓рд┐рдП, рдпрд╣рд╛рдВ ред
рдФрд░ рдХреБрдЫ рднреА рдирд╣реАрдВ IF IF рдХреЗ рд▓рд┐рдП рдЙрдирдореЗрдВ рд╕реЗ рдХрд┐рд╕реА рдХреЛ рднреА рд▓рд╛рдЧреВ рдХрд░рдиреЗ рд╕реЗ рд░реЛрдХрддрд╛ рд╣реИред
Rx.Net рд╕реЗ рдкрд░рд┐рдЪрд┐рдд рд▓реЛрдЧ рдЬрд╛рдирддреЗ рд╣реИрдВ рдХрд┐ IObservable рдХреЗ рд▓рд┐рдП рдирдП рдФрд░ рд╡рд┐рд╢рд┐рд╖реНрдЯ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреЗ рдЕрд▓рд╛рд╡рд╛, Linq-to-рдСрдмреНрдЬреЗрдХреНрдЯреНрд╕ рдХреА рдПрдХреНрд╕рдЯреЗрдВрд╢рди рд╡рд┐рдзрд┐рдпреЛрдВ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд┐рдпрд╛ рдЬрд╛рддрд╛ рд╣реИ, рдФрд░ рдпрд╣ рдЖрдкрдХреЛ рдзрд╛рд░рд╛рдУрдВ рдХреЛ "рдШрдЯрдирд╛рдУрдВ рдХреЗ рд╕рдВрдЧреНрд░рд╣" рдХреЗ рд░реВрдк рдореЗрдВ рд╡рд┐рдЪрд╛рд░ рдХрд░рдиреЗ рдФрд░ рд╕рд╛рдорд╛рдиреНрдп Linq рдХреЗ рд╕рд╛рде рд╣реЗрд░рдлреЗрд░ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрддрд╛ рд╣реИред -methodsред рдХреНрдпреЛрдВ, рдЦреБрдж рдмрдпрд╛рди рд▓рд┐рдЦрдиреЗ рдХреЗ рдмрдЬрд╛рдп, рдЗрдлреНрд▓реЛ рдХреЛ рд▓рд┐рдирдХ рд░реЗрд▓ рдкрд░ рд░рдЦрдиреЗ рдХреА рдХреЛрд╢рд┐рд╢ рди рдХрд░реЗрдВ?
IAsyncEnumerable
C # 8 рдореЗрдВ, IEnumerable рдХрд╛ рдПрдХ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рд╕рдВрд╕реНрдХрд░рдг рдкреЗрд╢ рдХрд┐рдпрд╛ рдЧрдпрд╛ рдерд╛ - IAsyncEnumerable - рдПрдХ рд╕рдВрдЧреНрд░рд╣ рдЗрдВрдЯрд░рдлрд╝реЗрд╕ рдЬрд┐рд╕реЗ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рд░реВрдк рд╕реЗ рдкреБрдирд░рд╛рд╡реГрддреНрдд рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред IAsyncEnumerable рдФрд░ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛рд╢реАрд▓ рдзрд╛рд░рд╛рдУрдВ (IObservable рдФрд░ IFlow) рдХреЗ рдмреАрдЪ рдореВрд▓рднреВрдд рдЕрдВрддрд░ рдпрд╣ рд╣реИред IAsyncEnumerable, IEnumerable рдХреА рддрд░рд╣, рдПрдХ рдкреБрд▓ рдореЙрдбрд▓ рд╣реИред рд╣рдо рдЗрд╕ рд╕рдВрдЧреНрд░рд╣ рдкрд░ рдкреБрдирд░рд╛рд╡реГрддреНрддрд┐ рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рд╣рдореЗрдВ рдХрд┐рддрдиреА рдФрд░ рдХрдм рдЬрд░реВрд░рдд рд╣реИ рдФрд░ рд╣рдо рдЦреБрдж рд╕реЗ рддрддреНрд╡реЛрдВ рдХреЛ рдЖрдХрд░реНрд╖рд┐рдд рдХрд░рддреЗ рд╣реИрдВред рдзрд╛рд░рд╛рдПрдБ рдзрдХреНрдХрд╛ рд╣реИрдВред рд╣рдо рдЖрдпреЛрдЬрдиреЛрдВ рдХреА рд╕рджрд╕реНрдпрддрд╛ рд▓реЗрддреЗ рд╣реИрдВ рдФрд░ рдЬрдм рд╡реЗ рдЖрддреЗ рд╣реИрдВ рддреЛ рдЙрдиреНрд╣реЗрдВ "рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛" рджреЗрддреЗ рд╣реИрдВ - рдЗрд╕рдХреЗ рд▓рд┐рдП рд╡реЗ рдкреНрд░рддрд┐рдХреНрд░рд┐рдпрд╛рд╢реАрд▓ рд╣реЛрддреЗ рд╣реИрдВред рд╣рд╛рд▓рд╛рдВрдХрд┐, рдкреБрд▓ рдореЙрдбрд▓ рд╕реЗ рдзрдХреНрдХрд╛-рдЬреИрд╕рд╛ рд╡реНрдпрд╡рд╣рд╛рд░ рдкреНрд░рд╛рдкреНрдд рдХрд┐рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИред рдЗрд╕реЗ рд▓реЙрдиреНрдЧ рдкреЛрд▓рд┐рдВрдЧ https://en.wikipedia.org/wiki/Push_technology#Long_polling рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИред рд╕рд╛рд░ рдпрд╣ рд╣реИ: рд╣рдо рд╕рдВрдЧреНрд░рд╣ рдкрд░ рдкреБрдирд░рд╛рд╡реГрддрд┐ рдХрд░рддреЗ рд╣реИрдВ, рдЗрд╕рдХреЗ рдЕрдЧрд▓реЗ рддрддреНрд╡ рдХрд╛ рдЕрдиреБрд░реЛрдз рдХрд░рддреЗ рд╣реИрдВ рдФрд░ рдЬрдм рддрдХ рд╣рдо рдЗрд╕реЗ рдкрд╕рдВрдж рдХрд░рддреЗ рд╣реИрдВ, рддрдм рддрдХ рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рддреЗ рд╣реИрдВ, рдЬрдм рддрдХ рдХрд┐ рд╕рдВрдЧреНрд░рд╣ рд╣рдореЗрдВ рд╡рд╛рдкрд╕ рдирд╣реАрдВ рдХрд░рддрд╛, рдЕрд░реНрдерд╛рддред рдЕрдЧрд▓реА рдШрдЯрдирд╛ рдЖрдиреЗ рддрдХред IAsyncEnumerable, IEnumerable рдХреЗ рд╡рд┐рдкрд░реАрдд, рд╣рдореЗрдВ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рд░реВрдк рд╕реЗ рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░рдиреЗ рдХреА рдЕрдиреБрдорддрд┐ рджреЗрдЧрд╛ред рд╕рдВрдХреНрд╖реЗрдк рдореЗрдВ, рд╣рдореЗрдВ рдХрд┐рд╕реА рддрд░рд╣ рд╕реЗ рдЗрдлреНрд▓реЛ рдкрд░ IAsyncEnumerable рдЦреАрдВрдЪрдиреЗ рдХреА рдЬрд░реВрд░рдд рд╣реИред
рдЬреИрд╕рд╛ рдХрд┐ рдЖрдк рдЬрд╛рдирддреЗ рд╣реИрдВ, IAsyncEnumerator рдЗрдВрдЯрд░рдлрд╝реЗрд╕ IAsyncEnumerable рд╕рдВрдЧреНрд░рд╣ рдХреЗ рд╡рд░реНрддрдорд╛рди рддрддреНрд╡ рдХреЛ рд╡рд╛рдкрд╕ рдХрд░рдиреЗ рдФрд░ рдЕрдЧрд▓реЗ рддрддреНрд╡ рдкрд░ рдЬрд╛рдиреЗ рдХреЗ рд▓рд┐рдП рдЬрд┐рдореНрдореЗрджрд╛рд░ рд╣реИред рдЗрд╕ рдорд╛рдорд▓реЗ рдореЗрдВ, рд╣рдореЗрдВ IFlow рд╕реЗ рддрддреНрд╡реЛрдВ рдХреЛ рд▓реЗрдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИ, рдФрд░ IFlowCollector рдРрд╕рд╛ рдХрд░рддрд╛ рд╣реИред рдпрд╣ рдкрддрд╛ рдЪрд▓рд╛ рд╣реИ рдХрд┐ рдПрдХ рд╡рд╕реНрддреБ рд╣реИ рдЬреЛ рдЗрди рдЗрдВрдЯрд░рдлреЗрд╕ рдХреЛ рд▓рд╛рдЧреВ рдХрд░рддреА рд╣реИ:
internal class FlowCollectorEnumerator<T> : IFlowCollector<T>, IAsyncEnumerator<T> { private readonly SemaphoreSlim _backpressureSemaphore = new SemaphoreSlim(0, 1); private readonly SemaphoreSlim _longPollingSemaphore = new SemaphoreSlim(0, 1); private bool _isFinished; public T Current { get; private set; } public async ValueTask DisposeAsync() { } public async Task Emit(T item, CancellationToken cancellationToken) { await _backpressureSemaphore.WaitAsync(cancellationToken); Current = item; _longPollingSemaphore.Release(); } public async Task Finish() { await _backpressureSemaphore.WaitAsync(); _isFinished = true; _longPollingSemaphore.Release(); } public async ValueTask<bool> MoveNextAsync() { _backpressureSemaphore.Release(); await _longPollingSemaphore.WaitAsync(); return !_isFinished; } }
рдпрд╣рд╛рдБ рдореБрдЦреНрдп рд╡рд┐рдзрд┐рдпрд╛рдБ Emit , Finish, рдФрд░ MoveNextAsync рд╣реИрдВ ред
рд╢реБрд░реБрдЖрдд рдореЗрдВ рдПрдорд┐рдЯ рдЙрд╕ рдкрд▓ рдХреА рдкреНрд░рддреАрдХреНрд╖рд╛ рдХрд░ рд░рд╣рд╛ рд╣реИ рдЬрдм рд╕рдВрдЧреНрд░рд╣ рд╕реЗ рдЕрдЧрд▓реЗ рдЖрдЗрдЯрдо рдХрд╛ рдЕрдиреБрд░реЛрдз рдХрд┐рдпрд╛ рдЬрд╛рдПрдЧрд╛ред рдпрд╛рдиреА рдХрд┐рд╕реА рд╡рд╕реНрддреБ рдХрд╛ рддрдм рддрдХ рдЙрддреНрд╕рд░реНрдЬрди рдирд╣реАрдВ рдХрд░рддрд╛, рдЬрдм рддрдХ рдХрд┐ рдЙрд╕рдХреА рдЬрд░реВрд░рдд рди рд╣реЛред рдЗрд╕реЗ рдмреИрдХрд╕реНрдкреЗрд╕ рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИ, рдЗрд╕рд▓рд┐рдП рд╕реЗрдорд╛рдлреЛрд░ рдХрд╛ рдирд╛рдоред рдлрд┐рд░ рд╡рд░реНрддрдорд╛рди рдЖрдЗрдЯрдо рд╕реЗрдЯ рдХрд┐рдпрд╛ рдЧрдпрд╛ рд╣реИ рдФрд░ рдпрд╣ рдмрддрд╛рдпрд╛ рдЧрдпрд╛ рд╣реИ рдХрд┐ рдПрдХ рд▓рдВрдмреЗ рдорддрджрд╛рди рдЕрдиреБрд░реЛрдз рдХрд╛ рдкрд░рд┐рдгрд╛рдо рдорд┐рд▓ рд╕рдХрддрд╛ рд╣реИред
MoveNextAsync рдХреЛ рдЙрд╕ рд╕рдордп рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИ рдЬрдм рд╕рдВрдЧреНрд░рд╣ рд╕реЗ рдХреЛрдИ рдЕрдиреНрдп рдЖрдЗрдЯрдо рдЦреАрдВрдЪрд╛ рдЬрд╛рддрд╛ рд╣реИред рд╡рд╣ _backpressureSemaphore рдЬрд╛рд░реА рдХрд░рддрд╛ рд╣реИ рдФрд░ рдЕрдЧрд▓реЗ рддрддреНрд╡ рдХреЛ рдЯреНрд░рд┐рдЧрд░ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП рдлрд╝реНрд▓реЛ рдХрд╛ рдЗрдВрддрдЬрд╛рд░ рдХрд░рддрд╛ рд╣реИред рдлрд┐рд░ рдпрд╣ рдПрдХ рд╕рдВрдХреЗрдд рджреЗрддрд╛ рд╣реИ рдХрд┐ рд╕рдВрдЧреНрд░рд╣ рд╕рдорд╛рдкреНрдд рд╣реЛ рдЧрдпрд╛ рд╣реИред рдпрд╣ рдзреНрд╡рдЬ рдлрд┐рдирд┐рд╢ рд╡рд┐рдзрд┐ рд╕реЗрдЯ рдХрд░рддрд╛ рд╣реИред
рд╕рдорд╛рдкреНрдд рдПрдорд┐рдЯ рдХреЗ рд╕рдорд╛рди рд╕рд┐рджреНрдзрд╛рдВрдд рдкрд░ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ, рдХреЗрд╡рд▓ рдЕрдЧрд▓реЗ рддрддреНрд╡ рдХреЗ рдмрдЬрд╛рдп рдпрд╣ рд╕рдВрдЧреНрд░рд╣ рдХреЗ рдЕрдВрдд рдХрд╛ рд╕рдВрдХреЗрдд рд╕реЗрдЯ рдХрд░рддрд╛ рд╣реИред
рдЕрдм рд╣рдореЗрдВ рдЗрд╕ рд╡рд░реНрдЧ рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рд╣реИред
public static class AsyncEnumerableExtensions { public static IAsyncEnumerable<T> CollectEnumerable<T>(this IFlow<T> flow, CancellationToken cancellationToken = default) { var collector = new FlowCollectorEnumerator<T>(); flow .Collect(collector, cancellationToken) .ContinueWith(_ => collector.Finish(), cancellationToken); return new FlowEnumerableAdapter<T>(collector); } } internal class FlowEnumerableAdapter<T> : IAsyncEnumerable<T> { private readonly IAsyncEnumerator<T> _enumerator; public FlowEnumerableAdapter(IAsyncEnumerator<T> enumerator) { _enumerator = enumerator; } public IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken cancellationToken = default) { return _enumerator; } }
IFlow рдХреЗ рд▓рд┐рдП ExtensionEnumerable рд╡рд┐рд╕реНрддрд╛рд░ рд╡рд┐рдзрд┐ рдПрдХ FlowCollectorEnumerator рдмрдирд╛рддрд╛ рд╣реИ рдФрд░ рдЙрд╕ рдкрд░ рдПрдХ рдкреНрд░рд╡рд╛рд╣ рдкрд░ рд╣рд╕реНрддрд╛рдХреНрд╖рд░ рдХрд░рддрд╛ рд╣реИ, рдЬрд┐рд╕рдХреЗ рдмрд╛рдж рд╕рдорд╛рдкреНрдд () рд╡рд┐рдзрд┐ рдХрд╣рд╛ рдЬрд╛рддрд╛ рд╣реИред рдФрд░ рдпрд╣ рдПрдХ FlowEnumerableAdapter рдХреЛ рд▓реМрдЯрд╛рддрд╛ рд╣реИ, рдЬреЛ рдХрд┐ IEAsumerator рдХреЗ рд░реВрдк рдореЗрдВ FlowCollectorEnumerator рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░рдХреЗ IAsyncEnumerable рдХрд╛ рд╕рдмрд╕реЗ рд╕рд░рд▓ рдХрд╛рд░реНрдпрд╛рдиреНрд╡рдпрди рд╣реИред
рд╣рдо рдХреЛрд╢рд┐рд╢ рдХрд░рддреЗ рд╣реИрдВ рдХрд┐ рдХреНрдпрд╛ рд╣реБрдЖред
var clicks = emulator .Clicks() .OnNext(item => Log($"{item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Where(click => click.Button == ClicksEmulator.Button.Right) .Select(click => click.Y < 540 ? "TOP" : "LEFT"); await foreach (var click in clicks) { Log($"Clicked at: {click}"); }
рдпрд╣рд╛рдВ рд╣рдореЗрдВ рдлрд╝реНрд▓реЛ рдХреНрд▓рд┐рдХ рдорд┐рд▓рддреЗ рд╣реИрдВ (), рдкреНрд░рддреНрдпреЗрдХ рдХреНрд▓рд┐рдХ рдХреЛ рд▓реЙрдЧ рдЗрди рдХрд░реЗрдВ, рдлрд┐рд░ IFAs рдХреЛ IAsyncEnumerable рдореЗрдВ рдмрджрд▓ рджреЗрдВред рдлрд┐рд░ рдпрд╣ рдкреНрд░рд╕рд┐рджреНрдз рд▓рд┐рдирдХ-рдСрдкрд░реЗрдЯрд░реЛрдВ рдкрд░ рд▓рд╛рдЧреВ рд╣реЛрддрд╛ рд╣реИ: рд╣рдо рдХреЗрд╡рд▓ рд░рд╛рдЗрдЯ-рдХреНрд▓рд┐рдХ рдЫреЛрдбрд╝рддреЗ рд╣реИрдВ рдФрд░ рд╣рдо рд╕реНрдХреНрд░реАрди рдХреЗ рдХрд┐рд╕ рд╣рд┐рд╕реНрд╕реЗ рдореЗрдВ рдмрдиреЗ рд╣реЛрддреЗ рд╣реИрдВред
рдЕрдЧрд▓рд╛, рдПрдХ рдЙрджрд╛рд╣рд░рдг рдкрд░ рдЕрдзрд┐рдХ рдЬрдЯрд┐рд▓ рд╡рд┐рдЪрд╛рд░ рдХрд░реЗрдВред рд╣рдо рд░рд╛рдЗрдЯ рдХреНрд▓рд┐рдХ рдХреЛ рдбрдмрд▓ рд▓реЗрдлреНрдЯ рд╡рд╛рд▓реЗ рд╕реЗ рдмрджрд▓ рджреЗрдВрдЧреЗред рдпрд╛рдиреА рд╣рдореЗрдВ рдкреНрд░рддреНрдпреЗрдХ рддрддреНрд╡ рдХреЛ рдХрд┐рд╕реА рджреВрд╕рд░реЗ рд╕реЗ рдирд╣реАрдВ, рдмрд▓реНрдХрд┐ рд╕рдВрдЧреНрд░рд╣ рдореЗрдВ рд▓реЗ рдЬрд╛рдирд╛ рд╣реЛрдЧрд╛ред рдпрд╛ рдлреНрд▓реЛ рдореЗрдВ, рдПрдХ рд╕рдВрдЧреНрд░рд╣ рдореЗрдВ рдкрд░рд┐рд╡рд░реНрддрд┐рдд рд╣реЛ рдЧрдпрд╛ред
var clicks = emulator .Clicks() .OnNext(item => Log($"Original: {item.Button} {item.X} {item.Y}")) .CollectEnumerable(cts.Token) .Select(click => click.Button == ClicksEmulator.Button.Left ? Flow<ClicksEmulator.ClickEventArgs>(collector => collector.Emit(click)) : Flow<ClicksEmulator.ClickEventArgs>(async collector => { var changedClick = new ClicksEmulator.ClickEventArgs(click.X, click.Y, ClicksEmulator.Button.Left); await collector.Emit(changedClick); await Task.Delay(200); await collector.Emit(changedClick); }) ) .SelectMany(flow => flow.CollectEnumerable()); await foreach (var click in clicks) { Log($"Changed: {click.Button} {click.X} {click.Y}"); }
рдРрд╕рд╛ рдХрд░рдиреЗ рдХреЗ рд▓рд┐рдП, Linq рдореЗрдВ SelectMany рдСрдкрд░реЗрдЯрд░ рд╣реИред рдЬреЗрдЯ рд╕реНрдЯреНрд░реАрдо рдореЗрдВ рдЗрд╕рдХрд╛ рд╕рдордХрдХреНрд╖ рдлреНрд▓реИрдЯрдкрд╛рдЗрдк рд╣реИред рд╕рдмрд╕реЗ рдкрд╣рд▓реЗ, IFlow рдХреЗ рдкреНрд░рддреНрдпреЗрдХ рдХреНрд▓рд┐рдХ рдХреЛ рдореИрдк рдХрд░реЗрдВ: рдмрд╛рдПрдВ рдХреНрд▓рд┐рдХ рдХреЗ рд▓рд┐рдП - рдЗрд╕ рдПрдХ рдХреНрд▓рд┐рдХ рдХреЗ рд╕рд╛рде рдкреНрд░рд╡рд╛рд╣ рдХрд░реЗрдВ, рджрд╛рдПрдВ рдХреЗ рд▓рд┐рдП - рдЙрди рджреЛрдиреЛрдВ рдХреЗ рдмреАрдЪ рджреЗрд░реА рдХреЗ рд╕рд╛рде рджреЛ рдмрд╛рдПрдВ рдХреНрд▓рд┐рдХ рд╕реЗ рдкреНрд░рд╡рд╛рд╣ рдХрд░реЗрдВред рдФрд░ рдлрд┐рд░ SelectMany рдореЗрдВ рд╣рдо IFlow рдХреЛ IAyncEnumerable рдореЗрдВ рдмрджрд▓ рджреЗрддреЗ рд╣реИрдВред
рдФрд░ рдпрд╣ рдХрд╛рдо рдХрд░рддрд╛ рд╣реИ! рдпрд╛рдиреА рдХрдИ рдСрдкрд░реЗрдЯрд░реЛрдВ рдХреЛ IFlow рдХреЗ рд▓рд┐рдП рд▓рд╛рдЧреВ рдХрд░рдиреЗ рдХреА рдЖрд╡рд╢реНрдпрдХрддрд╛ рдирд╣реАрдВ рд╣реИ - рдЖрдк Linq рдХрд╛ рдЙрдкрдпреЛрдЧ рдХрд░ рд╕рдХрддреЗ рд╣реИрдВред
рдирд┐рд╖реНрдХрд░реНрд╖
Rx.Net - C # рдореЗрдВ рдШрдЯрдирд╛рдУрдВ рдХреЗ рдЕрддреБрд▓реНрдпрдХрд╛рд▓рд┐рдХ рдЕрдиреБрдХреНрд░рдореЛрдВ рдХреЗ рд╕рд╛рде рдХрд╛рдо рдХрд░рддреЗ рд╕рдордп рдореБрдЦреНрдп рдЙрдкрдХрд░рдг рдмрдирд╛ рд╣реБрдЖ рд╣реИред рд▓реЗрдХрд┐рди рдХреЛрдб рдЖрдХрд╛рд░ рдХреЗ рдорд╛рдорд▓реЗ рдореЗрдВ рдпрд╣ рдПрдХ рдмрдбрд╝рд╛ рдкреБрд╕реНрддрдХрд╛рд▓рдп рд╣реИред рдЬреИрд╕рд╛ рдХрд┐ рд╣рдордиреЗ рджреЗрдЦрд╛, рд╕рдорд╛рди рдХрд╛рд░реНрдпрдХреНрд╖рдорддрд╛ рдХреЛ рдмрд╣реБрдд рд╕рд░рд▓ рдмрдирд╛рдпрд╛ рдЬрд╛ рд╕рдХрддрд╛ рд╣реИ - рдмрд╕ рджреЛ рдЗрдВрдЯрд░рдлреЗрд╕ рдФрд░ рдХреБрдЫ рдмрд╛рдзреНрдпрдХрд╛рд░реАред рдпрд╣ рднрд╛рд╖рд╛ рд╕реБрд╡рд┐рдзрд╛рдУрдВ рдХреЗ рдЙрдкрдпреЛрдЧ рдХреЗ рд▓рд┐рдП рд╕рдВрднрд╡ рд╣реИ - async / рдкреНрд░рддреАрдХреНрд╖рд╛ред рдЬрдм Rx рдХрд╛ рдЬрдиреНрдо рд╣реБрдЖ, рддрдм рдЗрд╕ рд╕реБрд╡рд┐рдзрд╛ рдХреЛ C # рдореЗрдВ рдирд╣реАрдВ рд▓рд╛рдпрд╛ рдЧрдпрд╛ рдерд╛ред
рдЖрдкрдХрд╛ рдзреНрдпрд╛рди рдХреЗ рд▓рд┐рдП рдзрдиреНрдпрд╡рд╛рдж!