Use Gearman to make your Magento background jobs faster

In brief: this post describes a method to use an alternative task queue system to speed up the background operations performed by the Magento extension Boost My Shop ERP. With a little effort, you should be able to apply this methodology and toolset to tough problems of your own.

We use an extension called Boost My Shop ERP to manage our LiveOutThere.com warehouse. It surprises me just how much value Olivier Zimmerman and his team have packed into a €500 extension, and it will get us through million-dollar months to come.

But, like most things in Magento land, we needed to extend a few things to make Boost My Shop’s ERP work exactly the way we needed it to. Our topic today: how to refresh big catalogs using Gearman. We do this because we receive new sets of EDI “available-to-sell” documents from our suppliers every night, and we have to adjust the backorder status of thousands upon thousands of products and update all the various indexes, caches, aggregates, and statuses that are stored by Magento and made even more complicated by ERP.

Boost My Shop ERP does have a built-in cron scheduler extension that accepts events and queues them for later execution, but we found it inconsistent and difficult to troubleshoot. Our catalog has more than 200,000 SKUs, and the out-of-the-box cron jobs were stalling and failing. This left outdated information on the site, like products that are in stock showing up as out of stock and vice versa.

We needed a way to refresh stock status an order of magnitude faster and with more control than we had previously. Enter Gearman!

Gearman provides a generic application framework to farm out work to other machines or processes that are better suited to do the work. It allows you to do work in parallel, to load balance processing, and to call functions between languages. It can be used in a variety of applications, from high-availability web sites to the transport of database replication events. In other words, it is the nervous system for how distributed processing communicates.

So what’s the point? Well, if you’re familiar with asynchronous JavaScript programming you’re going to get this right away. Actually if you’re any kind of a programmer but a PHP programmer… jokes… I’m going to go out on a limb here and say, yeah, you’re going to get this right away!

Prerequisites

To use Gearman you will need to do a few things:

  • Install the IBuildings Gearman extension for Magento from https://github.com/frak/Magento-Gearman-Module and follow the instructions to install the required libraries and classes
  • Install gearmand
    • On my Mac with the Homebrew package manager, it’s this simple: `brew install gearman`
    • Don’t forget to add this to your php.ini and make sure the extension has been loaded:
      extension="gearman.so"
  • Install supervisord (try this guide from python.org)
    • Copy the example configuration file for supervisord to $CWD/supervisord.conf or /etc/supervisord.conf
    • Now you’ll add a program directive to the bottom of your supervisord.conf. Mine looks like this:

[program:gearman_worker]
command=/usr/local/bin/php gearman_worker.php
process_name=%(program_name)s_%(process_num)02d
numprocs=8
directory=/Users/drew/Web/shell
stdout_logfile=/var/log/supervisor/gearman_worker.log
autostart=true
autorestart=true
user=drew

Then run supervisorctl from the /bin directory of wherever you installed supervisor. You will see a list of all your “workers” and their state. What are workers? They are instances of the PHP script you specified in the command config value above. Supervisord keeps them running and “waiting for work” from a dispatcher script.

At this point gearman.php, the task dispatcher, and gearman_worker.php don’t exist yet, so create a gearman.php file in the shell directory of your Magento root:

<?php
/*
==New BSD License==

Copyright (c) 2013, Drew Gillson
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
    * The name of Drew Gillson may not be used to endorse or promote products
      derived from this software without specific prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

/**
 * Gearman task dispatcher for Boost My Shop ERP (v2.9.1) extension for Magento (CE 1.7)
 *
 * @copyright  Copyright (c) 2013 Drew Gillson (http://drewgillson.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */

require_once 'abstract.php';

/**
 * Magento Gearman Dispatcher Script
 *
 * @category    Mage
 * @package     Mage_Shell
 * @author      Drew Gillson <drew@liveoutthere.com>
 */
class Mage_Shell_Gearman extends Mage_Shell_Abstract
{
    private function init()
    {
        $this->indexers = Mage::getSingleton('index/indexer')->getProcessesCollection();
        $this->db = Mage::getSingleton('core/resource')->getConnection('core_read');
        $this->write = Mage::getSingleton('core/resource')->getConnection('core_write');
        $this->gearmanQueue = Mage::getModel('gearman/queue');       
        $this->visibilityAttr = Mage::getModel('eav/entity_attribute')->loadByCode('catalog_product', 'visibility');
        $this->chunkSize = 100; // number of products that get passed to the worker at a time
    }

    private function setManualIndexMode()
    {
        foreach ($this->indexers as $indexer) {
          $indexer->setMode(Mage_Index_Model_Process::MODE_MANUAL)->save();
        }
    }

    private function setRealtimeIndexMode()
    {
        foreach ($this->indexers as $indexer) {
          $indexer->setMode(Mage_Index_Model_Process::MODE_REAL_TIME)->save();
        }
    }

    public function run()
    {
        $this->init();

        if (isset($this->_args['refresh-stock-status'])) {
            
             echo "Refreshing simple product stock status\n";

             /**
              * If you get errors like SQLSTATE[HY000]: General error: 1030 Got error -1 from storage engine,
              * try dropping a foreign key from index_process_event. After doing extensive research I found this:
              * http://www.lexitconsulting.com/2012/10/magento-reindexing-transactions-and-locks/
              *
              * ALTER TABLE index_process_event DROP FOREIGN KEY FK_INDEX_PROCESS_EVENT_EVENT_ID_INDEX_EVENT_EVENT_ID;
              **/

             /**
              * It's important that we sort configurables last, summary validity depends on simple products being updated first
              **/
             $sql = "SELECT entity_id FROM catalog_product_entity ORDER BY type_id DESC";
             $rows = $this->db->fetchAll($sql);
             $count = count($rows);

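             // Initialize first so array_chunk() below never receives null on an empty catalog
             $entity_ids = array();
             foreach ($rows as $row) {
                 $entity_ids[] = $row['entity_id'];
             }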
             
             $this->setManualIndexMode();

             $i = 0; // thread counter
             $threads = array();
             $chunks = array_chunk($entity_ids, $this->chunkSize, true);
             foreach ($chunks as $chunk) {
                 
                 /**
                  * This $event object is passed to the Gearman worker below
                  **/
                 $event = array();
                 $event['queue'] = 'refreshStockStatus';
                 $event['task']  = array(
                     'entity_ids'    => $chunk,
                     'count' => ($count - $i)
                 );

                 /**
                  * If you don't need access to the state of the dispatched event you can
                  * call Mage::dispatchEvent('gearman_do_async_task', $event) instead of 
                  * using the Gearman queue model
                  **/
                 $threads[] = $this->gearmanQueue->dispatchTask($event);
                 if ($i % 1000 == 0)
                     echo "Dispatched $i product refresh events to gearman\n";

                 $i = $i + $this->chunkSize;
             }

             $this->waitForThreads($threads);

             /**
             * The program won't proceed past this point until all threads are finished up
             **/
              
             $this->setRealtimeIndexMode();
        }
        else {
            echo $this->usageHelp();            
        }      
    }

    public function waitForThreads($threads)
    {
        $threadCount = count($threads);
        do {
            $done = 0;
            foreach ($threads as $id) {
                if ($this->gearmanQueue->checkTaskComplete($id))
                    $done++;
            }
            echo "Done: " . $done . " of " . $threadCount . "\n";
            sleep(1);
        }
        while ($done < $threadCount);
    }

    public function usageHelp()
    {
        return <<<USAGE
Usage:  php gearman.php -- [options]

    --refresh-stock-status     Refresh stock status asynchronously

USAGE;
    }
}

require_once dirname(__FILE__) . '/../app/Mage.php';

Mage::app();

$gearman = new Mage_Shell_Gearman();
$gearman->run();
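
One caveat with the waitForThreads loop above: if a job is lost or a worker dies, it polls forever. Here is a minimal sketch of a bounded version. It is not part of the original script; `waitForJobs`, `$isComplete`, and the timeout values are hypothetical names picked for illustration, and the callable stands in for `$this->gearmanQueue->checkTaskComplete($id)` so the example runs without Gearman.

```php
<?php
/**
 * Poll a set of job IDs until all are complete or a deadline passes.
 *
 * @param array    $jobIds      IDs returned when the jobs were dispatched
 * @param callable $isComplete  returns true when the given job ID has finished
 * @param int      $timeoutSecs give up after this many seconds
 * @param int      $pollSecs    pause between polling rounds
 * @return array   IDs still unfinished on return (empty on success)
 */
function waitForJobs(array $jobIds, $isComplete, $timeoutSecs = 3600, $pollSecs = 1)
{
    $pending  = $jobIds;
    $deadline = time() + $timeoutSecs;

    while (count($pending) > 0) {
        // Keep only the jobs that are still running, so finished jobs
        // are not checked again on the next round.
        $stillPending = array();
        foreach ($pending as $id) {
            if (!call_user_func($isComplete, $id)) {
                $stillPending[] = $id;
            }
        }
        $pending = $stillPending;

        if (count($pending) === 0 || time() >= $deadline) {
            break;
        }
        sleep($pollSecs);
    }

    return $pending;
}

// Usage with a stand-in checker: each job reports complete on its second poll.
$polls = array();
$fakeChecker = function ($id) use (&$polls) {
    $polls[$id] = isset($polls[$id]) ? $polls[$id] + 1 : 1;
    return $polls[$id] >= 2;
};

$unfinished = waitForJobs(array('a', 'b', 'c'), $fakeChecker, 10, 0);
echo count($unfinished) === 0 ? "All jobs finished\n" : "Timed out\n";
```

Returning the unfinished IDs lets the caller decide whether to re-dispatch them or bail out before switching the indexers back to real-time mode.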

Then, create a gearman_worker.php script in the /shell directory as well. You’ll notice this file was referenced in the supervisord.conf program configuration. Supervisor is going to spawn multiple instances of this worker:

<?php
/*
==New BSD License==
 
Copyright (c) 2013, Drew Gillson
All rights reserved.
 
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
 
    * Redistributions of source code must retain the above copyright
      notice, this list of conditions and the following disclaimer.
    * Redistributions in binary form must reproduce the above copyright
      notice, this list of conditions and the following disclaimer in the
      documentation and/or other materials provided with the distribution.
    * The name of Drew Gillson may not be used to endorse or promote products
      derived from this software without specific prior written permission.
 
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
 
/**
 * Gearman worker for Boost My Shop ERP (v2.9.1) extension for Magento (CE 1.7)
 *
 * @copyright  Copyright (c) 2013 Drew Gillson (http://drewgillson.com)
 * @license    http://framework.zend.com/license/new-bsd     New BSD License
 */

require_once dirname(__FILE__) . '/../app/Mage.php';
ini_set('memory_limit', '512M');

/**
 * If you make any changes to a worker you need to restart the worker
 * process with supervisorctl (the CLI utility for supervisord). If 
 * you change the number of threads supervisord will run you need
 * to restart supervisord itself.
 *
 * In order to make sure all background threads have been terminated
 * you might have to kill gearmand and restart it again. Monitor the
 * worker log to ensure what you think is happening is actually happening.
 **/

Mage::app();

$worker = new GearmanWorker();
$worker->addServer('127.0.0.1', 4730);
$worker->addFunction('refreshStockStatus', 'refreshStockStatus_fn');

echo "Waiting for work...\n";
while ($worker->work()) {
    if ($worker->returnCode() !== GEARMAN_SUCCESS) {
        echo "Oops: " . $worker->returnCode() . "\n";
        break;
    }
}

function refreshStockStatus_fn($job)
{
    $task = unserialize($job->workload());

    echo $task['count'] . " simples remaining\n";
    
    Mage::helper('lot_common/cli_erp')->refreshStockStatusAsync($task['entity_ids']);
    return serialize($task);
}
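
The dispatcher and the worker only talk to each other through a serialized string, so it can help to look at that round trip on its own. The sketch below runs without Gearman or Magento. `decodeWorkload` is a hypothetical helper, not part of the original worker, and I am assuming the queue model serializes the `task` array with PHP's `serialize()`, since the worker calls `unserialize()` on the workload.

```php
<?php
/**
 * Decode a serialized task and check it has the shape the worker expects.
 * Returns the task array, or null if the payload is unusable.
 */
function decodeWorkload($workload)
{
    // The @ silences the notice or warning that unserialize() raises on
    // malformed input; it returns false in that case.
    $task = @unserialize($workload);

    if (!is_array($task) || !isset($task['entity_ids']) || !is_array($task['entity_ids'])) {
        return null;
    }
    return $task;
}

// What the dispatcher side would produce for one chunk of three products.
$payload = serialize(array(
    'entity_ids' => array(101, 102, 103),
    'count'      => 200000,
));

$task = decodeWorkload($payload);
echo count($task['entity_ids']) . " products in this job\n";

// A corrupt payload is rejected instead of reaching the helper.
var_dump(decodeWorkload('not a serialized string'));
```

A check like this keeps one bad payload from throwing inside the helper and taking the whole worker loop down with it.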

You’re really close! Good slugging.

This part is really important! If you make a change to your gearman_worker.php file you will need to use supervisorctl to restart all of your daemonized workers. That means typing supervisorctl at the command line (you might need to specify the location of your config file with the -c option), and then typing `restart all`. If you want to change the number of workers you have configured, you will need to type `shutdown` in the supervisorctl console, exit, and then start a new instance of supervisord itself.

As you might have noticed in the refreshStockStatus_fn function in the worker above, there is a call to a standard Magento helper. Here’s where the real work happens. Just one more Gist:

This method recalculates the current quantity on hand based on all the stock movements in our ERP system, refreshes the availability status cache entry for a product, and resets expected delivery dates and quantities. Remember, it is being called concurrently by many workers, so we are refreshing many, many products at once rather than one product at a time!

Perhaps we need just one more gist to show the alternative:

Will the code above work? Of course it will! It’s certainly a lot easier to understand. But try running it through a recordset with hundreds of thousands of products in it. It’s a non-starter.

After we completed this Gearman implementation, the time to refresh stock status across the catalog dropped from 12 hours (if it ever finished at all) to one hour.
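
For a rough sense of how the work divides up, here is the chunking arithmetic on its own. It assumes exactly 200,000 products (the real catalog is somewhat larger), with the chunk size of 100 from gearman.php and the 8 worker processes from supervisord.conf. It only shows how jobs are split, not how long they take.

```php
<?php
$productCount = 200000; // assumption: the real catalog is somewhat larger
$chunkSize    = 100;    // $this->chunkSize in gearman.php
$workerCount  = 8;      // numprocs in supervisord.conf

// array_chunk() is what the dispatcher uses to split the ID list.
$entityIds = range(1, $productCount);
$chunks    = array_chunk($entityIds, $chunkSize, true);

$jobCount      = count($chunks);
$jobsPerWorker = (int) ceil($jobCount / $workerCount);

// prints "2000 jobs, about 250 per worker"
echo "$jobCount jobs, about $jobsPerWorker per worker\n";

// With preserve_keys = true, each chunk keeps its original array keys,
// so the second chunk starts at key 100 rather than 0.
$secondChunkKeys = array_keys($chunks[1]);
echo "Second chunk starts at key " . $secondChunkKeys[0] . "\n";
```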

Asynchronous programming is becoming a real favorite of mine and I have been exploring it lots in Node.js, pure JavaScript, Ruby, and now with PHP and Gearman. The habits this development style encourages will certainly make you a better programmer!

There are many other benefits, like being able to “fake it til you make it” when it comes to performance issues. Take the “searching for tickets” spinners on Ticketmaster and airline websites: these guys figured it out ages ago! Just give me something to look at, for Christ’s sake. Not to mention the opportunities for graceful handling of large batches of work like image processing, user uploads, saving large configuration changes, or communicating with slow external APIs.

Thanks for reading,