CronQueueTest.php

Same filename and directory in other branches
  1. 9 core/modules/system/tests/src/Kernel/System/CronQueueTest.php
  2. 8.9.x core/modules/system/tests/src/Kernel/System/CronQueueTest.php
  3. 10 core/modules/system/tests/src/Kernel/System/CronQueueTest.php

Namespace

Drupal\Tests\system\Kernel\System

File

core/modules/system/tests/src/Kernel/System/CronQueueTest.php

View source
<?php

declare (strict_types=1);
namespace Drupal\Tests\system\Kernel\System;

use Drupal\Core\Database\Database;
use Drupal\Core\DependencyInjection\ContainerBuilder;
use Drupal\Core\Logger\RfcLogLevel;
use Drupal\Core\Queue\DatabaseQueue;
use Drupal\Core\Queue\Memory;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDeriverQueue;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue;
use Drupal\Core\Queue\QueueWorkerManagerInterface;
use Drupal\KernelTests\KernelTestBase;
use Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestDatabaseDelayException;
use Prophecy\Argument;
use Psr\Log\LoggerInterface;

/**
 * Tests the Cron Queue runner.
 *
 * @group system
 */
class CronQueueTest extends KernelTestBase {
  
  /**
   * {@inheritdoc}
   */
  protected static $modules = [
    'system',
    'cron_queue_test',
  ];
  
  /**
   * The database connection.
   *
   * @var \Drupal\Core\Database\Connection
   */
  protected $connection;
  
  /**
   * The cron service.
   *
   * @var \Drupal\Core\Cron
   */
  protected $cron;
  
  /**
   * The fake current time used for queue worker / cron testing purposes.
   *
   * This value should be greater than or equal to zero.
   *
   * @var int
   */
  protected $currentTime = 1000;
  
  /**
   * A logger for testing.
   *
   * @var \Prophecy\Prophecy\ObjectProphecy|\Psr\Log\LoggerInterface
   */
  protected $logger;
  
  /**
   * {@inheritdoc}
   */
  protected function setUp() : void {
    // Setup logger before register() is called.
    $this->logger = $this->prophesize(LoggerInterface::class);
    parent::setUp();
    $this->connection = Database::getConnection();
    $time = $this->prophesize('Drupal\\Component\\Datetime\\TimeInterface');
    $time->getCurrentTime()
      ->willReturn($this->currentTime);
    $time->getCurrentMicroTime()
      ->willReturn(100.0);
    $time->getRequestTime()
      ->willReturn($this->currentTime);
    \Drupal::getContainer()->set('datetime.time', $time->reveal());
    $this->assertEquals($this->currentTime, \Drupal::time()->getCurrentTime());
    $this->assertEquals($this->currentTime, \Drupal::time()->getRequestTime());
    $realQueueFactory = $this->container
      ->get('queue');
    $queue_factory = $this->prophesize(get_class($realQueueFactory));
    $database = new DatabaseQueue('cron_queue_test_database_delay_exception', $this->connection);
    $memory = new Memory('cron_queue_test_memory_delay_exception');
    $queue_factory->get('cron_queue_test_database_delay_exception', Argument::cetera())
      ->willReturn($database);
    $queue_factory->get('cron_queue_test_memory_delay_exception', Argument::cetera())
      ->willReturn($memory);
    $queue_factory->get(Argument::any(), Argument::cetera())
      ->will(function ($args) use ($realQueueFactory) {
      return $realQueueFactory->get($args[0], $args[1] ?? FALSE);
    });
    $this->container
      ->set('queue', $queue_factory->reveal());
    // Instantiate the `cron` service after the mock queue factory is set.
    $this->cron = \Drupal::service('cron');
  }
  
