multithreading - Concurrent queue consumption enigma in Java


I am trying to speed up image processing in Java with OpenCV by using a parallel stream to consume a queue of OpenCV Mat objects. When I time the algorithm and count what is left on the queue, I get incoherent results when processing the stream in parallel, whereas sequential processing gives correct results. Since I used a ConcurrentLinkedQueue, I expected thread safety and asynchronicity, but apparently that is not enough. Does anyone know how to work around this problem?

Remarks:

  • Elements are still being put on the queue during consumption.
  • I am running on a processor with 4 physical (8 virtual) cores.

Results for the sequential stream:

frame collection start size (=production): 1455

frame collection end size (=production - consumption): 1360

resulting list size after algorithm run (=consumption): 100

algorithm: 6956 ms

Results for the parallel stream:

frame collection start size (=production): 1455

frame collection end size (=production - consumption): 440

resulting list size after algorithm run (=consumption): 100

algorithm: 9242 ms

My code:

public class OvusculeTestConcurrent {

    public final static ConcurrentLinkedQueue<Mat> frameCollection = new ConcurrentLinkedQueue<Mat>();

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        System.loadLibrary(Core.NATIVE_LIBRARY_NAME);
        final String path = "C:\\Users\\Raoul\\workspace\\Aorta2\\ressource\\artery_src_for_dual.avi";
        long startAlgoTime = System.nanoTime();

        // Constitute the frame collection in async mode
        Capture cap = new Capture(path, frameCollection);
        new Thread(cap).start();
        Thread.sleep(3000); // leaves some time to accumulate frames
        System.out.println("frame collection start size (=production): " + frameCollection.size());

        // Consumes the current queue in parallel/sequential mode
        List<ImagePlus> lm = Stream.generate(() -> {
                return frameCollection.poll();
            })
            .parallel() // comment out to disable parallel computing
            .limit(100L)
            .map(img -> Utils.prepareImage(img,
                    new Point(300, 250),
                    new Point(450, 250),
                    new Point(400, 400),
                    0.25))
            .collect(Collectors.toList());

        // Timing & printing results
        long endAlgoTime = System.nanoTime();
        long algoDuration = (endAlgoTime - startAlgoTime) / 1_000_000; // divide by 1_000_000 to get milliseconds
        System.out.println("frame collection end size (=production - consumption): " + frameCollection.size());
        System.out.println("resulting list size after algorithm run (=consumption): " + lm.size());
        System.out.println("algorithm: " + algoDuration + " ms");
        System.exit(0);
    }
}

There are a few things going on in this code that caught my eye.

First, creating the stream using Stream.generate is on the right track. It might seem preferable to call queue.stream(), but that returns a stream consisting only of the current contents of the queue. Since elements are being added to the queue during processing, that won't work here.

One problem is how the code generates the stream (edited for clarity):

Stream.generate(() -> queue.poll())

The problem is the poll method, which is defined as follows:

Retrieves and removes the head of this queue, or returns null if this queue is empty.

It may be that when the stream is run in parallel, the stream's threads can drain the queue faster than elements are produced and inserted into it. If this occurs, the queue will empty out at some point, and the stream will become populated with the null elements returned by poll.
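A minimal sketch of this effect (run sequentially so the output is deterministic; the queue and element values are made up for illustration):

```java
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PollNullDemo {
    public static void main(String[] args) {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        queue.add(1);
        queue.add(2); // only two elements available

        // Ask the stream for five elements: once the queue is drained,
        // poll() returns null for the remaining three.
        List<Integer> result = Stream.generate(queue::poll)
                .limit(5)
                .collect(Collectors.toList());

        System.out.println(result); // prints [1, 2, null, null, null]
    }
}
```

The nulls then flow through the rest of the pipeline just like real elements, which matches the symptom described below.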

I'm not sure what prepareImage does when handed a null, but it seems to pass it through to the output, which would explain why there are 100 elements in the destination list.

An alternative is to use a BlockingQueue implementation and its take method, like

Stream.generate(() -> queue.take())

This avoids injecting nulls into the stream. I'm not sure which BlockingQueue implementation you should use, but I'd advise investigating one with a size bound. If the producer outstrips the consumers, an unbounded queue will expand to fill all available memory.

Unfortunately, BlockingQueue.take() throws InterruptedException, so you can't use it in a simple lambda. You'll have to figure out what to do upon interruption. Maybe return a dummy element or something.
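One way to cope, sketched here with a hypothetical takeUninterruptibly helper (not from the original code), is to wrap the checked exception in an unchecked one so the call fits inside a lambda:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TakeWrapperDemo {
    // Hypothetical helper: blocks on take() and converts an interrupt
    // into an unchecked exception so it is usable as a Supplier.
    static <T> T takeUninterruptibly(BlockingQueue<T> queue) {
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // Bounded queue, as advised above; capacity 10 is arbitrary.
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
        queue.addAll(Arrays.asList("a", "b", "c"));

        List<String> taken = Stream.generate(() -> takeUninterruptibly(queue))
                .limit(3)
                .collect(Collectors.toList());

        System.out.println(taken); // prints [a, b, c]
    }
}
```

Restoring the interrupt flag before rethrowing keeps the interruption visible to any outer code that checks for it.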

Another issue is that the limit method imposes a limit on the number of elements passed downstream, but in parallel streams, multiple threads may opportunistically pull more than that number of elements from the stream for processing. These are buffered by the limit operation until the limit is reached, at which time stream processing terminates. Elements that were pulled from the stream source and are sitting in the buffer when the limit is reached are discarded. This might be why over 1,000 elements were pulled off the queue even though only 100 elements end up in the result list.
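The over-pulling can be made visible by counting how often the generator is invoked. A small sketch (the counter-as-source setup is contrived for demonstration; how far the pull count exceeds 100 varies by run and hardware):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LimitOverpullDemo {
    public static void main(String[] args) {
        AtomicInteger pulled = new AtomicInteger();

        // Each call to the generator bumps the counter, so pulled.get()
        // tells us how many elements the pipeline actually consumed.
        List<Integer> kept = Stream.generate(pulled::incrementAndGet)
                .parallel()
                .limit(100)
                .collect(Collectors.toList());

        System.out.println("kept: " + kept.size());   // always 100
        System.out.println("pulled: " + pulled.get()); // may exceed 100 in parallel
    }
}
```

The downstream list is always capped at 100, but the source may have been asked for more; the surplus is exactly the buffered-and-discarded portion described above.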

(But in the sequential case the numbers don't add up either. I don't think the same thing is going on there, with buffered elements being discarded when the limit is reached. Perhaps it's because of additional elements being produced during processing?)

If you can live with elements being discarded, a parallel stream fed by queue.take() might work; otherwise, a different approach is required.

