一:队列场景
当我们使用某讯或则某浪的邮箱时,点击群发短信以后,只需等待很短的时间,浏览器提示递交成功,正在发送之类的信息时,用户就可以关闭浏览器,稍后,收件地址栏里的邮箱将相继收到该群发短信,再例如群发定时短信,以及当商城系统中有客户下单,顾客,客服,库房等相关人员收到订单电邮信息。诸这么类,队列的应用范围是这么之广。
二:普通工程师的解决方案和构架师的解决方案
方案1:建表存电邮,消息等,用定时程序取出发送。
方案2:具象到更初一层,开发一套通用异步处理队列适用于任何复杂的业务逻辑
这么,作为构架师,使用队列的做法,将具象层和业务层分离,可具有良好的扩充性和可维护性。相比较而言就高明了许多,
下边就我们介绍一下自定义队列的实现思路和技巧。
三:队列总体设计
1:须要队列程序linux环境配置,提供加入队列插口和取队列插口等
2:须要储存队列,文件或则数据库
3:须要定时程序取出队列并执行
4:其它扩充功能:优先级,日志,定时等
代码的目录结构如下linux定时执行php文件,每位文件的作用用//注释来标注
|–addTask.php//添加任务到队列的事例
|–cronMission.php//定时任务调度程序,比如linux中受crontab
直接调用的文件,业务逻辑工程师可以在这个文件中灵活定义自己的队列任务,因而不用每位队列任务都须要上服务器更改crontab,因而在安全性,方便性方面有很大提升
|–db.php//数据库操作
|–db.sql//构建队列须要用到的基本表结构
|–doQueue.php//执行队列任务
|–Queue.class.php//队列核心业务在这儿定义linux系统下载,包括将任务加入队列,读队列,修改队列任务状态
|–sendMsg.php//队列要实现具体任务的业务插口,例如现有系统的发送消息的插口,这儿事例中由于将此队列程序和现有系统系统集成,用写入日志来演示
四:队列具体实现一:建任务储存表
1:先来个最基本的:
CREATE TABLE`queue` (
id int(11) NOT NULL auto_increment primarykey,
taskphp varchar(128) NOT NULL default '',
param text not null default '',
status tinyint not null default 0,
ctime timestamp NOT NULL default CURRENT_TIMESTAMP,
KEY (ctime)
) ENGINE=InnoDBDEFAULT CHARSET=utf8;
数组解释:
taskphp:处理业务的插口文件
param:处理业务的插口文件须要接收的参数
status:任务处理状态,0为未处理,处理完毕修改为1
五、队列具体实现二:定义调用插口
<?php
/**
* @author:cyw0413
* 任务队列实现
* date:2018-11-01
*/
include_once('db.php');
class Queue
{
/**
* 把任务扔到队列
*
* @param string $taskphp 执行任务的程序
* @param string $param 执行任务程序所用的参数
* 例如,群发消息加入队列:
* $arr = array(
* "uid" => 4,//发信息的人的UID
* "uids" => array(6,234,34,67,7888,2355), //接收信息的人的UID
* "content" => 'xxxxx',//信息内容
* );
* $cqueue = new Queue();
* $cqueue->add("/app/send_msg.php", serialize($arr));
*
*/
public function add($taskphp,$param)
{
$taskphp = mysql_real_escape_string($taskphp);
//$param = mysql_real_escape_string($param);
$param = $param;
$sql = "insert into queue (taskphp, param) values('".$taskphp."', '".$param."')";
$re = execute($sql);
if ($re){
$pid = mysql_insert_id();
return $pid;
}else{
return false;
}
}
/**
* 读取任务队列
*
* @param string $limit 一次取多少条
*/
public function getQueueTask($limit = 1000)
{
$limit = (int)$limit;
$sql = "select id, taskphp, param from queue where status = 0 order by id asc";
$re = query($sql);
return $re;
}
/**
* 更新任务状态
*
* @param string $limit 一次取多少条
*/
public function updateTaskByID($id)
{
$id = (int)$id;
$mtime = time();
$sql = "update queue set status =1, mtime = ".$mtime." where id = ".$id;
$re = execute($sql);
return $re;
}
public static function a2s($arr)
{
$str = "";
foreach ($arr as $key => $value){
if (is_array($value)){
foreach ($value as $value2){
$str .= urlencode($key) . "[]=" . urlencode($value2) . "&";
}
}else{
$str .= urlencode($key) . "=" . urlencode($value) . "&";
}
}
return $str;
}
public static function s2a($str){
$arr = array();
parse_str($str, $arr);
return $arr;
}
}
?>
1:加入队列插口
l//$param1为执行任务的程序,$param2为程序参数,可以为序列化的数据
l$cqueue->add($param1,$param2);
2:读取队列插口
l$tasks=$cqueue->getQueueTask($limit=1000);
3:更新任务状态
l$cqueue->updateTaskStatus($id);
4:a2s是自定义的一个链表转换字符串方式,这儿不要使用json_encode,容易出现问题,同样,从数据库中取出转换为字段的时侯,使用s2a方式
l$re=$cqueue->add("sendMsg.php",Queue::a2s($arr));
六、队列具体实现三:写执行队列的程序
按照设计,执行队列的程序文件是do_queue.php,它的主要功能是把任务从队列表里取下来,但是在后台执行。
do_queue.php部份代码:
<?php
$phpcmd = exec("which php"); //查找到php安装位置
$cqueue = new Queue();
$tasks = $cqueue->getQueueTask(200);
foreach ($tasks as $t){
$taskphp = $t['taskphp'];
$param = $t['param'];
$job = $phpcmd . " " . escapeshellarg($taskphp) . " " . escapeshellarg($param);
system($job);
七、具体任务的业务实现
还是拿群发消息来做事例,我们须要写好一个群发消息的程序,这个程序接收事先定义好的参数,之后按照参数调用发消息的插口把消息发送出去。
这个通常由做业务功能的工程师实现。并且构架师事先得写文档事例,教会他人使用。
send_msg.php:
<?php
$para = $argv[1];
$arr = unserialize($para);
$cmessage = new Message();
foreach($arr['uids'] as $touid){
$cmessage->send($arr['uid'], $touid, $arr['content']);
}
八、服务器布署一:配置crontab
俺们执行队列的程序都写好了,这个程序如何触发呢,其实就要用到linux的定时任务,每隔一定的时间,执行do_queue.php一次。并且呢,这儿不是直接调用do_queue.php,俺们再提高三层linux定时执行php文件,加个调度程序cron_mission.php,在cron_mission.php上面调用do_queue.php
配置定时任务crontab:
lcrontab–e
l*****cd/ucai/schedule;phpcron_mission.php>>cron_mission.log
#可以先使用crontab-l查看本机早已使用的定时任务
九、服务器布署二:写定时任务调度程序
思路:将定时任务写入到任务调度程序cron_mission.php中,这样可以在cron_mission.php中灵活控制队列任务。相比较直接通过crontab控制doQueue.php而言,防止了频繁更改服务器上的crontab,从安全,以便维护等角度来说,都是下策。
cron_mission.php示例:
<?php
if ($minute % 5 == 0){
if(chdir($site_dir."app/")) {
$cmd = "$phpcmd do_queue.php > do_queue.log &";
echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "n";
system($cmd);
}
}
十、开启多进程并发执行队列
思路:对任务序列进行编号,数据库中执行的时侯
where条件加上id%每位队列要执行任务总量=队列编号
这样可以防止重复处理
比如:每位进程执行10条任务,更改如下
1:定时任务的更改
更改前:
<?php
if ($minute % 5 == 0){
if(chdir($site_dir."app/")) {
$cmd = "$phpcmd do_queue.php > do_queue.log &";
echo '[' , $ymd , ' ' , $hour , ':' , $minute , '] ' , $cmd , "n";
system($cmd);
}
}
更改后:
<?php
if ($minute % 5 == 0){
for ($i=0; $i < 10; $i++) {
$cmd = "$phpcmd doQueue.php 10 $i>> doQueueMission".date('Y-m-d').".log ";
echo date("Y-m-d H:i:s") . "t : " .$cmd."n";
system($cmd);
}
}
//每次进行10个进程,$i来分辨是当前的进程标识
2:队列执行程序的更改
更改前:
<?php
$phpcmd = 'D:workwampbinphpphp5.3.10php ';
$cqueue = new Queue();
$tasks = $cqueue->getQueueTask(200);
更改后:
<?php
$phpcmd = 'D:workwampbinphpphp5.3.10php ';
$total=$argv[1];
$i=$argb[2];
$cqueue = new Queue();
$tasks = $cqueue->getQueueTask($total,$i,200);
3:取队列插口的更改
更改前:
<?php
public function getQueueTask($limit = 1000)
{
$limit = (int)$limit;
$sql = "select id, taskphp, param from queue where status = 0 order by id asc";
$re = query($sql);
return $re;
}
更改后:
<?php
public function getQueueTask($total,$i,$limit = 1000)
{
$limit = (int)$limit;
$sql = "select id, taskphp, param from queue where status = 0 and id%$total=$i order by id asc";
$re = query($sql);
return $re;
}