We often need to run a resource-demanding task in the background so that it does not interfere with the main task running at the same time. Queues make this easy to arrange: we put a task into a queue, and the system carries it out as resources become available. Such queues can be built with third-party applications such as RabbitMQ or Gearman, but you can also manage perfectly well without them. For instance, instead of RabbitMQ, Magento 2.0 EE works fine with MySQL + cron. Compared to a real queue solution, though, this approach has some disadvantages. First, cron runs on a schedule, so you cannot react immediately when a record is added to the queue. Second, such a scheme makes it quite difficult to process a queue in parallel. Magento 2.0 EE lets you run queues not only on MySQL + cron but also on RabbitMQ.
RabbitMQ allows applications to interact through the AMQP protocol, so to use RabbitMQ we need to install a library that supports this protocol. Magento 2.0 EE uses the php-amqplib library, which is provided by one of the RabbitMQ developers. This library implements AMQP version 0-9-1 and requires the PHP extension bcmath. If your Magento 2.0 EE is properly installed, you should already have this extension (a PHP 7 Dockerfile for Magento 2.0 can be downloaded from https://github.com/Galillei/php7-magento2ee/blob/master/Dockerfile).
For more detailed information about RabbitMQ, please visit the official website.
In this article we describe only those features that can be used in Magento 2 EE.
Magento 2.0 EE uses the Publish/Subscribe pattern: a Publisher adds messages to a queue, and a Consumer script reads messages from it. It is even possible to have several Consumers, each with its own queue to which the messages from the Publisher are delivered.
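The publish/subscribe idea can be illustrated without any broker at all. The following is a minimal in-memory sketch, not Magento or RabbitMQ code: the class ToyExchange, its methods and the queue name audit_queue are hypothetical names invented for this illustration. It shows a single publish call delivering a copy of the message to every bound queue, so that each consumer reads from its own queue.

```php
<?php
// Toy in-memory model of publish/subscribe. ToyExchange is a hypothetical
// class for illustration only; it is not part of Magento or php-amqplib.
final class ToyExchange
{
    /** @var array queue name => list of pending messages */
    private $queues = [];

    /** Declare a queue that will receive a copy of every published message. */
    public function bindQueue(string $queueName)
    {
        if (!isset($this->queues[$queueName])) {
            $this->queues[$queueName] = [];
        }
    }

    /** The publisher only talks to the exchange and never sees the queues. */
    public function publish(string $message)
    {
        foreach (array_keys($this->queues) as $queueName) {
            $this->queues[$queueName][] = $message;
        }
    }

    /** A consumer takes the oldest message from its own queue, or null if empty. */
    public function consume(string $queueName)
    {
        if (empty($this->queues[$queueName])) {
            return null;
        }
        return array_shift($this->queues[$queueName]);
    }
}

$exchange = new ToyExchange();
$exchange->bindQueue('bridge_sender_queue');
$exchange->bindQueue('audit_queue'); // hypothetical second subscriber

$exchange->publish('order #1');

// Each consumer gets its own copy of the message.
$first = $exchange->consume('bridge_sender_queue');
$second = $exchange->consume('audit_queue');
$empty = $exchange->consume('audit_queue'); // nothing left in this queue
```

The publisher code stays the same no matter how many queues are bound, which is the property the pattern is meant to provide.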
The very first thing we need to do to create a queue is to add the file queue.xml into the module folder <magento2ee_dir>/app/code/<module_name>/etc/queue.xml:
    <?xml version="1.0"?>
    <config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/queue.xsd">
        <topic publisher="default" name="bridge.set.orders" schema="Path\to\MyModyleInterface" />
        <consumer name="bridgeSender" queue="bridge_sender_queue" connection="amqp" class="MyModule\Consumer\ConsumerOrder" method="processMessage" executor="Magento\Framework\MessageQueue\BatchConsumer" />
        <bind queue="bridge_sender_queue" exchange="magento" topic="bridge.set.orders" />
    </config>
So, let's go through this file line by line.
    <topic publisher="default" name="bridge.set.orders" schema="Path\to\Intrface\Interface" />
This line declares the topic bridge.set.orders and assigns it to the default Publisher. The schema parameter names an interface that defines the type of the message: an object implementing this interface is passed to the Publisher, and the Consumer receives exactly the same kind of object.
In <magento2ee_dir>/app/code/<module_name>/etc/di.xml we have to specify the class that implements this interface, so that Magento knows which class to instantiate. <topic> indicates the type of exchange (point of exchange) that is being used.
When we pass a message to the Publisher, it knows nothing about queues or subscribed Consumers. In other words, the Publisher does not care which Consumers, or how many of them, are subscribed to receive its messages. Distributing messages among Consumers is exactly what exchanges do.
There are different types of exchange: topic, direct, fanout and others. The topic type is the most flexible: it can do what the direct and fanout types do. In addition, it can route messages to queues by a compound routing key (somewhat similar to a URL), a dot-separated string that queue bindings can match with wildcards. For more details read this article.
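The wildcard matching of a topic exchange can be sketched in a few lines. In AMQP 0-9-1 a routing key is a list of words separated by dots; in a binding pattern, * stands for exactly one word and # for zero or more words. The functions topicMatches and matchWords below are hypothetical helpers written for this illustration; this is not how RabbitMQ implements routing internally, only a small model of the rule.

```php
<?php
// Illustration only: a small model of AMQP topic matching.
// topicMatches() and matchWords() are hypothetical helper names.

/** Check whether a routing key matches a binding pattern. */
function topicMatches(string $pattern, string $routingKey): bool
{
    return matchWords(explode('.', $pattern), explode('.', $routingKey));
}

/** Recursive word-by-word comparison of pattern and key. */
function matchWords(array $pattern, array $key): bool
{
    if ($pattern === []) {
        // Pattern exhausted: it matches only if the key is exhausted too.
        return $key === [];
    }
    $head = $pattern[0];
    $rest = array_slice($pattern, 1);

    if ($head === '#') {
        // '#' may swallow zero or more words: try every possible split.
        for ($skip = 0; $skip <= count($key); $skip++) {
            if (matchWords($rest, array_slice($key, $skip))) {
                return true;
            }
        }
        return false;
    }
    if ($key === []) {
        return false;
    }
    if ($head === '*' || $head === $key[0]) {
        // '*' consumes exactly one word; a literal must be equal.
        return matchWords($rest, array_slice($key, 1));
    }
    return false;
}

$exact    = topicMatches('bridge.set.orders', 'bridge.set.orders'); // true
$oneWord  = topicMatches('bridge.*.orders', 'bridge.set.orders');   // true
$tooShort = topicMatches('bridge.*', 'bridge.set.orders');          // false
$anyTail  = topicMatches('bridge.#', 'bridge.set.orders');          // true
```

A binding with an exact key behaves like a direct exchange, and a binding with a lone # receives everything, like a fanout exchange.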
If you need to process an array instead of a single object, then instead of:
    schema="Path\to\Intrface\Interface"
use, for example:
    schema="Path\to\Intrface\Interface[]"
The line:
    <consumer name="bridgeSender" queue="bridge_sender_queue" connection="amqp" class="MyModule\Consumer\ConsumerOrder" method="processMessage" executor="Magento\Framework\MessageQueue\BatchConsumer" />
This line declares the Consumer, the queue name and the queue adapter (amqp). To use MySQL instead of RabbitMQ, you only need to change the connection parameter from amqp to db and to change the class whose object processes the queue messages in its processMessage method. The executor parameter names the class that parses our messages and delivers them to the class MyModule\Consumer\ConsumerOrder.
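To make the roles of the class, method and executor parameters more concrete, here is a rough, self-contained sketch of the dispatch step. It is not Magento's BatchConsumer: the function toyDispatch, the class ToyConsumerOrder and the use of JSON for the message body are assumptions made for this illustration only.

```php
<?php
// Illustration only: a toy stand-in for what an executor does with the
// class/method pair from queue.xml. Not Magento code.

/** Hypothetical handler, playing the role of MyModule\Consumer\ConsumerOrder. */
final class ToyConsumerOrder
{
    /** @var array[] messages seen so far */
    public $received = [];

    public function processMessage(array $order)
    {
        $this->received[] = $order;
    }
}

/**
 * Decode a raw message body and pass it to the configured handler method.
 * JSON as the wire format is an assumption of this sketch.
 */
function toyDispatch($handler, string $method, string $rawBody)
{
    $decoded = json_decode($rawBody, true);
    if (!is_array($decoded)) {
        throw new InvalidArgumentException('Message body is not a JSON object or array');
    }
    // Call the method whose name came from the configuration.
    $handler->$method($decoded);
}

// The two values an executor would read from the <consumer> declaration.
$consumerConfig = ['class' => 'ToyConsumerOrder', 'method' => 'processMessage'];

$class = $consumerConfig['class'];
$handler = new $class();
toyDispatch($handler, $consumerConfig['method'], '{"order_id": 10}');
```

The handler class itself contains no queue logic; everything related to reading and decoding stays in the executor.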
The line:
    <bind queue="bridge_sender_queue" exchange="magento" topic="bridge.set.orders" />
binds the queue bridge_sender_queue to the exchange "magento" for the topic bridge.set.orders.
So now if we run the following command in console:
    php magento queue:consumers:list
we should get a list that contains the name of our Consumer, bridgeSender.
To make the Consumer listen to the queue, we have to run the following command:
    php magento queue:consumers:start bridgeSender
But if you try to run this command now, you will get an error. The thing is that Magento does not create a new queue either when a Consumer connects or when a message is added to the queue. So if you send a message to a non-existing queue, nothing will happen and the message will be ignored. And if you run the command
    php magento queue:consumers:start bridgeSender
you will receive an error message saying that the queue does not exist. So, to be able to use the queue, we need to create it and bind it to our exchange. Magento uses only the topic exchange type, and its exchange is named 'magento'. To create the queue, you may use the following code in the file <magento2ee_dir>/app/code/<module_name>/Setup/InstallData.php (basically, you may put it in any file from the Setup folder, but I think InstallData.php is the most logical place):
    <?php
    /**
     * to install queue, use InstallData from amqp module
     */

    namespace Mymodule\Setup;

    use Magento\Framework\Setup\InstallDataInterface;
    use Magento\Framework\Setup\ModuleContextInterface;
    use Magento\Framework\Setup\ModuleDataSetupInterface;
    use Magento\Amqp\Model\Topology;

    /**
     * @codeCoverageIgnore
     */
    class InstallData implements InstallDataInterface
    {
        /**
         * @var \Magento\Amqp\Model\Topology
         */
        private $topology;

        /**
         * Constructor
         *
         * @param Topology $topology
         */
        public function __construct(Topology $topology)
        {
            $this->topology = $topology;
        }

        /**
         * {@inheritdoc}
         */
        public function install(ModuleDataSetupInterface $setup, ModuleContextInterface $context)
        {
            $this->topology->install();
        }
    }
Do not worry that this will re-create already existing queues: it creates only those queues that do not exist yet.
So, if you try to run the command now:
    php magento queue:consumers:start bridgeSender
you will get the error “Not found merger for consumer name 'bridgeSender'”. This is because for each queue we need to declare a class that implements the interface Magento\Framework\MessageQueue\MergerInterface. This class can be used for preliminary processing of messages before they reach the class declared in the consumer section of queue.xml. The class is mandatory, but if you do not need any preliminary processing, you may simply return the messages unchanged:
    <?php

    namespace MyModele\Model;

    use Magento\Framework\MessageQueue\MergerInterface;

    class Merger implements MergerInterface
    {
        public function merge(array $messages)
        {
            return $messages;
        }
    }
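If preliminary processing is needed, a merger could, for example, collapse several messages about the same order into one. The sketch below is only a guess at what such processing might look like. It runs outside Magento, so it declares a stand-in interface ToyMergerInterface with the same merge(array $messages) signature; the class LatestPerOrderMerger and the order_id and status keys are hypothetical, and the sketch assumes a flat list of array messages, which may differ from the structure Magento actually passes to merge().

```php
<?php
// Illustration only. ToyMergerInterface is a local stand-in so that the
// example runs without Magento; it copies the merge() signature shown above.
interface ToyMergerInterface
{
    public function merge(array $messages);
}

/** Hypothetical merger that keeps only the latest message per order. */
final class LatestPerOrderMerger implements ToyMergerInterface
{
    public function merge(array $messages)
    {
        $latest = [];
        foreach ($messages as $message) {
            // A later message for the same order replaces the earlier one.
            $latest[$message['order_id']] = $message;
        }
        // Re-index so the consumer receives a plain list again.
        return array_values($latest);
    }
}

$merger = new LatestPerOrderMerger();
$merged = $merger->merge([
    ['order_id' => 10, 'status' => 'new'],
    ['order_id' => 11, 'status' => 'new'],
    ['order_id' => 10, 'status' => 'paid'],
]);
// $merged now holds two messages: order 10 (paid) and order 11 (new).
```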
Having created the pass-through Merger class, try to run the command once again:
    php magento queue:consumers:start bridgeSender
And again you will receive the error message “Not found merger for consumer name 'bridgeSender'”. The reason is that Magento does not know anything about this class yet. To register it as the Merger, add the following lines to the file <magento2ee_dir>/app/code/<module_name>/etc/di.xml:
    <type name="Magento\Framework\MessageQueue\MergerFactory">
        <arguments>
            <argument name="mergers" xsi:type="array">
                <item name="bridgeSender" xsi:type="string">MyModele\Model\Merger</item>
            </argument>
        </arguments>
    </type>
So now Magento knows which Merger class it should look for when you run the command:
    php magento queue:consumers:start bridgeSender
You should not expect to see any output while the command is running: it simply listens to the queue and waits for new messages. Moreover, if minor errors occur you will not see anything either, so you cannot tell whether the script is working correctly. To print error messages to the console, I usually add a few print_r calls, as a temporary debugging aid, to the method getTransactionCallback in the file <magento2ee_dir>/vendor/magento/framework-message-queue/BatchConsumer.php:
    private function getTransactionCallback(QueueInterface $queue, MergerInterface $merger)
    {
        return function (array $messages) use ($queue, $merger) {
            try {
                $this->resource->getConnection()->beginTransaction();
                $decodedMessages = $this->decodeMessages($messages);
                $mergedMessages = $merger->merge($decodedMessages);
                $this->dispatchMessage($mergedMessages);
                $this->acknowledgeAll($queue, $messages);
                $this->resource->getConnection()->commit();
            } catch (ConnectionLostException $e) {
                $this->resource->getConnection()->rollBack();
            } catch (\Exception $e) {
                print_r($e->getMessage());
                print_r($e->getFile());
                print_r($e->getLine());
                $this->resource->getConnection()->rollBack();
                $this->rejectAll($queue, $messages);
            }
        };
    }
Now that we have a Consumer available, let's try to push a message to the queue. For that, you need to inject the publisher into the constructor of your class, as in the example below:
    // Requires: use Magento\Framework\MessageQueue\PublisherInterface;

    /**
     * @var PublisherInterface
     */
    private $publisher;

    /**
     * Publisher constructor.
     * @param PublisherInterface $publisher
     */
    public function __construct(PublisherInterface $publisher)
    {
        $this->publisher = $publisher;
    }

    public function setToQueue(Path\to\MyModyleInterface $order)
    {
        // The first argument is the topic name declared in queue.xml
        $this->publisher->publish('bridge.set.orders', $order);
    }
This code adds a message to the queue. Do not forget that the object $order must implement the interface specified in the file queue.xml:
    <topic publisher="default" name="bridge.set.orders" schema="Path\to\MyModyleInterface" />
Now let's look at the method MyModule\Consumer\ConsumerOrder::processMessage:
    public function processMessage(Path\to\MyModyleInterface $order)
    {
        $this->order = $order;
        $this->prepareRequest();
        if (!$this->sendToBridge()) {
            // Sending failed: put the message back into the queue to retry later
            $this->publish->setOrders($this->order);
            $this->publish->setToQueue();
            $this->logger->critical('message cannot be sent to Bridge');
            return;
        }
        $this->logger->debug('message successfully sent to Bridge');
    }
The $order parameter is a message that has already been parsed. By default, Magento deletes messages that have been delivered to the Consumer. So if an error occurs and you need to retry later, you have to add the message to the queue again.
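One thing to watch with this approach is that a message which can never be delivered will circulate in the queue forever. A possible way around this, sketched below under stated assumptions, is to carry an attempt counter with the message and stop re-queuing after a limit. Everything here is hypothetical and independent of Magento: InMemoryQueue, RetryingConsumer, the payload and attempt keys and the limit of three attempts are illustration-only choices, and in a real module the re-queue step would go through the publisher instead.

```php
<?php
// Illustration only: bounded "put it back on failure" logic with toy classes.
final class InMemoryQueue
{
    private $messages = [];

    public function push(array $message)
    {
        $this->messages[] = $message;
    }

    /** Returns the oldest message, or null when the queue is empty. */
    public function pop()
    {
        return array_shift($this->messages);
    }
}

final class RetryingConsumer
{
    const MAX_ATTEMPTS = 3; // hypothetical limit

    private $queue;
    private $send;

    /** @var array payloads that were given up on */
    public $failed = [];

    public function __construct(InMemoryQueue $queue, callable $send)
    {
        $this->queue = $queue;
        $this->send = $send;
    }

    public function processMessage(array $message)
    {
        $attempt = ($message['attempt'] ?? 0) + 1;
        if (($this->send)($message['payload'])) {
            return true;
        }
        if ($attempt < self::MAX_ATTEMPTS) {
            // Re-queue with the updated counter so the limit is enforced.
            $this->queue->push(['payload' => $message['payload'], 'attempt' => $attempt]);
        } else {
            $this->failed[] = $message['payload'];
        }
        return false;
    }
}

$queue = new InMemoryQueue();
$calls = 0;
// Simulated sender: fails twice, then succeeds.
$consumer = new RetryingConsumer($queue, function ($payload) use (&$calls) {
    $calls++;
    return $calls >= 3;
});

$queue->push(['payload' => 'order #1', 'attempt' => 0]);
while (($message = $queue->pop()) !== null) {
    $consumer->processMessage($message);
}
// The message went through on the third attempt and nothing was given up on.
```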
So, to sum up, Magento 2.0 EE provides a very useful tool for working with queues, RabbitMQ, which you can use widely in your modules.
Nice article!
Are you aware of any integration of a message queue or some kind of ESB with PrestaShop?
Thank you,
Daniel
Hi, Daniel! Thanks for your question.
Unfortunately, I know nothing of such integrations.
Hi,
Thank you for the nice blog. In Magento 2.2, queue.xml has been replaced by other XML files. Could you please update the article?
Hi, Keyur!
The issue is pretty complicated, we’ll try to cover this topic in one of our next articles. Subscribe to our blog to stay tuned!
Can we replace all cron jobs with the queue?
Hi Siju, theoretically it’s possible, but on the other hand, you need a dedicated process for each queue, and in case of errors such a process is much more complicated to manage. We usually use queues for push notifications and product imports (images that need to be uploaded are sent to the queue).