RabbitMQ.js 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556
  1. /**
  2. * Created by zhang on 2019/5/5.
  3. */
  4. let amqp = require('amqplib');
  5. class RabbitMQ{
  6. constructor(connect){
  7. this.connect = connect;
  8. }
  9. static async connection(URL){
  10. if(!RabbitMQ.instance){
  11. RabbitMQ.instance = new RabbitMQ(await amqp.connect(URL)) //'amqp://13726297388:123456@qa.smartcost.com.cn:5672'
  12. }
  13. return RabbitMQ.instance
  14. }
  15. static async sendMessage(queue,msg,durable=false){
  16. if(RabbitMQ.instance){
  17. let ch = await RabbitMQ.instance.connect.createChannel();
  18. try {
  19. await ch.assertQueue(queue, {durable: durable});
  20. ch.sendToQueue(queue,Buffer.from(msg),{persistent: durable});
  21. console.log(" Sent %s to :"+queue, msg);
  22. ch.close();
  23. }catch (e){
  24. console.log(e);
  25. ch.close();
  26. }
  27. }
  28. }
  29. static async receiveMessage(queue,callback,durable=false){
  30. if(RabbitMQ.instance){
  31. let ch = await RabbitMQ.instance.connect.createChannel();
  32. try {
  33. await ch.assertQueue(queue, {durable: durable});
  34. ch.prefetch(1);
  35. console.log(" [*] Waiting for messages in %s. To exit press CTRL+C", queue);
  36. ch.consume(queue, function(msg) {
  37. console.log(" [x] Received %s", msg.content.toString());
  38. if(durable == true) ch.ack(msg);
  39. if(callback) callback(msg)
  40. }, {
  41. noAck: !durable
  42. });
  43. }catch (e){
  44. console.log(e);
  45. ch.close();
  46. }
  47. }
  48. }
  49. }
  50. export default RabbitMQ;