<?php
// Gearman client demo: submits an "unsubscribe" job and reports the outcome
// according to the return code of the client.

$client = new GearmanClient();
$client->addServer(); // No arguments: relies on the extension's default server.

// Only the synchronous doNormal() variant below yields a worker result. It is
// initialised here so the echo statements never read an undefined variable
// while the background variant is the active one.
$result = '';

do {
    // Synchronous submission: $result receives whatever the worker returns
    // once it has finished processing.
    // $result = $client->doNormal("unsubscribe", "token@xxxx.com");

    // Asynchronous submission: only a job handle comes back.
    $job_handle = $client->doBackground("unsubscribe", "token@xxxx.com");
    // In asynchronous mode, $client->returnCode() != GEARMAN_SUCCESS tells
    // whether the submission succeeded (it does not mean the job is done).
    // Progress can be polled with $status = $client->jobStatus($job_handle);
    // the status values are supplied by the worker.

    // Read the code once so the switch and the loop condition agree.
    $returnCode = $client->returnCode();

    switch ($returnCode) {
        case GEARMAN_WORK_DATA:
            echo "Data:$result\n";
            break;

        case GEARMAN_WORK_STATUS:
            [$numerator, $denominator] = $client->doStatus();
            echo "Status:$numerator/$denominator complete\n";
            break;

        case GEARMAN_WORK_FAIL:
            echo "Failed\n";
            exit;

        case GEARMAN_SUCCESS:
            echo "Success:$result\n";
            break;

        default:
            echo "RET:{$returnCode}\n";
            exit;
    }
} while ($returnCode !== GEARMAN_SUCCESS);

// Gearman can also process tasks in parallel:
/*
$client->setCompleteCallback(function($task){
    echo $task->data(), "\n"; // Fetch the data returned by the worker.
});
$client->addTask('sum', json_encode([1, 100]));
$client->addTask('sum', json_encode([101, 200]));
$client->addTask('sum', json_encode([201, 300]));
$client->runTasks();
*/
?>
Worker.php
<?php
// Gearman worker demo: registers three handlers (a named function, an object
// method and a closure) and then processes jobs until work() fails.

$worker = new GearmanWorker();
$worker->addServer(); // Add default server_info (localhost)

$worker->addFunction('unsubscribe', 'do_unsubscribe'); // Handler given by function name.
$worker->addFunction('unsubscribe3', [new MyWorker(), 'doUnsubscribe']); // Handler given as an object method.
$worker->addFunction('unsubscribe2', function ($job) { // Handler given as an anonymous function.
    echo 'unsubscribe2:', $job->workload(), "\n";
});

// Keep serving jobs; stop as soon as the worker reports a non-success code.
while ($worker->work()) {
    $returnCode = $worker->returnCode();
    if ($returnCode !== GEARMAN_SUCCESS) {
        echo "return_code:{$returnCode}\n";
        break;
    }
}

/**
 * Handles an "unsubscribe" job.
 *
 * Prints the job handle and workload, then reports progress once per second,
 * one step per byte of the workload, before echoing the workload itself.
 *
 * @param GearmanJob $job The job handed over by the Gearman worker.
 *
 * @return bool Always true.
 */
function do_unsubscribe($job)
{
    echo "Received job:{$job->handle()}\n";

    $workload = $job->workload();
    $workload_size = $job->workloadSize();
    echo "Workload:$workload($workload_size)\n";

    for ($x = 0; $x < $workload_size; $x++) {
        $completed = $x + 1;
        echo "Sending status: " . $completed . "/$workload_size complete\n";
        // Report the same numerator that was just logged, so the final
        // status reaches $workload_size/$workload_size instead of stopping
        // one step short.
        $job->sendStatus($completed, $workload_size);
        sleep(1);
    }

    echo "$workload\n";

    return true; // or false
}

/**
 * Example of a handler implemented as an object method.
 */
class MyWorker
{
    /**
     * Handles an "unsubscribe3" job by printing its workload.
     *
     * @param GearmanJob $job The job handed over by the Gearman worker.
     */
    public function doUnsubscribe($job)
    {
        echo 'unsubscribe3:', $job->workload(), "\n";
    }
}
?>
Gearmand队列持久化方案(MySQL):
在gearmand启动时加上参数:/usr/sbin/gearmand -d -q MySQL --mysql-host=localhost --mysql-port 3306 --mysql-user=myuser --mysql-password=123456 --mysql-db=gearman --mysql-table=gearman_queue
其它参数配置:-L 127.0.0.1 -p 4730 -l /mnt/lnx_log/gearmand.log。
需要注意的是,如果指定了gearmand的监听地址和端口,那么在代码中调用addServer时需要显式地指定IP和Port。