MassTransit's ISendObserver is not observing -


i have consumer publishing response bus. can ireceiveobserver wired , working on bus, haven't been able either isendobserver or ipublishobserver running. have confirmed rabbitmq management console messages are being published correctly.

class program {     static bushandle _bushandle;      static void main(string[] args)     {         initlogging();         initstructuremap();         initbus();          system.console.writeline("starting processing, enter stop...");         system.console.readline();         system.console.writeline("see later, alligator!");          stopbus();     }      static void initbus()     {         var busctrl = objectfactory.container.getinstance<ibuscontrol>();         var recobserver = objectfactory.container.getinstance<ireceiveobserver>();         var sendobserver = objectfactory.container.getinstance<isendobserver>();          busctrl.connectreceiveobserver(recobserver);         busctrl.connectsendobserver(sendobserver);          _bushandle = busctrl.start();     }      static void stopbus()     {         _bushandle.stop();     }       static void initlogging()     {         xmlconfigurator.configure();         log4netlogger.use();     }      static void initstructuremap()     {         objectfactory.initialize(x => {             x.addregistry<mytestconsoleregistry>();             x.addregistry<mytestregistry>();         });     } }  public class mytestconsoleregistry : registry {     public mytestconsoleregistry()     {         var rabbituri = configurationmanager.appsettings["rabbitmqhosturi"];         var queuename = configurationmanager.appsettings["masstransitqueue"];          for<ibuscontrol>(new singletonlifecycle())             .use("configure ibuscontrol masstransit consumers rabbitmq transport",                  ctx => bus.factory.createusingrabbitmq(cfg => {                     cfg.usejsonserializer();                     cfg.publisherconfirmation = true;                      var host = cfg.host(new uri(rabbituri), rabbitcfg => { });                      cfg.receiveendpoint(host, queuename, endpointcfg => {                         endpointcfg.loadfrom(ctx);                     });                 })             );          for<ireceiveobserver>().use<masstransitobserver>();         for<isendobserver>().use<masstransitobserver>();          // ...snip...     } }   public class mytestregistry : registry {     public mytestregistry()     {         forconcretetype<mytestconsumer>();          // ...snip...     } }  public class masstransitobserver : ireceiveobserver, isendobserver {      // nothing now, trying wire up...      public task consumefault<t>(consumecontext<t> context, timespan duration, string consumertype, exception exception) t : class     {         return task.completedtask;     }      public task postconsume<t>(consumecontext<t> context, timespan duration, string consumertype) t : class     {         return task.completedtask;     }      public task postreceive(receivecontext context)     {         return task.completedtask;     }      public task prereceive(receivecontext context)     {         return task.completedtask;     }      public task receivefault(receivecontext context, exception exception)     {         return task.completedtask;     }      public task presend<t>(sendcontext<t> context) t : class     {         return task.completedtask;     }      public task postsend<t>(sendcontext<t> context) t : class     {         return task.completedtask;     }      public task sendfault<t>(sendcontext<t> context, exception exception) t : class     {         return task.completedtask;     } }   public class mytestconsumer : iconsumer<mytestmessage>,     // testing only:     iconsumer<mytestresponse> {     readonly idosomething _dosomething;      public testconsumer(idosomething dosomething)     {         _dosomething = dosomething;     }      public task consume(consumecontext<mytestresponse> context)     {         // testing only...         return task.completedtask;     }      public async task consume(consumecontext<mytestmessage> context)     {         var result = await _dosomething(context.message.id);         var resp = new mytestresponsemessage(result);          await context             .publish<mytestresponse>(resp);     } } 

given code, ireceiveobserver methods getting called, isendobserver methods not.

i'm new masstransit, expect straightforward issue.


edit: unit test using nunit , moq, doesn't use structuremap. believe illustrates i'm seeing.

[test] public void testsendobserver() {     var bus = createbus();     var bushandle = bus.start();      var sendobs = new mock<isendobserver>();      sendobs.setup(x => x.presend<testmessage>(it.isany<sendcontext<testmessage>>()))         .returns(task.fromresult(0))         .verifiable();      sendobs.setup(x => x.postsend<testmessage>(it.isany<sendcontext<testmessage>>()))         .returns(task.fromresult(0))         .verifiable();      using (bus.connectsendobserver(sendobs.object)) {         var pubtask = bus.publish(new testmessage { message = "some test message" });         pubtask.wait();     }      bushandle.stop();      // fails, neither presend nor postsend have been called     sendobs.verify(x => x.presend<testmessage>(it.isany<sendcontext<testmessage>>()), times.once());     sendobs.verify(x => x.postsend<testmessage>(it.isany<sendcontext<testmessage>>()), times.once()); }  ibuscontrol createbus() {     return masstransit.bus.factory.createusingrabbitmq(x => {          var host = x.host(new uri("rabbitmq://localhost/"), h => {             h.username("guest");             h.password("guest");         });     }); }  public class testmessage {     public string message { get; set; } } 


Comments