consider following code listen update long-polling:
map<string, list<asyncresponse>> tagmap = new concurrentgoodstuff(); // endpoint listens notifications of tag @produces(mediatype.application_json) @consumes(mediatype.application_json) @get @path("listen/{tag}") public void listenforupdates( @pathparam("tag") final string tag, @suspended final asyncresponse response) { tagmap.get(tag).add(response); } // endpoint push-style notifications @produces(mediatype.application_json) @consumes(mediatype.application_json) @put @path("update/{tag}/{value}") public response updatetag( @pathparam("tag") final string tag, @pathparam("value") final string value) { for(asyncresponse response : tagmap.get(tag)) { // resumes suspended responses response.resume(value); } return response.ok("cool whatever").build(); }
the client adds listener normal jersey client's asyncinvoker
, calls asynchronous task, , task calls update method.
when i'm testing this, run race condition. right after add listener asynchronously on listenforupdates()
, make update on endpoint updatetag()
synchronously. update gets run before listener added, , asynchronous response fails resume.
a solution call suspend()
method on response after adding listeners. it's not clear how that, given @suspended
provides already-suspended asyncresponse
object. should async response suspended only after adding listener? call suspend method? how can work jersey async client, or should use different long-polling client?
for solutions, i'm open different libraries, atmosphere or guava. not open adding thread.sleep()
in test, since intermittent failure waiting happen.
i ended using rxjava, not before coming just-as-good solution using blockingqueue
instead of list
in map
. goes this:
concurrentmap<string, blockingqueue<asyncresponse>> tagmap = new concurrentgoodstuff(); // endpoint initiates listener array tag. @produces(mediatype.application_json) @consumes(mediatype.application_json) @get @path("initlisten/{tag}") public void listenforupdates( @pathparam("tag") final string tag) { tagmap.putifabsent(tag, new linkedblockingqueue<>()); } // endpoint listens notifications of tag @produces(mediatype.application_json) @consumes(mediatype.application_json) @get @path("listen/{tag}") public void listenforupdates( @pathparam("tag") final string tag, @suspended final asyncresponse response) { blockingqueue<asyncresponse> responses = tagmap.get(tag); if (responses != null) { responses.add(response); } } // endpoint push-style notifications @produces(mediatype.application_json) @consumes(mediatype.application_json) @put @path("update/{tag}/{value}") public response updatetag( @pathparam("tag") final string tag, @pathparam("value") final string value) { blockingqueue<asyncresponse> responses = tagmap.get(tag); if (responses == null) { return response.nocontent().build(); } if (responses.isempty()) { // block-wait async listener try { asyncresponse response = tagmap.poll(15, timeunit.seconds); if (response == null) { return response.nocontent().build(); } response.resume(value); } catch (interruptedexception e) { return response.nocontent().build(); } } else { (asyncresponse response : responses) { // resumes suspended responses response.resume(value); } } return response.ok("cool whatever").build(); }
i haven't tested exact code, used version of in past. long call initlisten
endpoint synchronously first, can call asynchronous listen
endpoint , synchronous update
endpoint , there won't significant race condition.
there slight hint of race condition in update
endpoint, it's minor. responses
blocking queue become empty on iteration, or may updated multiple sources differently. alleviate this, i've used drainto(collection)
method on per-request instantiated data structure. still not solve use case multiple clients may try updating same tag of listeners, not need use case.
Comments
Post a Comment