  /**
   * Tests that DelayedRequeueException behaves as expected when running cron.
   */
  public function testDelayException() : void {
    $database = $this->container
      ->get('queue')
      ->get('cron_queue_test_database_delay_exception');
    $memory = $this->container
      ->get('queue')
      ->get('cron_queue_test_memory_delay_exception');
    // Ensure that the queues are of the correct type for this test.
    $this->assertInstanceOf('Drupal\\Core\\Queue\\DelayableQueueInterface', $database);
    $this->assertNotInstanceOf('Drupal\\Core\\Queue\\DelayableQueueInterface', $memory);
    // Get the queue worker plugin manager.
    $manager = $this->container
      ->get('plugin.manager.queue_worker');
    $definitions = $manager->getDefinitions();
    $this->assertNotEmpty($database_lease_time = $definitions['cron_queue_test_database_delay_exception']['cron']['time']);
    $this->assertNotEmpty($memory_lease_time = $definitions['cron_queue_test_memory_delay_exception']['cron']['time']);
    // Create the necessary test data and run cron.
    $database->createItem('test');
    $memory->createItem('test');
    $this->cron
      ->run();
    // Fetch the expiry time for the database queue.
    $query = $this->connection
      ->select('queue');
    $query->condition('name', 'cron_queue_test_database_delay_exception');
    $query->addField('queue', 'expire');
    $query->range(0, 1);
    $expire = $query->execute()
      ->fetchField();
    // Assert that the delay interval is greater than the lease interval. This
    // allows us to assume that (if updated) the new expiry time will be greater
    // than the initial expiry time. We can then also assume that the new expiry
    // time offset will be identical to the delay interval.
    $this->assertGreaterThan($database_lease_time, CronQueueTestDatabaseDelayException::DELAY_INTERVAL);
    $this->assertGreaterThan($this->currentTime + $database_lease_time, $expire);
    $this->assertEquals(CronQueueTestDatabaseDelayException::DELAY_INTERVAL, $expire - $this->currentTime);
    // Ensure that the memory queue expiry time is unchanged after the
    // DelayedRequeueException has been thrown.
    $property = (new \ReflectionClass($memory))->getProperty('queue');
    $memory_queue_internal = $property->getValue($memory);
    $this->assertEquals($this->currentTime + $memory_lease_time, reset($memory_queue_internal)->expire);
  }
  
  /**
   * Tests that leases are expiring correctly, also within the same request.
   */
  public function testLeaseTime() : void {
    $queue = $this->container
      ->get('queue')
      ->get('cron_queue_test_lease_time');
    $queue->createItem([
      $this->randomMachineName() => $this->randomMachineName(),
    ]);
    // Run initial queue job and ensure lease time variable is initialized.
    $this->cron
      ->run();
    static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
    // Ensure the same queue job is not picked up due to the extended lease.
    $this->cron
      ->run();
    static::assertEquals(1, \Drupal::state()->get('cron_queue_test_lease_time'));
    // Set the expiration time to 3 seconds ago, so the lease should
    // automatically expire.
    \Drupal::database()->update(DatabaseQueue::TABLE_NAME)
      ->fields([
      'expire' => $this->currentTime - 3,
    ])
      ->execute();
    // The queue job should now be picked back up since it's lease has expired,
    // and the state variable should be consequently incremented.
    $this->cron
      ->run();
    static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
    // Ensure the same queue job is not picked up again due to the extended
    // lease.
    $this->cron
      ->run();
    static::assertEquals(2, \Drupal::state()->get('cron_queue_test_lease_time'));
  }
  
  /**
   * Tests that non-queue exceptions thrown by workers are handled properly.
   *
   * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestException
   */
  public function testUncaughtExceptions() : void {
    $this->logger
      ->log(RfcLogLevel::ERROR, '%type: @message in %function (line %line of %file).', Argument::that(function ($args) {
      return $args['@message'] === 'That is not supposed to happen.' && $args['exception'] instanceof \Exception;
    }))
      ->shouldBeCalled();
    $this->logger
      ->log(RfcLogLevel::INFO, 'Cron run completed.', Argument::cetera())
      ->shouldBeCalled();
    // Get the queue to test the normal Exception.
    $queue = $this->container
      ->get('queue')
      ->get(CronQueueTestException::PLUGIN_ID);
    // Enqueue an item for processing.
    $queue->createItem([
      $this->randomMachineName() => $this->randomMachineName(),
    ]);
    // Run cron; the worker for this queue should throw an exception and handle
    // it.
    $this->cron
      ->run();
    $this->assertEquals(1, \Drupal::state()->get('cron_queue_test_exception'));
    // The item should be left in the queue.
    $this->assertEquals(1, $queue->numberOfItems(), 'Failing item still in the queue after throwing an exception.');
    // Expire the queue item manually. system_cron() relies in
    // \Drupal::time()->getRequestTime() to find queue items whose expire field
    // needs to be reset to 0. This is a Kernel test, so
    // \Drupal::time()->getRequestTime() won't change when cron runs.
    // @see system_cron()
    // @see \Drupal\Core\Cron::processQueues()
    $this->connection
      ->update('queue')
      ->condition('name', 'cron_queue_test_exception')
      ->fields([
      'expire' => \Drupal::time()->getRequestTime() - 1,
    ])
      ->execute();
    $this->cron
      ->run();
    $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_exception'));
    $this->assertEquals(0, $queue->numberOfItems(), 'Item was processed and removed from the queue.');
  }
  
