java - Multithreading in Reactor 2.0 - why can't I spin signals out to multiple threads?


I'm running into problems with the Reactor 2.0 release. Namely, I'm trying to set up a reactive signal flow that fans a signal out to a pool of waiting threads. I'm familiar with Rx and ReactiveCocoa, but there must be something basic I'm missing here.

I have a basic transformation as follows:

    WorkQueueDispatcher dispatcher = new WorkQueueDispatcher("dispatch", 10, 64,
            {... exception handle code here ...});

    return objectStream
            .partition(partitions)
            .dispatchOn(dispatcher)
            .merge()
            .map(new Function<Object, Object>() {
                @Override
                public Object apply(Object o) {
                    try {
                        return extension.eval(o, null);
                    } catch (UnableToEvaluateException e) {
                        e.printStackTrace();
                        return null;
                    }
                }
            });

I've tried this flow 7 or 8 different ways, including different dispatchers, etc. I've tried breaking down the grouped event stream and handling each element separately, writing to a separate stream for processing. In every situation, I either see every request processed on the same thread (which works, but is not multi-threaded) or I get the error message I have come to dread:

    java.lang.IllegalStateException: Dispatcher provided doesn't support event ordering.
    For concurrent signal dispatching, refer to #partition()/groupBy() method and assign
    individual single dispatchers.
        at reactor.core.support.Assert.state(Assert.java:387)
        at reactor.rx.Stream.dispatchOn(Stream.java:720)
        at reactor.rx.Stream.dispatchOn(Stream.java:650)

I've tried the following:

  1. Manually doing the partition/group by.
  2. Explicitly setting a separate single-threaded dispatcher (ring) on earlier steps.
  3. Just saying "eff it", abandoning the functional style, and dumping it to my own queue for processing.

What am I missing here? Should I not be using a Broadcaster to start the message loop? I don't care at all about in-order execution here.

(edited)

Here is what I am doing with homegrown code to scale out:

    objectStream
            .consume(new Consumer<Object>() {
                @Override
                public void accept(Object o) {
                    final Object target = o;
                    tpe.execute(new Runnable() {
                        /**
                         * When an object implementing interface <code>Runnable</code> is used
                         * to create a thread, starting the thread causes the object's
                         * <code>run</code> method to be called in that separately executing
                         * thread.
                         * <p/>
                         * The general contract of the method <code>run</code> is that it may
                         * take any action whatsoever.
                         *
                         * @see Thread#run()
                         */
                        @Override
                        public void run() {
                            try {
                                //System.out.println("on thread " + Thread.currentThread().getName());
                                Timer.Context onNext = onNextTimer.time();
                                Timer.Context timer = callComponentTimer.time();
                                Object translated = extension.eval(target, null);
                                timer.close();
                                broadcaster.onNext(translated);
                                onNext.close();
                            } catch (UnableToEvaluateException e) {
                                e.printStackTrace();
                            }
                        }
                    });
                }
            });

Edit

Okay, I updated it as follows:

    MetricRegistry reg = DMPContext.getContext().getMetricRegistry();

    de.init(null);

    ConsoleReporter reporter = ConsoleReporter.forRegistry(DMPContext.getContext().getMetricRegistry())
            .convertRatesTo(TimeUnit.SECONDS)
            .convertDurationsTo(TimeUnit.MILLISECONDS)
            .build();

    reporter.start(10, TimeUnit.SECONDS);

    final CountDownLatch latch = new CountDownLatch(count);

    final Function<String, Object> translator = Json.from(Request.class);
    String content = new String(Files.readAllBytes(Paths.get("/svn/dmpidea/request.json")));

    Broadcaster<String> stringBroadcaster = Broadcaster.create();

    final Exec exec = new Exec();

    stringBroadcaster
            .partition(10)
            .consume(new Consumer<GroupedStream<Integer, String>>() {
                @Override
                public void accept(GroupedStream<Integer, String> groupedStream) {
                    groupedStream
                            .dispatchOn(Environment.cachedDispatcher())
                            .map(translator)
                            .map(new Function<Object, Object>() {
                                @Override
                                public Object apply(Object o) {
                                    try {
                                        System.out.println("got thread " + Thread.currentThread().getName());
                                        return de.eval(o, null);
                                    } catch (UnableToEvaluateException e) {
                                        e.printStackTrace();
                                        return null;
                                    }
                                }
                            })
                            .consume(new Consumer<Object>() {
                                @Override
                                public void accept(Object o) {
                                    latch.countDown();
                                }
                            });
                }
            });

    for (int i = 0; i < count; i++) {
        stringBroadcaster.onNext(content);
    }
    latch.await();

I am still seeing single-threaded execution:

    got thread dispatcherGroup-1
    got thread dispatcherGroup-1
    got thread dispatcherGroup-1
    got thread dispatcherGroup-1
    got thread dispatcherGroup-1
    got thread dispatcherGroup-1
    got thread dispatcherGroup-1

The pattern in Reactor 2.0 is to use separate cached dispatchers (RingBufferDispatchers) instead of a WorkQueueDispatcher or ThreadPoolExecutorDispatcher. This offers a way of splitting the stream onto different threads, and works well for parallelizing small (usually non-blocking) operations on logical streams.

For fully non-blocking execution in a separate thread pool, see the edit at the bottom.

You want to use Stream.groupBy() or Stream.partition() to split the stream into multiple streams, each of which can be dispatched on a separate thread.

  1. Stream.groupBy() - buckets streams based on the key you return
  2. Stream.partition([int]) - buckets streams based on the hashcode of the streaming objects (signals), and optionally the number of buckets you specify
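To make the bucketing rule concrete, here is a hypothetical plain-Java sketch (the class name `PartitionSketch` and the helper are my own illustration, not Reactor API) of what hashcode-based partitioning into a fixed number of buckets looks like:

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionSketch {
    // Illustration only: each signal lands in the bucket selected by its
    // hashCode modulo the bucket count, so equal objects always share a bucket.
    static List<List<String>> partition(List<String> signals, int buckets) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < buckets; i++) {
            out.add(new ArrayList<>());
        }
        for (String s : signals) {
            int bucket = Math.abs(s.hashCode() % buckets);
            out.get(bucket).add(s);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> buckets = partition(List.of("a", "b", "c", "d"), 2);
        // every signal ends up in exactly one bucket
        System.out.println(buckets.get(0).size() + buckets.get(1).size());
    }
}
```

In Reactor each bucket would then be its own stream that you can pin to its own dispatcher.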

After you have partitioned/grouped the stream into separate streams, you can use flatMap() to tell the new streams to dispatch onto separate threads.

    .flatMap(stream -> stream.dispatchOn(Environment.cachedDispatcher()))

Calling Environment.cachedDispatcher() gets a cached dispatcher (RingBufferDispatcher) from a pool of them. The pool, by default, has a size equal to the number of processors on your computer. The cached dispatchers are created lazily, which is not ideal to do inside a call to .dispatchOn(dispatcher).
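The shape of such a pool is easy to picture with plain `java.util.concurrent` types. This is a hypothetical sketch (the class `DispatcherPool` is my own analogy, not Reactor's DispatcherSupplier): a fixed set of single-threaded executors created eagerly up front and handed out round-robin, so each stream gets a dedicated thread, much like a RingBufferDispatcher:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class DispatcherPool {
    private final ExecutorService[] dispatchers;
    private final AtomicInteger next = new AtomicInteger();

    // Create every dispatcher eagerly, unlike the lazy default pool.
    public DispatcherPool(int size) {
        dispatchers = new ExecutorService[size];
        for (int i = 0; i < size; i++) {
            dispatchers[i] = Executors.newSingleThreadExecutor();
        }
    }

    // Hand out dispatchers round-robin; callers pin one stream per dispatcher.
    public ExecutorService get() {
        return dispatchers[next.getAndIncrement() % dispatchers.length];
    }

    public void shutdown() {
        for (ExecutorService e : dispatchers) {
            e.shutdown();
        }
    }
}
```

Creating the pool once, before wiring up the streams, avoids paying the lazy-creation cost inside the dispatch path.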

To create a pool (DispatcherSupplier) of cached dispatchers beforehand, you can use Environment.newCachedDispatchers(int, [groupName]). Here is an example from the Project Reactor docs of how to combine all of this:

    DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
    DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");

    Streams
        .range(1, 10)
        .groupBy(n -> n % 2 == 0)
        .flatMap(stream -> stream
                .dispatchOn(supplier1.get())
                .log("groupBy")
        )
        .partition(5)
        .flatMap(stream -> stream
                .dispatchOn(supplier2.get())
                .log("partition")
        )
        .dispatchOn(Environment.sharedDispatcher())
        .log("join")
        .consume();

Reactor docs: Partitioning Streams onto separate threads

Notice in the example that I have called .dispatchOn(Environment.sharedDispatcher()) after the last flatMap() call. This joins the stream back onto a single dispatcher thread - in this case Environment.sharedDispatcher(), a RingBufferDispatcher.

I use this strategy to partition a stream onto separate threads, make blocking calls in parallel on separate cached dispatchers, then rejoin them onto a single thread on the main Environment.sharedDispatcher(), all in a non-blocking/reactive way, like so:

    // Spring's RestTemplate for making simple REST (HTTP) calls
    RestTemplate restTemplate = new RestTemplate();
    List<String> urls = Arrays.asList("http://www.google.com", "http://www.imgur.com");
    Streams
        .from(urls)
        .groupBy(s -> s) // one unique group (stream) per URL
        .flatMap(stream ->
            // dispatch onto a separate thread per stream
            stream.dispatchOn(Environment.cachedDispatcher())
                // make the blocking call on the separate dispatching thread
                .map(s -> restTemplate.getForObject(s, String.class)))
        // rejoin onto a single dispatcher thread
        .dispatchOn(Environment.sharedDispatcher())
        .consume(
            s -> { // consumer
                System.out.println("---complete in stream---");
            },
            t -> { // error consumer
                System.out.println("---exception---");
                System.out.println(t);
            },
            s -> { // complete consumer
                latch.countDown();
                System.out.println("---complete---");
            });



Edit: For doing blocking operations in parallel you may still want to use a thread pool. In Reactor 2.0 you can do this using a Dispatcher backed by a thread pool, preferably Executors.newCachedThreadPool(), because it will reuse threads and limit the amount of GC pressure.

The easiest way I found is to use an EventBus with a ThreadPoolExecutorDispatcher backed by Executors.newCachedThreadPool() (which is just a ThreadPoolExecutor with a few specific settings, if you look at the function that creates the newCachedThreadPool):
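For reference, the JDK documents newCachedThreadPool() as equivalent to constructing a ThreadPoolExecutor directly with these settings (the wrapper class `CachedPoolEquivalent` is mine, added only to make the snippet self-contained):

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class CachedPoolEquivalent {
    // Per the JDK javadoc, Executors.newCachedThreadPool() is equivalent to:
    // zero core threads, an unbounded maximum, a 60-second idle keep-alive,
    // and a SynchronousQueue so each submitted task hands off directly to a
    // (new or reused) thread instead of sitting in a queue.
    public static ThreadPoolExecutor cachedPool() {
        return new ThreadPoolExecutor(
                0, Integer.MAX_VALUE,
                60L, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = cachedPool();
        System.out.println(pool.getCorePoolSize());    // prints 0
        System.out.println(pool.getMaximumPoolSize()); // prints 2147483647
        pool.shutdown();
    }
}
```

The SynchronousQueue handoff is what makes it good for blocking work: a task never waits behind another task, a thread is created for it if none is idle.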

    // Dispatcher backed by a cached thread pool for limited GC pressure
    Dispatcher dispatcher = new ThreadPoolExecutorDispatcher(1024, 1024, Executors.newCachedThreadPool());

    int n = 1000000;
    EventBus bus = EventBus.create(dispatcher);
    CountDownLatch latch = new CountDownLatch(n);
    bus.on($("topic"), (Event<Integer> ev) -> {
        blockingServiceCall(100); // block for 100ms
        latch.countDown();
        if (ev.getData() % 1000 == 0) {
            System.out.println(Thread.currentThread() + " " + Thread.activeCount());
        }
    });

    long start = System.currentTimeMillis();
    for (int i = 0; i < n; i++) {
        bus.notify("topic", Event.wrap(i));
    }
    latch.await();
    System.out.println("ops/sec: " + (n * 1.0) / ((System.currentTimeMillis() - start) / 1000.0));

You will need to dispatch back onto a Stream after the blocking call has completed inside the consumer if you want to go back to using Streams. As far as I know you can't merge them back into a Stream automatically, nor can you use a ThreadPoolExecutorDispatcher backed by a cached thread pool directly by doing stream.dispatchOn(threadPoolExecutorDispatcher).
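The "fan out to a pool, then dispatch results back onto one thread" shape can be sketched with plain `java.util.concurrent` primitives. This is a hypothetical illustration (the class `FanOutJoin` and the sleep standing in for the blocking service call are mine), where a single-threaded executor plays the role of the join dispatcher:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class FanOutJoin {
    public static int run(int tasks) throws InterruptedException {
        ExecutorService workers = Executors.newCachedThreadPool();
        ExecutorService joiner = Executors.newSingleThreadExecutor();
        CountDownLatch latch = new CountDownLatch(tasks);
        AtomicInteger joined = new AtomicInteger();

        for (int i = 0; i < tasks; i++) {
            workers.execute(() -> {
                // the blocking call runs on a pooled worker thread
                try { Thread.sleep(10); } catch (InterruptedException ignored) {}
                // "dispatch back": publish the result onto the single join thread
                joiner.execute(() -> {
                    joined.incrementAndGet();
                    latch.countDown();
                });
            });
        }
        latch.await();
        workers.shutdown();
        joiner.shutdown();
        return joined.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(FanOutJoin.run(8)); // prints 8
    }
}
```

In Reactor terms, the joiner's execute() is the moral equivalent of calling broadcaster.onNext(result) from inside the EventBus consumer to feed a downstream Stream.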

