c# - How to convert blocking events to Observable? -



c# - How to convert blocking events to Observable? -

i learning .net rx (reactive extensions) library , trying create proper observable read user input console.

so far came this:

public static iobservable<string> consoleinputobservable() { homecoming observable.create<string>(observer => { var cancelable = new booleandisposable(); while(!cancelable.isdisposed) { observer.onnext(console.readline()); } observer.oncompleted(); homecoming cancelable; }); }

unfortunately, implementation has @ to the lowest degree 1 problem - there no way unsubscribe of it.

so question is: how convert series of blocking events observable?

thanks.

edit: typos

here go.

note few things:

this allows user supply appropriate scheduler command concurrency run, , yield each loop prevent getting gummed (although waiting on console pretty gummy anyway...) we should not phone call oncompleted in example, since way terminate cancelling subscription - , strive send no farther messages post cancellation we don't send onnext post cancellation here either.

here code:

public static iobservable<string> consoleinputobservable( ischeduler scheduler = null) { scheduler = scheduler ?? scheduler.default; homecoming observable.create<string>(o => { homecoming scheduler.scheduleasync(async (ctrl, ct) => { while(!ct.iscancellationrequested) { var next = console.readline(); if(ct.iscancellationrequested) return; o.onnext(next); await ctrl.yield(); } }); }); } addendum

@martinliversage commented behaviour multiple subscribers undesirable - prompted addendum. can publish() above code, given nature of console there 1 application , 1 thread can reading @ time, different approach warranted.

i ignored above since felt question more threading aspect nature of console. if genuinely interested in reporting lines entered @ console, kind of main loop next more practical - , represents reasonable utilize of subject.

static void main() { subject<string> sc = new subject<string>(); // kick off subscriptions here... // perhaps `observeon` if background processing required sc.subscribe(x => console.writeline("subscriber1: " + x)); sc.subscribe(x => console.writeline("subscriber2: " + x)); string input; while((input = console.readline()) != "q") { sc.onnext(input); } sc.oncompleted(); console.writeline("finished"); }

c# system.reactive observable rx.net

Comments

Popular posts from this blog

java - How to set log4j.defaultInitOverride property to false in jboss server 6 -

c - GStreamer 1.0 1.4.5 RTSP Example Server sends 503 Service unavailable -

Using ajax with sonata admin list view pagination -