CRMEB_PRO_M/app/services/other/queue/QueueServices.php

889 lines
35 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
// +----------------------------------------------------------------------
// | CRMEB [ CRMEB赋能开发者助力企业发展 ]
// +----------------------------------------------------------------------
// | Copyright (c) 2016~2020 https://www.crmeb.com All rights reserved.
// +----------------------------------------------------------------------
// | Licensed CRMEB并不是自由软件未经许可不能去掉CRMEB相关版权
// +----------------------------------------------------------------------
// | Author: CRMEB Team <admin@crmeb.com>
// +----------------------------------------------------------------------
namespace app\services\other\queue;
use app\dao\other\queue\QueueDao;
use app\jobs\BatchHandleJob;
use app\services\BaseServices;
use app\services\activity\coupon\StoreCouponIssueServices;
use app\services\order\StoreCartServices;
use app\services\order\StoreOrderDeliveryServices;
use app\services\order\StoreOrderServices;
use app\services\product\category\StoreProductCategoryServices;
use app\services\product\product\StoreProductRelationServices;
use app\services\product\product\StoreProductServices;
use app\services\product\sku\StoreProductRuleServices;
use app\services\user\group\UserGroupServices;
use app\services\user\label\UserLabelRelationServices;
use app\services\user\label\UserLabelServices;
use app\services\user\UserServices;
use crmeb\exceptions\AdminException;
use crmeb\services\CacheService;
use think\exception\ValidateException;
use think\facade\Log;
/**
* 队列
* Class QueueServices
* @package app\services\other\queue
* @mixin QueueDao
*/
class QueueServices extends BaseServices
{
/**
* 任务类型名称
* @var string[]
*/
public $queue_type_name = [
1 => "批量发放用户优惠券",
2 => "批量设置用户分组",
3 => "批量设置用户标签",
4 => "批量下架商品",
5 => "批量删除商品规格",
6 => "批量删除订单",
7 => "批量手动发货",
8 => "批量打印电子面单",
9 => "批量配送",
10 => "批量虚拟发货",
];
/**
* 任务redis缓存key
* @var string[]
*/
public $queue_redis_key = [
1 => "DrivingSendCoupon-ADMIN",
2 => "DrivingUserGroup-ADMIN",
3 => "DrivingUserLabel-ADMIN",
4 => "DrivingProductUnshow-ADMIN",
5 => "DrivingProductRule-ADMIN",
6 => "DrivingOrderDel-ADMIN",
7 => 3,
8 => 4,
9 => 5,
10 => 6,
];
/**
* 状态
* @var string[]
*/
protected $status_name = [
0 => '未处理',
1 => '正在处理',
2 => '完成',
3 => '失败'
];
public function __construct(QueueDao $dao)
{
$this->dao = $dao;
}
/**
* 获取任务列表
* @param array $where
*/
public function getList(array $where = [])
{
[$page, $limit] = $this->getPageValue();
$list = $this->dao->getList($where, $page, $limit);
if ($list) {
foreach ($list as &$v) {
$v['finish_time'] = $v['finish_time'] ? date('Y-m-d H:i:s', $v['finish_time']) : "";
$v['first_time'] = $v['first_time'] ? date('Y-m-d H:i:s', $v['first_time']) : "";
$v['again_time'] = $v['again_time'] ? date('Y-m-d H:i:s', $v['again_time']) : "";
$v['status_cn'] = $this->status_name[$v['status']] ?? '';
$v['is_show_log'] = false;
if (in_array($v['type'], [7, 8, 9, 10])) {
$v['is_show_log'] = true;
$v['is_error_button'] = $v['status'] == 2;
}
$v['type_cn'] = $this->queue_type_name[$v['type']] ?? '';
$v['cache_type'] = $this->queue_redis_key[$v['type']] ?? 0;
$v['success_num'] = bcsub($v['total_num'], $v['surplus_num'], 0);
//是否显示停止按钮
$v['is_stop_button'] = $v['status'] == 1;
$v['add_time'] = date('Y-m-d H:i:s', $v['add_time']);
}
}
$count = $this->dao->count($where);
return compact('list', 'count');
}
/**
* 将要执行的任务数据存入表中
* @param array $where
* @param string $field
* @param array $data
* @param int $type
* @return mixed
*/
public function setQueueData(array $where = [], $field = "*", array $data = [], int $type = 1, $other = false)
{
if (!$type) throw new ValidateException('缺少执行任务类型');
$queue_redis_keys = $this->queue_redis_key;
$redisKey = $queue_redis_keys[$type] ?? '';
$queue_type_name = $this->queue_type_name;
$queueName = $queue_type_name[$type] ?? '';
if (!$redisKey || !$queueName) {
throw new ValidateException('缺少队列缓存KEY或者不存在此类型队列');
}
//检查同类型其他任务
$this->checkTypeQueue($redisKey);
$source = 'admin';
if (in_array($type, [1, 2, 3, 4, 5, 6])) {
$queueDataNum = $this->setRedisData($redisKey, $type, $data, $where, $field);
if (!$queueDataNum) {
throw new ValidateException('需要执行的批量数据为空');
}
if (!$id = $this->dao->addQueueList($queueName, $queueDataNum, $type, $redisKey, $source)) {
throw new ValidateException('添加队列失败');
}
} else {
if ($type == 7) {
$ids = array_column($data, 0);
} else {
$ids = $data;
}
/** @var StoreOrderServices $orderService */
$orderService = app()->make(StoreOrderServices::class);
$oids = $orderService->getOrderDumpData($where, $ids, $field);
$order_ids = [];
if ($oids) {
//过滤拼团未完成订单
foreach ($oids as $order) {
if (isset($order['pinkStatus']) && $order['pinkStatus'] != 2) {
continue;
}
$order_ids[] = $order['id'];
}
}
if (!$order_ids) {
throw new ValidateException('暂无需要发货订单');
}
if (!$id = $this->dao->addQueueList($queueName, count($ids), $type, $redisKey, $source)) {
throw new ValidateException('添加队列失败');
}
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
$auxiliaryService->saveQueueOrderData($id, $order_ids, $data, $type, $redisKey);
}
return $id;
}
/**
* 队列数据放入redis集合
* @param $redisKey
* @param $type
* @param array $dataIds
* @param array $where
* @param string $filed
* @return int
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function setRedisData($redisKey, $type, array $dataIds, array $where, $filed = "*")
{
if (!$redisKey || !$type) return 0;
/** @var CacheService $redis */
$redis = app()->make(CacheService::class);
if ($dataIds) {
foreach ($dataIds as $v) {
$redis->sAdd($redisKey, $v);
}
} else {
if ($where) {
foreach ($where as $k => $v) {
if (!$v) unset($where[$k]);
}
}
switch ($type) {
case 1://批量发放优惠券
case 2://批量设置用户分组
case 3://批量设置用户标签
/** @var UserServices $userService */
$userService = app()->make(UserServices::class);
$dataInfo = $userService->getUserInfoList($where, $filed);
if ($dataInfo) {
foreach ($dataInfo as $k => $v) {
$redis->sAdd($redisKey, $v['uid']);
}
}
break;
case 4://批量上下架商品
$cateIds = [];
if (isset($where['cate_id']) && $where['cate_id']) {
/** @var StoreProductCategoryServices $storeCategory */
$storeCategory = app()->make(StoreProductCategoryServices::class);
$cateIds = $storeCategory->getColumn(['pid' => $where['cate_id']], 'id');
}
if ($cateIds) {
$cateIds[] = $where['cate_id'];
$where['cate_id'] = $cateIds;
}
/** @var StoreProductServices $productService */
$productService = app()->make(StoreProductServices::class);
$dataInfo = $productService->getProductListByWhere($where, $filed);
if ($dataInfo) {
foreach ($dataInfo as $k => $v) {
$redis->sAdd($redisKey, $v['id']);
}
}
break;
case 5://批量删除商品规格
/** @var StoreProductRuleServices $productRuleService */
$productRuleService = app()->make(StoreProductRuleServices::class);
$dataInfo = $productRuleService->getProductRuleList($where, $filed);
if ($dataInfo) {
foreach ($dataInfo as $k => $v) {
$redis->sAdd($redisKey, $v['id']);
}
}
break;
case 6://批量删除用户已删除订单
/** @var StoreOrderServices $orderService */
$orderService = app()->make(StoreOrderServices::class);
$dataInfo = $orderService->getOrderListByWhere($where, $filed);
if ($dataInfo) {
foreach ($dataInfo as $k => $v) {
$redis->sAdd($redisKey, $v['id']);
}
}
break;
default:
return 0;
break;
}
}
return $redis->sCard($redisKey);
}
/**
* 获取队列redis中存的数据集合
* @param string $redisKey
* @param array $queueInfo
* @return array
*/
public function getQueueRedisdata($queueInfo, string $redisKey = '')
{
if (!$queueInfo) return [$redisKey, []];
if (!$redisKey) {
$redisKey = $queueInfo['execute_key'] ?? '';
}
if (!$redisKey) {
return [$redisKey, []];
}
/** @var CacheService $redis */
$redis = app()->make(CacheService::class);
return [$redisKey, $redis->sMembers($redisKey)];
}
/**
* 批量发送优惠券
* @param $coupon
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function sendCoupon($coupon, $type)
{
if (!$type || !$coupon) return false;
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
if (!$queueInfo) {
return false;
}
//把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
$this->dao->setQueueDoing($coupon, $queueInfo['id']);
[$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
if ($uids) {
$chunkUids = array_chunk($uids, 100, true);
/** @var StoreCouponIssueServices $issueService */
$issueService = app()->make(StoreCouponIssueServices::class);
foreach ($chunkUids as $v) {
$issueService->setCoupon($coupon, $v, $redisKey, $queueInfo);
}
}
//发完后将队列置为完成
$this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
return true;
}
/**
* 批量设置用户分组
* @param $groupId
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function setUserGroup($groupId, $type)
{
if (!$groupId || !$type) return false;
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
if (!$queueInfo) {
return false;
}
/** @var UserGroupServices $userGroup */
$userGroup = app()->make(UserGroupServices::class);
if (!$userGroup->getGroup($groupId)) {
return false;
}
//把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
$this->dao->setQueueDoing($groupId, $queueInfo['id']);
[$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
if ($uids) {
$chunkUids = array_chunk($uids, 1000, true);
/** @var UserServices $userServices */
$userServices = app()->make(UserServices::class);
foreach ($chunkUids as $v) {
//执行分组
if (!$userServices->setUserGroup($v, $groupId)) {
$this->setQueueFail($queueInfo['id'], $redisKey);
} else {
$this->doSuccessSremRedis($v, $redisKey, $type);
}
}
}
//发完后将队列置为完成
$this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
return true;
}
/**
* 批量设置用户标签
* @param $labelId
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function setUserLabel($labelId, $type, $other = [])
{
if (!$labelId || !$type) return false;
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
if (!$queueInfo) {
return false;
}
/** @var UserLabelServices $userLabelServices */
$userLabelServices = app()->make(UserLabelServices::class);
$count = $userLabelServices->getCount([['id', 'IN', $labelId]]);
if ($count != count($labelId)) {
return false;
}
//把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
$this->dao->setQueueDoing($labelId, $queueInfo['id']);
[$redisKey, $uids] = $this->getQueueRedisdata($queueInfo);
if ($uids) {
$chunkUids = array_chunk($uids, 1000, true);
/** @var UserLabelRelationServices $services */
$services = app()->make(UserLabelRelationServices::class);
$store_id = (int)$other['store_id'] ?? 0;
foreach ($chunkUids as $v) {
if (!$services->setUserLable($v, $labelId, $store_id ? 1 : 0, $store_id)) {
$this->setQueueFail($queueInfo['id'], $redisKey);
} else {
$this->doSuccessSremRedis($v, $redisKey, $type);
}
}
}
//发完后将队列置为完成
$this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
return true;
}
/**
* 商品批量上下架
* @param string $upORdown
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function setProductShow($upORdown = "up", $type = 1)
{
if (!$type) return false;
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
if (!$queueInfo) {
return false;
}
//把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
$this->dao->setQueueDoing($upORdown, $queueInfo['id']);
[$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
if ($pids) {
$chunkPids = array_chunk($pids, 1000, true);
$isShow = 0;
if ($upORdown == 'up') $isShow = 1;
/** @var StoreProductServices $storeproductServices */
$storeproductServices = app()->make(StoreProductServices::class);
/** @var StoreProductRelationServices $storeProductRelationServices */
$storeProductRelationServices = app()->make(StoreProductRelationServices::class);
/** @var StoreCartServices $cartService */
$cartService = app()->make(StoreCartServices::class);
$update = ['is_show' => $isShow];
if ($isShow) {//手动上架 清空定时下架状态
$update['auto_off_time'] = 0;
}
foreach ($chunkPids as $v) {
//商品
$res = $storeproductServices->batchUpdate($v, $update);
//门店商品
$storeproductServices->batchUpdateAppendWhere($v, $update, ['type' => 1], 'pid');
if ($isShow == 0) {
$storeProductRelationServices->setShow($v, (int)$isShow);
//购物车
$cartService->batchUpdate($v, ['status' => 1], 'product_id');
}
//下架检测是否有参与活动商品
try {
$is_activity = $storeproductServices->checkActivity($v);
} catch (\Throwable $e) {
$is_activity = false;
}
if ($isShow == 0 || $is_activity) {
//改变购物车中状态
$storeProductRelationServices->setShow($v, (int)$isShow);
}
if (!$res) {
$this->setQueueFail($queueInfo['id'], $redisKey);
} else {
$this->doSuccessSremRedis($v, $redisKey, $type);
}
}
}
//发完后将队列置为完成
$this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
return true;
}
/**
* 批量队列删除商品规格
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function delProductRule($type)
{
if (!$type) return false;
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
if (!$queueInfo) {
return false;
}
//把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
$this->dao->setQueueDoing('', $queueInfo['id']);
[$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
if ($pids) {
$chunkPids = array_chunk($pids, 1000, true);
/** @var StoreProductRuleServices $storeProductRuleservices */
$storeProductRuleservices = app()->make(StoreProductRuleServices::class);
foreach ($chunkPids as $v) {
$res = $storeProductRuleservices->del(implode(',', $v));
if ($res) {
$this->doSuccessSremRedis($v, $redisKey, $queueInfo['type']);
} else {
$this->addQueueFail($queueInfo['id'], $redisKey);
}
}
}
//发完后将队列置为完成
$this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
return true;
}
/**
* 批量队列删除订单
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function delOrder($type)
{
if (!$type) return false;
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => 0]);
if (!$queueInfo) {
return false;
}
//把队列需要执行的入参数据存起来,以便队列执行失败后接着执行,同时队列状态改为正在执行状态。
$this->dao->setQueueDoing('', $queueInfo['id']);
[$redisKey, $pids] = $this->getQueueRedisdata($queueInfo);
if ($pids) {
$chunkPids = array_chunk($pids, 1000, true);
/** @var StoreOrderServices $storeOrderServices */
$storeOrderServices = app()->make(StoreOrderServices::class);
foreach ($chunkPids as $v) {
$res = $storeOrderServices->batchUpdateOrder($v, ['is_system_del' => 1]);
if ($res) {
$this->doSuccessSremRedis($v, $redisKey, $type);
} else {
$this->setQueueFail($queueInfo['id'], $redisKey);
}
}
}
//发完后将队列置为完成
$this->setQueueSuccess($queueInfo['id'], $queueInfo['type']);
return true;
}
/**
* 队列批量发货
* @param $oid
* @param array $deliveryData
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function orderDelivery($oid, array $deliveryData)
{
if (!$oid) return false;
if (!isset($deliveryData['queueType'])) {
return false;
}
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
//看是否能查到任务数据
$auxiliaryInfo = $auxiliaryService->getOrderCacheOne(['binding_id' => $deliveryData['queueId'], 'relation_id' => $oid, 'type' => $deliveryData['cacheType']]);
if (!$auxiliaryInfo || !$auxiliaryInfo['other']) {
return false;
}
$deliveryInfo = json_decode($auxiliaryInfo['other'], true);
if ($deliveryData['queueType'] == 7) {
if (!$deliveryInfo['delivery_name'] || !$deliveryInfo['delivery_code'] || !$deliveryInfo['delivery_id']) {
return false;
}
$deliveryData['express_record_type'] = 1;
$deliveryData['delivery_name'] = $deliveryInfo['delivery_name'];
$deliveryData['delivery_id'] = $deliveryInfo['delivery_id'];
$deliveryData['delivery_code'] = $deliveryInfo['delivery_code'];
}
try {
/** @var StoreOrderDeliveryServices $storeOrderDelivery */
$storeOrderDelivery = app()->make(StoreOrderDeliveryServices::class);
//发货
$storeOrderDelivery->delivery($oid, $deliveryData);
} catch (\Throwable $e) {
Log::error('队列发货失败发货order_id' . $oid . ',原因:' . $e->getMessage());
}
//更改队列子集数据
$this->doSuccessSremRedis(['order_id' => $oid], $deliveryData['queueId'], $deliveryData['queueType'], ['phone_message' => 1, 'status' => 1]);
//队列置为完成
return $this->setQueueSuccess($deliveryData['queueId'], $deliveryData['queueType']);
}
/**
* 添加任务前校验同类型任务状态
* @param $type
* @param array $queueInfo
* @param false $is_again
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function checkTypeQueue($type, array $queueInfo = [], bool $is_again = false)
{
if (!$type) return false;
if (!$queueInfo) {
$queueInfo = $this->dao->getQueueOne(['type' => $type, 'status' => [0, 1]]);
}
if (!$queueInfo) {
return false;
}
$num = 0;
if (in_array($type, [7, 8, 9, 10])) {
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
$num = $auxiliaryService->count(['binding_id' => $queueInfo['id'], 'status' => 0]);
} else {
if ($queueInfo['execute_key']) {
/** @var CacheService $redis */
$redis = app()->make(CacheService::class);
$num = $redis->sCard($queueInfo['execute_key']);
}
}
if ($num) {
if (!$is_again) {
if ($queueInfo['status'] == 0) {
throw new AdminException('上次批量任务尚未执行,请前往任务列表手动执行');
}
if ($queueInfo['status'] == 1) {
throw new AdminException('有正在执行中的任务,请耐心等待,若长时间无反应,前往任务列表修复异常数据,再手动执行');
}
}
} else {
$this->delWrongQueue(0, $type);
return false;
}
return true;
}
/**
* 修复异常任务
* @param $queueInfo
* @return bool|mixed
*/
public function repairWrongQueue($queueInfo)
{
if (!$queueInfo) throw new AdminException('任务不存在');
try {
switch ($queueInfo['type']) {
case 1://批量发放优惠券
case 2://批量设置用户分组
case 3://批量设置用户标签
case 4://批量上下架商品
case 5://批量删除商品规格
case 6://批量删除用户已删除订单
if (!$queueInfo['execute_key']) {
throw new AdminException('缓存key缺失请清除数据');
}
/** @var CacheService $redis */
$redis = app()->make(CacheService::class);
$cacheNum = $redis->sCard($queueInfo['execute_key']);
if ($cacheNum != $queueInfo['surplus_num']) {
return $this->dao->update(['id' => $queueInfo['id']], ['surplus_num' => $cacheNum]);
}
break;
case 7://手动发货
case 8://电子面单发货
case 9://批量配送
case 10://批量虚拟发货
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
$cacheType = $this->queue_redis_key[$queueInfo['type']] ?? '';
$cacheFailAndNoNum = $auxiliaryService->getCountOrder(['binding_id' => $queueInfo['id'], 'type' => $cacheType, 'status' => [0, 2]]);
$cacheTotalNum = $auxiliaryService->getCountOrder(['binding_id' => $queueInfo['id'], 'type' => $cacheType, 'status' => [0, 1, 2]]);
//如果任务已经执行完毕,但是记录却存在未执行数据,要进行修复,让其重新执行
if ($cacheFailAndNoNum && $queueInfo['status'] == 2) return $this->dao->update(['id' => $queueInfo['id']], ['status' => 3, 'surplus_num' => $cacheFailAndNoNum, 'total_num' => $cacheTotalNum]);
//如果执行失败,记录全部执行成功,那么进行修复
if (!$cacheFailAndNoNum && $queueInfo['status'] != 2) return $this->dao->update(['id' => $queueInfo['id']], ['status' => 2, 'surplus_num' => 0, 'total_num' => $cacheTotalNum]);
}
return true;
} catch (\Exception $e) {
throw new AdminException($e->getMessage());
}
}
/**
* 队列再次执行
* @param $queueId
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function againDoQueue($queueId, $type)
{
$queueInfo = $this->getQueueOne(['id' => $queueId, 'type' => $type]);
if (!$queueInfo) {
throw new AdminException('队列任务不存在');
}
if (!$queueInfo['queue_in_value']) {
throw new AdminException('队列关键数据缺失,请清除此任务及异常数据');
}
if ($queueInfo['status'] == 2) {
throw new AdminException('队列已完成');
}
if ($queueInfo['status'] == 3) {
throw new AdminException('队列异常,请清除队列重新加入');
}
if ($queueInfo['status'] == 4) {
throw new AdminException('队列已删除');
}
//检测当前队列
if (!$this->checkTypeQueue($type, $queueInfo, true)) {
throw new AdminException('任务已清除,无需再次执行');
}
//先进行数据修复
$this->repairWrongQueue($queueInfo);
if (in_array($type, [7, 8, 9, 10])) {
$queueInValue = json_decode($queueInfo['queue_in_value'], true);
/** @var StoreOrderServices $storeOrderService */
$storeOrderService = app()->make(StoreOrderServices::class);
$storeOrderService->adminQueueOrderDo($queueInValue, true);
} else {
$queueInValue = $queueInfo['queue_in_value'];
if ($type == 1) {
$queueInValue = json_decode($queueInfo['queue_in_value'], true);
}
//加入队列
BatchHandleJob::dispatch([$queueInValue, $type]);
}
return true;
}
/**
* 任务执行失败,修改队列状态
* @param $queueId
* @param string $redisKey
* @return mixed
*/
public function setQueueFail($queueId, $redisKey = '')
{
if ($redisKey) {
/** @var CacheService $cacheService */
$cacheService = app()->make(CacheService::class);
$surplusNum = $cacheService->sCard($redisKey);
} else {
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
$surplusNum = $auxiliaryService->count(['binding_id' => $queueId, 'status' => 0]);
}
return $this->dao->update(['id' => $queueId], ['status' => 3, 'surplus_num' => $surplusNum]);
}
/**
* 将执行成功数据移除redis集合
* @param array $data
* @param $redisKey
* @return bool
*/
public function doSuccessSremRedis(array $data, $redisKey, $type, array $otherData = [])
{
if (!$data || !$redisKey || !$type) return true;
if (in_array($type, [7, 8, 9, 10])) {
$where['relation_id'] = $data['order_id'];
$where['binding_id'] = $redisKey;
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
$getOne = $auxiliaryService->getOrderCacheOne($where);
if (!$getOne) return false;
$other = json_decode($getOne['other'], true);
if (isset($otherData['delivery_status'])) $other['delivery_status'] = $otherData['delivery_status'];
if (isset($otherData['wx_message'])) $other['wx_message'] = $otherData['wx_message'];
if (isset($otherData['phone_message'])) $other['phone_message'] = $otherData['phone_message'];
if (isset($otherData['error_info'])) $other['error_info'] = $otherData['error_info'];
$updateData['status'] = isset($otherData['status']) ? $otherData['status'] : 0;
$updateData['other'] = json_encode($other);
$updateData['update_time'] = time();
$auxiliaryService->updateOrderStatus($where, $updateData);
} else {//在redis缓存集合的从集合删除
/** @var CacheService $redis */
$redis = app()->make(CacheService::class);
foreach ($data as $k => $v) {
$redis->sRem($redisKey, $v);
}
}
return true;
}
/**
* 任务执行成功
* @param $queueId
* @param $type
* @return bool|mixed
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function setQueueSuccess($queueId, $type)
{
if (!$queueId || !$type) return false;
$queueInfo = $this->dao->get($queueId);
if (!$queueInfo) return false;
$res = true;
if (in_array($type, [7, 8, 9, 10])) {
$res = false;
if ($queueInfo['surplus_num'] > 0) {
$this->dao->bcDec($queueId, 'surplus_num', 1);
}
//看是否全部执行成功
$queueInfo = $this->dao->get($queueId);
if ($queueInfo['surplus_num'] == 0) {
$res = true;
}
}
if ($res) {
$update = [
'status' => 2,
'finish_time' => time(),
'surplus_num' => 0
];
return $this->dao->update(['id' => $queueId], $update);
}
return true;
}
/**
* 清除异常队列
* @param $queueId
* @param $type
* @return bool
* @throws \think\db\exception\DataNotFoundException
* @throws \think\db\exception\DbException
* @throws \think\db\exception\ModelNotFoundException
*/
public function delWrongQueue($queueId, $type, $is_del = true)
{
if (!$type) return false;
if ($queueId) {
$queueInfo = $this->dao->getQueueOne(['id' => $queueId, 'type' => $type]);
} else {
$queueInfo = $this->dao->getQueueOne(['type' => $type]);
}
if (!$queueInfo) {
return true;
}
try {
$data = ['status' => 3];
if ($is_del) {
if (in_array($type, [7, 8, 9, 10])) {
/** @var QueueAuxiliaryServices $auxiliaryService */
$auxiliaryService = app()->make(QueueAuxiliaryServices::class);
$auxiliaryService->batchUpdate(['binding_id' => $queueInfo['id']], ['status' => 3]);
} else {
if ($queueInfo['execute_key']) {
/** @var CacheService $redis */
$redis = app()->make(CacheService::class);
$redis->del($queueInfo['execute_key']);
}
}
$data = ['is_del' => 1, 'status' => 4];
}
$this->dao->update(['id' => $queueInfo['id']], $data);
} catch (\Throwable $e) {
Log::error('清除异常队列失败,原因' . $e->getMessage());
}
return true;
}
}