Concurrent queue consumption enigma in Java
I am trying to speed up image processing with Java and OpenCV by using a parallel stream to consume a queue of OpenCV Mat objects. When I time the algorithm and count what is left on the queue, I get incoherent results when processing the stream in parallel, whereas the sequential results are correct. Since I used a ConcurrentLinkedQueue, I thought thread safety and asynchronicity were taken care of, but apparently not. Does anyone know how to work around this problem?
Remarks:
- Elements are still being put on the queue during consumption.
- I am running on a processor with 4 real (8 virtual) cores.
Results with the sequential stream:
Frame collection start size (=production): 1455
Frame collection end size (=production - consumption): 1360
Resulting list size after algorithm run (=consumption): 100
Algorithm: 6956 ms
Results with the parallel stream:
Frame collection start size (=production): 1455
Frame collection end size (=production - consumption): 440
Resulting list size after algorithm run (=consumption): 100
Algorithm: 9242 ms
My code:
public class OvusculeTestConcurrent {

    public final static ConcurrentLinkedQueue<Mat> frameCollection = new ConcurrentLinkedQueue<Mat>();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
        final String path = "c:\\users\\raoul\\workspace\\aorta2\\ressource\\artery_src_for_dual.avi";
        long startAlgoTime = System.nanoTime();

        // Constitute the frame collection in async mode
        Capture cap = new Capture(path, frameCollection);
        new Thread(cap).start();
        Thread.sleep(3000); // leaves some time to accumulate frames
        System.out.println("Frame collection start size (=production): " + frameCollection.size());

        // Consumes the current queue in parallel/sequential
        List<ImagePlus> lm = Stream.generate(() -> { return frameCollection.poll(); })
                .parallel() // comment out to disable parallel computing
                .limit(100L)
                .map(img -> Utils.prepareImage(img, new Point(300, 250), new Point(450, 250), new Point(400, 400), 0.25))
                .collect(Collectors.toList());

        // Timing & printing results
        long endAlgoTime = System.nanoTime();
        long algoDuration = (endAlgoTime - startAlgoTime) / 1_000_000; // divide by 1_000_000 to get milliseconds
        System.out.println("Frame collection end size (=production - consumption): " + frameCollection.size());
        System.out.println("Resulting list size after algorithm run (=consumption): " + lm.size());
        System.out.println("Algorithm: " + algoDuration + " ms");
        System.exit(0);
    }
}
There are a few things going on in this code that caught my eye.

First, creating the stream using Stream.generate is on the right track. It might seem preferable to call queue.stream(), but that returns a stream consisting of only the current contents of the queue. Since elements are being added to the queue during processing, that won't work.
One problem is the code that generates the stream (edited for clarity):

Stream.generate(() -> queue.poll())

The problem is the poll method, which is defined as follows:

    Retrieves and removes the head of this queue, or returns null if this queue is empty.

It may be that when the stream is run in parallel, the stream's threads can drain the queue faster than elements are produced and inserted into it. If that occurs, the queue will empty out at some point and the stream will become populated with the null elements returned by poll.
I'm not sure what prepareImage does when handed a null, but it seems to pass it through to its output, which is why there are 100 elements in the destination list.
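To see this in isolation, here is a small self-contained sketch (mine, not taken from the question) that drains a deliberately undersized ConcurrentLinkedQueue through a parallel Stream.generate and counts the nulls that poll() injects:

import java.util.Objects;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Stream;

public class PollNullDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < 50; i++) {
            queue.add(i); // only 50 elements, but the stream asks for 100
        }
        long nulls = Stream.generate(queue::poll) // poll() returns null once the queue is empty
                .parallel()
                .limit(100L)
                .filter(Objects::isNull)
                .count();
        System.out.println("Null elements in the stream: " + nulls); // at least 50
    }
}

Since the queue only ever held 50 real elements, at least 50 of the 100 stream elements must come out null.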
The alternative is to use a BlockingQueue implementation and its take method, like this:

Stream.generate(() -> queue.take())

This avoids injecting nulls into the stream. I'm not sure which BlockingQueue implementation you should use, but I'd advise investigating one with a size bound: if the producer outstrips the consumers, an unbounded queue will expand to fill all available memory.
Unfortunately, BlockingQueue.take() throws InterruptedException, so you can't use it in a simple lambda as written above. You'll have to figure out what to do upon interruption. Maybe return a dummy element or something.
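For illustration, here is one possible way to wrap take() in a Supplier. Treat it as a sketch under my own assumptions: the bounded ArrayBlockingQueue, the frameQueue name, and the dummy element returned on interrupt are all arbitrary choices, and String stands in for Mat so the example runs without the OpenCV native library:

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TakeWrapperSketch {
    // Bounded queue: caps memory use if the producer outstrips the consumers.
    static final BlockingQueue<String> frameQueue = new ArrayBlockingQueue<>(256);

    // Wraps take() so it fits in a Supplier; on interrupt it restores the
    // interrupt flag and returns a dummy element the pipeline must tolerate.
    static String takeFrame() {
        try {
            return frameQueue.take(); // blocks until an element is available
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "<interrupted>"; // dummy element, arbitrary choice
        }
    }

    public static void main(String[] args) {
        // Simulated producer so the consumer below does not block forever.
        new Thread(() -> {
            for (int i = 0; i < 1000; i++) {
                try {
                    frameQueue.put("frame-" + i); // blocks once the bound is hit
                } catch (InterruptedException e) {
                    return;
                }
            }
        }).start();

        List<String> frames = Stream.generate(TakeWrapperSketch::takeFrame)
                .parallel()
                .limit(100L)
                .collect(Collectors.toList());
        System.out.println("Consumed " + frames.size() + " frames");
    }
}

Restoring the interrupt flag instead of swallowing the exception keeps the interruption visible to whatever code eventually inspects the thread.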
Another issue is that the limit method imposes a limit on the number of elements passed downstream, but in parallel streams multiple threads may opportunistically pull more than that number of elements from the stream for processing. These are buffered by the limit operation until the limit is reached, at which time stream processing terminates. Any elements that were pulled from the stream source and buffered when the limit is reached are simply discarded. That might be why over 1,000 elements were pulled off the queue even though only 100 elements end up in the result list.
(In the sequential case, though, the numbers don't quite add up either. I don't think the same thing is going on there, with buffered elements being discarded when the limit is reached. Perhaps it's simply that additional elements were produced while the stream was being processed.)
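If you want to see the overpull for yourself, here is a small probe (my own experiment, not from the question) that counts how many elements a parallel limit pulls from an ordered infinite source; the overshoot varies by run and JDK version:

import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

public class LimitOverpullProbe {
    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();
        long passed = Stream.iterate(0, i -> i + 1)      // ordered infinite stream
                .parallel()
                .peek(i -> pulled.incrementAndGet())     // counts elements pulled from the source
                .limit(100L)
                .count();
        System.out.println("Passed downstream: " + passed);        // always 100
        System.out.println("Pulled from source: " + pulled.get()); // typically more; varies by run and JDK
    }
}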
If you can live with elements being discarded, a parallel stream fed by queue.take() might work for you; otherwise, a different approach will be required.