java - Multithreading in Reactor 2.0 - why can't I spin signals out to multiple threads
I'm running into problems with the Reactor 2.0 release. Namely, I'm trying to set up a reactive signal flow that fans out the signal to a pool of waiting threads. I'm familiar with Rx and Reactive Cocoa, but there is something basic I'm missing here.
I have a basic transformation as follows:
    WorkQueueDispatcher dispatcher = new WorkQueueDispatcher("dispatch", 10, 64, {... exception handle code here ...});

    return objectStream
        .partition(partitions)
        .dispatchOn(dispatcher)
        .merge()
        .map(new Function<Object, Object>() {
            @Override
            public Object apply(Object o) {
                try {
                    return extension.eval(o, null);
                } catch (UnableToEvaluateException e) {
                    e.printStackTrace();
                    return null;
                }
            }
        });
I've tried this flow 7 or 8 different ways, including different dispatchers, etc. I've tried breaking it down into a grouped event stream and handling each element separately, writing to a separate stream for processing. In every situation, I either see every request processed on the same thread (which works, it's just not multi-threaded) or I get an error message I have come to dread:
    java.lang.IllegalStateException: Dispatcher provided doesn't support event ordering. For concurrent signal dispatching, refer to #partition()/groupBy() method and assign individual single dispatchers.
        at reactor.core.support.Assert.state(Assert.java:387)
        at reactor.rx.Stream.dispatchOn(Stream.java:720)
        at reactor.rx.Stream.dispatchOn(Stream.java:650)
I've tried the following:
- Manually doing a partition/group by.
- Explicitly setting a separate single-threaded dispatcher (ring) on earlier steps.
- Just saying eff it, not being functional, and dumping everything onto my own queue for processing.
What am I missing here? Should I not be using a Broadcaster to start the message loop? I don't care at all about in-order execution here.
(edited)
Here is what I'm doing with my homegrown code to scale out:
    objectStream
        .consume(new Consumer<Object>() {
            @Override
            public void accept(Object o) {
                final Object target = o;
                // Hand each signal to a plain thread pool executor (tpe)
                tpe.execute(new Runnable() {
                    /**
                     * When an object implementing interface <code>Runnable</code> is used
                     * to create a thread, starting the thread causes the object's
                     * <code>run</code> method to be called in that separately executing
                     * thread.
                     * <p/>
                     * The general contract of the method <code>run</code> is that it may
                     * take any action whatsoever.
                     *
                     * @see Thread#run()
                     */
                    @Override
                    public void run() {
                        try {
                            //System.out.println("On thread " + Thread.currentThread().getName());
                            Timer.Context onNext = onNextTimer.time();
                            Timer.Context timer = callComponentTimer.time();
                            Object translated = extension.eval(target, null);
                            timer.close();
                            // Push the result back into the reactive flow
                            broadcaster.onNext(translated);
                            onNext.close();
                        } catch (UnableToEvaluateException e) {
                            e.printStackTrace();
                        }
                    }
                });
            }
        });
Edit
Okay, I've updated it as follows:
    MetricRegistry reg = DMPContext.getContext().getMetricRegistry();
    de.init(null);

    ConsoleReporter reporter = ConsoleReporter.forRegistry(DMPContext.getContext().getMetricRegistry())
        .convertRatesTo(TimeUnit.SECONDS)
        .convertDurationsTo(TimeUnit.MILLISECONDS)
        .build();
    reporter.start(10, TimeUnit.SECONDS);

    final CountDownLatch latch = new CountDownLatch(count);
    final Function<String, Object> translator = JSON.from(Request.class);
    String content = new String(Files.readAllBytes(Paths.get("/svn/dmpidea/request.json")));

    Broadcaster<String> stringBroadcaster = Broadcaster.create();
    final Exec exec = new Exec();

    stringBroadcaster
        .partition(10)
        .consume(new Consumer<GroupedStream<Integer, String>>() {
            @Override
            public void accept(GroupedStream<Integer, String> groupedStream) {
                // Each partition gets its own cached dispatcher
                groupedStream.dispatchOn(Environment.cachedDispatcher()).map(translator).map(new Function<Object, Object>() {
                    @Override
                    public Object apply(Object o) {
                        try {
                            System.out.println("Got thread " + Thread.currentThread().getName());
                            return de.eval(o, null);
                        } catch (UnableToEvaluateException e) {
                            e.printStackTrace();
                            return null;
                        }
                    }
                }).consume(new Consumer<Object>() {
                    @Override
                    public void accept(Object o) {
                        latch.countDown();
                    }
                });
            }
        });

    // The same content string is broadcast count times
    for (int i = 0; i < count; i++) {
        stringBroadcaster.onNext(content);
    }
    latch.await();
I am still seeing single-threaded execution:
    Got thread dispatcherGroup-1
    Got thread dispatcherGroup-1
    Got thread dispatcherGroup-1
    Got thread dispatcherGroup-1
    Got thread dispatcherGroup-1
    Got thread dispatcherGroup-1
    Got thread dispatcherGroup-1
The pattern in Reactor 2.0 is to use separate CachedDispatchers (RingBufferDispatchers) instead of using a WorkQueueDispatcher or ThreadPoolExecutorDispatcher. This lets you split the stream out onto different threads, and it works well for parallelizing small (usually non-blocking) operations on logical streams.
For full non-blocking execution in a separate thread pool, see the edit at the bottom.
You want to use Stream.groupBy() or Stream.partition() to split the stream into multiple streams, each of which can be dispatched on a separate thread.
- Stream.groupBy() - buckets into streams based on the key you return
- Stream.partition([int]) - buckets into streams based on the hashCode of the streaming objects (signals), and optionally the number of buckets you specify
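The hash-based bucketing described above can be illustrated without Reactor. The following is only a plain-JDK sketch, not Reactor code: the class `HashPartitionSketch`, its methods, and the rule "hashCode modulo bucket count" are assumptions made for illustration, with one single-threaded executor standing in for each bucket's dispatcher.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Plain-JDK illustration of hash partitioning; hypothetical, not the Reactor implementation. */
final class HashPartitionSketch {

    /** Assumed bucketing rule: the signal's hashCode modulo the number of buckets. */
    static int bucketFor(Object signal, int buckets) {
        return Math.floorMod(signal.hashCode(), buckets);
    }

    /**
     * Routes every item to the single-threaded worker that owns its bucket and
     * returns, per bucket, the items that worker handled in arrival order.
     */
    static <T> Map<Integer, List<T>> process(List<T> items, int buckets) {
        List<ExecutorService> workers = new ArrayList<>();
        Map<Integer, List<T>> handled = new ConcurrentHashMap<>();
        for (int i = 0; i < buckets; i++) {
            workers.add(Executors.newSingleThreadExecutor());
            handled.put(i, Collections.synchronizedList(new ArrayList<T>()));
        }
        for (T item : items) {
            int bucket = bucketFor(item, buckets);
            // Items in the same bucket always run on the same thread, in order.
            workers.get(bucket).execute(() -> handled.get(bucket).add(item));
        }
        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while draining buckets", e);
            }
        }
        return handled;
    }
}
```

Under this assumption, signals with equal hash codes always share a bucket. Since the test loop in the question broadcasts the same content string every time, that may be why only one thread ever shows up there; partitioning only spreads work when the signals hash differently.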
After you have partitioned/grouped the stream into separate streams, you can use flatMap() to tell the new streams to dispatch onto separate threads.
    .flatMap(stream -> stream.dispatchOn(Environment.cachedDispatcher()))
Calling Environment.cachedDispatcher() gets a CachedDispatcher (RingBufferDispatcher) from a pool of them. By default, the pool has a size equal to the number of processors on your computer. CachedDispatchers are created lazily, so fetching one inside the call to .dispatchOn(dispatcher) is not ideal.
To create a pool (DispatcherSupplier) of CachedDispatchers beforehand, you can use Environment.newCachedDispatchers(int, [groupName]). Here is an example from the projectreactor docs on how to combine all of this:
    DispatcherSupplier supplier1 = Environment.newCachedDispatchers(2, "groupByPool");
    DispatcherSupplier supplier2 = Environment.newCachedDispatchers(5, "partitionPool");

    Streams
        .range(1, 10)
        .groupBy(n -> n % 2 == 0)
        .flatMap(stream -> stream
            .dispatchOn(supplier1.get())
            .log("groupBy")
        )
        .partition(5)
        .flatMap(stream -> stream
            .dispatchOn(supplier2.get())
            .log("partition")
        )
        .dispatchOn(Environment.sharedDispatcher())
        .log("join")
        .consume();
Reactor docs: Partitioning streams into separate threads
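The idea of building the pool up front and handing its members out through a supplier can be sketched with the JDK alone. This is a hypothetical stand-in, not the DispatcherSupplier implementation: the class name `EagerPoolSketch`, the round-robin hand-out, and the thread naming are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical pool of single-threaded workers, created before any stream needs them. */
final class EagerPoolSketch implements Supplier<ExecutorService> {
    private final List<ExecutorService> workers = new ArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    EagerPoolSketch(int size, String groupName) {
        for (int i = 0; i < size; i++) {
            String threadName = groupName + "-" + (i + 1);
            // One dedicated thread per worker, so each worker keeps its signals ordered.
            workers.add(Executors.newSingleThreadExecutor(r -> new Thread(r, threadName)));
        }
    }

    /** Hands out the workers in round-robin order (an assumed policy). */
    @Override
    public ExecutorService get() {
        return workers.get(Math.floorMod(next.getAndIncrement(), workers.size()));
    }

    /** Stops accepting work on every worker in the pool. */
    void shutdown() {
        workers.forEach(ExecutorService::shutdown);
    }
}
```

With a pool of two, the first and third calls to get() return the same worker, which mirrors the pattern of calling supplier.get() once per grouped stream.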
Notice that in this example they have called .dispatchOn(Environment.sharedDispatcher()) after the last flatMap() call. This joins the stream back onto a single dispatcher thread, in this case Environment.sharedDispatcher(), which is also a RingBufferDispatcher.
I use this strategy to partition a stream into separate threads so I can make blocking calls in parallel on separate CachedDispatchers, then rejoin them onto a single thread on the main Environment.sharedDispatcher(), all in a non-blocking/reactive way, like so:
    // Spring's RestTemplate for making simple REST (HTTP) calls
    RestTemplate restTemplate = new RestTemplate();
    List<String> urls = Arrays.asList("http://www.google.com", "http://www.imgur.com");

    Streams
        .from(urls)
        .groupBy(s -> s) // unique group (stream) per URL
        .flatMap(stream ->
            // dispatch on separate threads per stream
            stream.dispatchOn(Environment.cachedDispatcher())
                // blocking call in separate dispatching thread
                .map(s -> restTemplate.getForObject(s, String.class)))
        // rejoin onto a single dispatcher thread
        .dispatchOn(Environment.sharedDispatcher())
        .consume(
            s -> { // consumer
                System.out.println("---complete in stream---");
            },
            t -> { // error consumer
                System.out.println("---exception---");
                System.out.println(t);
            },
            s -> { // complete consumer
                latch.countDown();
                System.out.println("---complete---");
            });
Edit: For doing blocking operations in parallel you may still want to use a thread pool. In Reactor 2.0 you can do this using a dispatcher backed by a thread pool, preferably Executors.newCachedThreadPool(), because it reuses threads, which limits the amount of GC pressure.
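For reference, the thread reuse comes from how the cached pool is configured. The sketch below builds a ThreadPoolExecutor by hand with the settings OpenJDK uses for Executors.newCachedThreadPool() (no core threads, an effectively unbounded maximum, 60 seconds of idle keep-alive, and a SynchronousQueue hand-off). The class `CachedPoolSettings` and its methods are illustrative names, and the cast in `describe` relies on the JDK returning a plain ThreadPoolExecutor, which is an implementation detail rather than a documented guarantee.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/** Illustrative helper for inspecting cached-thread-pool settings. */
final class CachedPoolSettings {

    /** Hand-built pool using the same arguments as OpenJDK's newCachedThreadPool(). */
    static ThreadPoolExecutor equivalentPool() {
        return new ThreadPoolExecutor(
            0,                      // no threads are kept when the pool is idle
            Integer.MAX_VALUE,      // grow on demand
            60L, TimeUnit.SECONDS,  // idle threads are reused for up to a minute
            new SynchronousQueue<Runnable>()); // tasks are handed straight to a thread
    }

    /** Summarizes a pool's settings; assumes the pool is a ThreadPoolExecutor. */
    static String describe(ExecutorService pool) {
        ThreadPoolExecutor tpe = (ThreadPoolExecutor) pool;
        return "core=" + tpe.getCorePoolSize()
            + " max=" + tpe.getMaximumPoolSize()
            + " keepAliveSeconds=" + tpe.getKeepAliveTime(TimeUnit.SECONDS)
            + " queue=" + tpe.getQueue().getClass().getSimpleName();
    }
}
```

Comparing `describe(Executors.newCachedThreadPool())` with `describe(CachedPoolSettings.equivalentPool())` is a quick way to confirm the two are configured alike on a given JDK.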
The easiest way I have found to do this is to use an EventBus with a ThreadPoolExecutorDispatcher backed by Executors.newCachedThreadPool() (which is a ThreadPoolExecutor with a few specific settings, if you look at the function that creates the newCachedThreadPool):
    // dispatcher backed by a cached thread pool for limited GC pressure
    Dispatcher dispatcher = new ThreadPoolExecutorDispatcher(1024, 1024, Executors.newCachedThreadPool());

    int n = 1000000;
    EventBus bus = EventBus.create(dispatcher);
    CountDownLatch latch = new CountDownLatch(n);

    bus.on($("topic"), (Event<Integer> ev) -> {
        blockingServiceCall(100); // block for 100ms
        latch.countDown();
        if (ev.getData() % 1000 == 0) {
            System.out.println(Thread.currentThread() + " " + Thread.activeCount());
        }
    });

    long start = System.currentTimeMillis();
    for (int i = 0; i < n; i++) {
        bus.notify("topic", Event.wrap(i));
    }
    latch.await();
    System.out.println("ops/sec: " + (n * 1.0) / ((System.currentTimeMillis() - start) / 1000.0));
You will need to dispatch back to the stream after the blocking call has completed inside the consumer if you want to go back to using streams. As far as I know you can't merge them back into a stream automatically, nor can you use a ThreadPoolExecutorDispatcher backed by a cached thread pool directly by doing stream.dispatchOn(threadPoolExecutorDispatcher).
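The general shape of that hand-off (block on a pool thread, then continue on one designated thread) can be sketched with the JDK's CompletableFuture. This is not Reactor API and is only one possible way to do it; `RejoinSketch`, `blockingCall`, and the thread name "join" are hypothetical, and in a Reactor application the continuation step would instead push the result into a broadcaster or stream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-JDK sketch: blocking work fans out to a pool, results rejoin on one thread. */
final class RejoinSketch {

    static List<String> run(List<String> inputs) {
        ExecutorService blockingPool = Executors.newCachedThreadPool();
        ExecutorService joinThread = Executors.newSingleThreadExecutor(r -> new Thread(r, "join"));
        List<String> results = Collections.synchronizedList(new ArrayList<String>());
        try {
            List<CompletableFuture<Void>> pending = new ArrayList<>();
            for (String input : inputs) {
                pending.add(CompletableFuture
                    // the blocking call runs on a pool thread
                    .supplyAsync(() -> blockingCall(input), blockingPool)
                    // the continuation always runs on the single "join" thread
                    .thenAcceptAsync(
                        value -> results.add(Thread.currentThread().getName() + ":" + value),
                        joinThread));
            }
            // wait until every result has been handed back
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            blockingPool.shutdown();
            joinThread.shutdown();
        }
        return results;
    }

    /** Stand-in for a slow service call (hypothetical). */
    static String blockingCall(String input) {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return input.toUpperCase();
    }
}
```

Each recorded entry is prefixed with the name of the thread that ran the continuation, so it is easy to check that the results were all handled on the single join thread even though the blocking calls ran in parallel. The order of the entries depends on which call finishes first.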