c# - How to convert blocking events to Observable? -
c# - How to convert blocking events to Observable? -
i learning .net rx (reactive extensions) library , trying create proper observable read user input console.
so far came this:
public static iobservable<string> consoleinputobservable() { homecoming observable.create<string>(observer => { var cancelable = new booleandisposable(); while(!cancelable.isdisposed) { observer.onnext(console.readline()); } observer.oncompleted(); homecoming cancelable; }); }
unfortunately, implementation has @ to the lowest degree 1 problem - there no way unsubscribe of it.
so question is: how convert series of blocking events observable?
thanks.
edit: typos
here go.
note few things:
this allows user supply appropriate scheduler command concurrency run, , yield each loop prevent getting gummed (although waiting on console pretty gummy anyway...) we should not phone calloncompleted
in example, since way terminate cancelling subscription - , strive send no farther messages post cancellation we don't send onnext
post cancellation here either. here code:
public static iobservable<string> consoleinputobservable( ischeduler scheduler = null) { scheduler = scheduler ?? scheduler.default; homecoming observable.create<string>(o => { homecoming scheduler.scheduleasync(async (ctrl, ct) => { while(!ct.iscancellationrequested) { var next = console.readline(); if(ct.iscancellationrequested) return; o.onnext(next); await ctrl.yield(); } }); }); }
addendum @martinliversage commented behaviour multiple subscribers undesirable - prompted addendum. can publish()
above code, given nature of console there 1 application , 1 thread can reading @ time, different approach warranted.
i ignored above since felt question more threading aspect nature of console. if genuinely interested in reporting lines entered @ console, kind of main loop next more practical - , represents reasonable utilize of subject
.
static void main() { subject<string> sc = new subject<string>(); // kick off subscriptions here... // perhaps `observeon` if background processing required sc.subscribe(x => console.writeline("subscriber1: " + x)); sc.subscribe(x => console.writeline("subscriber2: " + x)); string input; while((input = console.readline()) != "q") { sc.onnext(input); } sc.oncompleted(); console.writeline("finished"); }
c# system.reactive observable rx.net
Comments
Post a Comment