PHP 操作 Beanstalkd 方法及参数注释

Beanstalkd介绍

什么是Beanstalkd

Beanstalk,一个高性能、轻量级的分布式内存队列系统,最初设计的目的是想通过后台异步执行耗时的任务来降低高容量Web应用系统的页面访问延迟,支持过有9.5 million用户的Facebook Causes应用。
后来开源,现在有PostRank大规模部署和使用,每天处理百万级任务。Beanstalkd是典型的类Memcached设计,协议和使用方式都是同样的风格,所以使用过memcached的用户会觉得Beanstalkd似曾相识。

Beanstalkd运行流程

job典型的生命周期

put            reserve               delete
  -----> [READY] ---------> [RESERVED] --------> *poof*

job可能的状态迁移

put with delay               release with delay
  ----------------> [DELAYED] <------------.
                        |                   |
                 kick   | (time passes)     |
                        |                   |
   put                  v     reserve       |       delete
  -----------------> [READY] ---------> [RESERVED] --------> *poof*
                       ^  ^                |  |
                       |   \  release      |  |
                       |    `-------------'   |
                       |                      |
                       | kick                 |
                       |                      |
                       |       bury           |
                    [BURIED] <---------------'
                       |
                       |  delete
                        `--------> *poof*

Beanstalkd安装

安装略过,如需请参考 点击参考

注:需要安装Composer,教程链接 Composer安装方法

Pheanstalk安装

用于操作BeanstalkdPHP 第三方库,安装略过,如需请参考 点击参考

Pheanstalk操作

连接Beanstalkd

<?php
require __DIR__ . '/vendor/autoload.php';

use Pheanstalk\Pheanstalk;

/**
 * 实例化beanstalk
 * 参数依次为:ip地址 端口号默认11300 连接超时时间 是否长连接
 */
$pheanstalk = new Pheanstalk('127.0.0.1', 11300, 3, false);
?>

Beanstalkd状态方法

Beanstalkd状态

$stats = $pheanstalk->stats();

返回:

Pheanstalk\Response\ArrayResponse Object
(
    [_name:Pheanstalk\Response\ArrayResponse:private] => OK
    [storage:ArrayObject:private] => Array
        (
            [current-jobs-urgent] => 0 //当前存在优先级的任务数
            [current-jobs-ready] => 0 //当前准备就绪的任务数
            [current-jobs-reserved] => 0 //当前处于阻塞的任务数
            [current-jobs-delayed] => 0 //当前处于延迟状态的任务数
            [current-jobs-buried] => 0 //当前预留的任务数
            [cmd-put] => 0 //cmd命令为累计运行次数
            [cmd-peek] => 0
            [cmd-peek-ready] => 0
            [cmd-peek-delayed] => 0
            [cmd-peek-buried] => 0
            [cmd-reserve] => 0
            [cmd-reserve-with-timeout] => 0
            [cmd-delete] => 0
            [cmd-release] => 0
            [cmd-use] => 0
            [cmd-watch] => 0
            [cmd-ignore] => 0
            [cmd-bury] => 0
            [cmd-kick] => 0
            [cmd-touch] => 0
            [cmd-stats] => 1
            [cmd-stats-job] => 0
            [cmd-stats-tube] => 0
            [cmd-list-tubes] => 3
            [cmd-list-tube-used] => 0
            [cmd-list-tubes-watched] => 0
            [cmd-pause-tube] => 0
            [job-timeouts] => 0 //超时的任务
            [total-jobs] => 0 //任务总数
            [max-job-size] => 65535 //任务字符串大小
            [current-tubes] => 1 //当前的管道数
            [current-connections] => 1 //当前打开的连接数
            [current-producers] => 0 //当前生产者数量
            [current-workers] => 0 //当前消费者数量
            [current-waiting] => 0 //发出reserved指令,但没有响应的数量
            [total-connections] => 4 //累计链接数量
            [pid] => 530 //Beanstalkd进程id
            [version] => 1.10
            [rusage-utime] => 0.003545
            [rusage-stime] => 0.007473
            [uptime] => 105855 //运行时间(秒)
            [binlog-oldest-index] => 0
            [binlog-current-index] => 0
            [binlog-records-migrated] => 0
            [binlog-records-written] => 0
            [binlog-max-size] => 10485760
            [id] => 4199e3eca8bfdea8
            [hostname] => lmmlwendeMacBook-Air.local
        )
)