  /**
   * Tests suspend queue exception is handled properly.
   *
   * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestSuspendQueue
   * @covers \Drupal\Core\Queue\SuspendQueueException
   */
  public function testSuspendQueueException() : void {
    $this->logger
      ->log(RfcLogLevel::DEBUG, 'A worker for @queue queue suspended further processing of the queue.', Argument::that(function ($args) {
      return $args['@queue'] === CronQueueTestSuspendQueue::PLUGIN_ID;
    }))
      ->shouldBeCalled();
    $this->logger
      ->log(RfcLogLevel::INFO, 'Cron run completed.', Argument::cetera())
      ->shouldBeCalled();
    // Get the queue to test the specific SuspendQueueException.
    $queue = \Drupal::queue(CronQueueTestSuspendQueue::PLUGIN_ID);
    // Enqueue several item for processing.
    $queue->createItem('process');
    $queue->createItem('suspend');
    $queue->createItem('ignored');
    // Run cron; the worker for this queue should process as far as the
    // suspending item.
    $this->cron
      ->run();
    // Only one item should have been processed.
    $this->assertEquals(2, $queue->numberOfItems(), 'Suspended queue stopped processing at the suspending item.');
    // Check the items remaining in the queue. The item that throws the
    // exception gets released by cron, so we can claim it again to check it.
    $item = $queue->claimItem();
    $this->assertEquals('suspend', $item->data, 'Suspending item remains in the queue.');
    $item = $queue->claimItem();
    $this->assertEquals('ignored', $item->data, 'Item beyond the suspending item remains in the queue.');
  }
  
  /**
   * Tests requeue exception is handled properly.
   *
   * @see \Drupal\cron_queue_test\Plugin\QueueWorker\CronQueueTestRequeueException
   * @covers \Drupal\Core\Queue\RequeueException
   */
  public function testRequeueException() : void {
    // Test the requeueing functionality.
    $queue = $this->container
      ->get('queue')
      ->get(CronQueueTestRequeueException::PLUGIN_ID);
    $queue->createItem([]);
    $this->cron
      ->run();
    $this->assertEquals(2, \Drupal::state()->get('cron_queue_test_requeue_exception'));
    $this->assertEquals(0, $queue->numberOfItems());
  }
  
  /**
   * Tests that database queue implementation complies with interfaces specs.
   */
  public function testDatabaseQueueReturnTypes() : void {
    /** @var \Drupal\Core\Queue\DatabaseQueue $queue */
    $queue = $this->container
      ->get('queue')
      ->get('cron_queue_test_database_delay_exception');
    static::assertInstanceOf(DatabaseQueue::class, $queue);
    $queue->createItem(12);
    $item = $queue->claimItem();
    static::assertTrue($queue->delayItem($item, 1));
    static::assertTrue($queue->releaseItem($item));
    $queue->deleteItem($item);
    static::assertFalse($queue->delayItem($item, 1));
    static::assertFalse($queue->releaseItem($item));
  }
  
  /**
   * Test safeguard against invalid annotations in QueueWorkerManager.
   */
  public function testQueueWorkerManagerSafeguard() : void {
    $queue_worker_manager = $this->container
      ->get('plugin.manager.queue_worker');
    $plugin_id = 'test_plugin_id';
    // Ensure if no cron annotation is provided, none is added.
    $definition = [];
    $queue_worker_manager->processDefinition($definition, $plugin_id);
    $this->assertArrayNotHasKey('cron', $definition);
    // Ensure if an empty cron annotation is provided, the default lease time is
    // added.
    $definition = [
      'cron' => [],
    ];
    $queue_worker_manager->processDefinition($definition, $plugin_id);
    $this->assertArrayHasKey('time', $definition['cron']);
    $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
    // Ensure if an invalid lease time (less-than 1 second) is provided, it is
    // overridden with the default lease time.
    $definition = [
      'cron' => [
        'time' => 0,
      ],
    ];
    $queue_worker_manager->processDefinition($definition, $plugin_id);
    $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
    $definition = [
      'cron' => [
        'time' => -1,
      ],
    ];
    $queue_worker_manager->processDefinition($definition, $plugin_id);
    $this->assertEquals(QueueWorkerManagerInterface::DEFAULT_QUEUE_CRON_TIME, $definition['cron']['time']);
  }
  
  /**
   * Tests that cron queues from derivers work.
   */
  public function testQueueWorkerDeriver() : void {
    $this->assertEquals(0, \Drupal::state()->get(CronQueueTestDeriverQueue::PLUGIN_ID, 0));
    $queue = \Drupal::queue(sprintf('%s:foo', CronQueueTestDeriverQueue::PLUGIN_ID));
    $queue->createItem('foo');
    $this->cron
      ->run();
    $this->assertEquals(1, \Drupal::state()->get(CronQueueTestDeriverQueue::PLUGIN_ID));
  }
  
  /**
   * {@inheritdoc}
   */
  public function register(ContainerBuilder $container) : void {
    parent::register($container);
    $container->register('test_logger', get_class($this->logger
      ->reveal()))
      ->addTag('logger');
    $container->set('test_logger', $this->logger
      ->reveal());
  }

}

Classes

Title Deprecated Summary
CronQueueTest Tests the Cron Queue runner.

Buggy or inaccurate documentation? Please file an issue. Need support? Need help programming? Connect with the Drupal community.