i have consumer publishing response bus. can ireceiveobserver
wired , working on bus, haven't been able either isendobserver
or ipublishobserver
running. have confirmed rabbitmq management console messages are being published correctly.
class program { static bushandle _bushandle; static void main(string[] args) { initlogging(); initstructuremap(); initbus(); system.console.writeline("starting processing, enter stop..."); system.console.readline(); system.console.writeline("see later, alligator!"); stopbus(); } static void initbus() { var busctrl = objectfactory.container.getinstance<ibuscontrol>(); var recobserver = objectfactory.container.getinstance<ireceiveobserver>(); var sendobserver = objectfactory.container.getinstance<isendobserver>(); busctrl.connectreceiveobserver(recobserver); busctrl.connectsendobserver(sendobserver); _bushandle = busctrl.start(); } static void stopbus() { _bushandle.stop(); } static void initlogging() { xmlconfigurator.configure(); log4netlogger.use(); } static void initstructuremap() { objectfactory.initialize(x => { x.addregistry<mytestconsoleregistry>(); x.addregistry<mytestregistry>(); }); } } public class mytestconsoleregistry : registry { public mytestconsoleregistry() { var rabbituri = configurationmanager.appsettings["rabbitmqhosturi"]; var queuename = configurationmanager.appsettings["masstransitqueue"]; for<ibuscontrol>(new singletonlifecycle()) .use("configure ibuscontrol masstransit consumers rabbitmq transport", ctx => bus.factory.createusingrabbitmq(cfg => { cfg.usejsonserializer(); cfg.publisherconfirmation = true; var host = cfg.host(new uri(rabbituri), rabbitcfg => { }); cfg.receiveendpoint(host, queuename, endpointcfg => { endpointcfg.loadfrom(ctx); }); }) ); for<ireceiveobserver>().use<masstransitobserver>(); for<isendobserver>().use<masstransitobserver>(); // ...snip... } } public class mytestregistry : registry { public mytestregistry() { forconcretetype<mytestconsumer>(); // ...snip... } } public class masstransitobserver : ireceiveobserver, isendobserver { // nothing now, trying wire up... public task consumefault<t>(consumecontext<t> context, timespan duration, string consumertype, exception exception) t : class { return task.completedtask; } public task postconsume<t>(consumecontext<t> context, timespan duration, string consumertype) t : class { return task.completedtask; } public task postreceive(receivecontext context) { return task.completedtask; } public task prereceive(receivecontext context) { return task.completedtask; } public task receivefault(receivecontext context, exception exception) { return task.completedtask; } public task presend<t>(sendcontext<t> context) t : class { return task.completedtask; } public task postsend<t>(sendcontext<t> context) t : class { return task.completedtask; } public task sendfault<t>(sendcontext<t> context, exception exception) t : class { return task.completedtask; } } public class mytestconsumer : iconsumer<mytestmessage>, // testing only: iconsumer<mytestresponse> { readonly idosomething _dosomething; public testconsumer(idosomething dosomething) { _dosomething = dosomething; } public task consume(consumecontext<mytestresponse> context) { // testing only... return task.completedtask; } public async task consume(consumecontext<mytestmessage> context) { var result = await _dosomething(context.message.id); var resp = new mytestresponsemessage(result); await context .publish<mytestresponse>(resp); } }
given code, ireceiveobserver methods getting called, isendobserver methods not.
i'm new masstransit, expect straightforward issue.
edit: unit test using nunit , moq, doesn't use structuremap. believe illustrates i'm seeing.
[test] public void testsendobserver() { var bus = createbus(); var bushandle = bus.start(); var sendobs = new mock<isendobserver>(); sendobs.setup(x => x.presend<testmessage>(it.isany<sendcontext<testmessage>>())) .returns(task.fromresult(0)) .verifiable(); sendobs.setup(x => x.postsend<testmessage>(it.isany<sendcontext<testmessage>>())) .returns(task.fromresult(0)) .verifiable(); using (bus.connectsendobserver(sendobs.object)) { var pubtask = bus.publish(new testmessage { message = "some test message" }); pubtask.wait(); } bushandle.stop(); // fails, neither presend nor postsend have been called sendobs.verify(x => x.presend<testmessage>(it.isany<sendcontext<testmessage>>()), times.once()); sendobs.verify(x => x.postsend<testmessage>(it.isany<sendcontext<testmessage>>()), times.once()); } ibuscontrol createbus() { return masstransit.bus.factory.createusingrabbitmq(x => { var host = x.host(new uri("rabbitmq://localhost/"), h => { h.username("guest"); h.password("guest"); }); }); } public class testmessage { public string message { get; set; } }
Comments
Post a Comment