当前的管道列表

$listTubes = $pheanstalk->listTubes();

查看管道的详细信息

$stats = $pheanstalk->statsTube();

返回:

Pheanstalk\Response\ArrayResponse Object
(
    [_name:Pheanstalk\Response\ArrayResponse:private] => OK
    [storage:ArrayObject:private] => Array
        (
            [name] => default //当前管道名
            [current-jobs-urgent] => 0 //当前管道存在优先级的任务数
            [current-jobs-ready] => 0 //当前管道准备就绪的任务书
            [current-jobs-reserved] => 0 //当前管道处于阻塞的任务数
            [current-jobs-delayed] => 0 //当前管道处于延迟状态的任务数
            [current-jobs-buried] => 0 //当前管道预留的任务数
            [total-jobs] => 0 //当前管道总任务数
            [current-using] => 1 //当前管道生产者数量
            [current-watching] => 1 //当前管道消费者数量
            [current-waiting] => 0 //发出reserved指令,但没有响应的数量
            [cmd-delete] => 0
            [cmd-pause-tube] => 0
            [pause] => 0
            [pause-time-left] => 0
        )
)

查看任务的详细信息

$job =  $pheanstalk->watch('default')->reserve();
$job_stats = $pheanstalk->statsJob($job);

返回:

Pheanstalk\Response\ArrayResponse Object
(
    [_name:Pheanstalk\Response\ArrayResponse:private] => OK
    [storage:ArrayObject:private] => Array
        (
            [id] => 1 //任务job ID
            [tube] => test //所处管道
            [state] => reserved //当前状态
            [pri] => 1024 //任务优先级(默认1024)
            [age] => 469 //任务存活时间(秒)
            [delay] => 0 //任务延迟时间(秒)
            [ttr] => 60 //任务执行时间
            [time-left] => 59 //任务在reserve状态维持的秒数
            [file] => 0 // binlog-4 默认-0
            [reserves] => 2 //总共reserve次数
            [timeouts] => 0 //任务超时次数
            [releases] => 0 //重设任务次数
            [buries] => 0 //预留次数
            [kicks] => 0 //释放预留任务次数
        )
)

查看任务的详细信息(通过ID)

$job =  $pheanstalk->peek(1);
$job_stats = $pheanstalk->statsJob($job);

Beanstalkd生产者方法

指定需要使用的管道

$tube = $pheanstalk->useTube('default');

向管道插入数据

$tube = $pheanstalk->useTube('default');
$put = $tube->put(
    'hello, beanstalk', // 任务内容
    1024, // 任务的优先级
    10,  // 不等待直接放到ready队列中
    60 // 处理任务的时间
);

或者:

$pheanstalk->putInTube('default',  'test1', 1024, 10, 60);

Beanstalkd消费者方法

监听管道

$tube =  $pheanstalk->watch('user');

去除不需要监听的管道

$tube =  $pheanstalk->watch('user')->ignore('default');

以堵塞的方式监听管道

$job =  $pheanstalk->watch('user')->reserve(4); //堵塞时间为4秒

列出所有已经监听的管道

$pheanstalk->listTubesWatched();

watch + reserve 方法

$pheanstalk->reserveFromTube('default')

删除当前任务

$job =  $pheanstalk->watch('default')->reserve();
$pheanstalk->delete($job);

将当前任务重新放入管道

$job =  $pheanstalk->watch('default')->reserve();
$pheanstalk->release($job);

为任务续命(当处理任务的时间小于当前任务执行时间时)

$job =  $pheanstalk->watch('default')->reserve();
$pheanstalk->touch($job);
//TODO

将任务预留

$job =  $pheanstalk->watch('default')->reserve();
$pheanstalk->bury($job);

将预留任务释放(变为reday状态)

$job = $pheanstalk->peekBuried('default');
$pheanstalk->kickJob($job);

批量将预留任务释放

$pheanstalk->userTube('default')->kick(999); //将id小于999的预留任务全部释放

读取当前准备就绪的任务(ready)

$job = $pheanstalk->peekReady('default');

读取当前处于延迟状态的任务(delayed)

$job = $pheanstalk->peekDelayed('default');

对管道设置延迟

$pheanstalk->pauseTube('default', 100); //设置100秒延迟

取消对管道的延迟

$pheanstalk->resumeTube('default');