我有一系列批次,需要在一个链中运行(按正确的顺序),并且我正在设法找到一种方法将它们连接回一起,并最终通知用户一切都完成了。
我目前使用的方法如下:
Bus::batch(new \App\Jobs\TestBatchJob())
->allowFailures()
->name('test-batch-1')
->then(function(\Illuminate\Bus\Batch $batch) {
Bus::batch(new \App\Jobs\TestBatchJob2())
->allowFailures()
->name('test-batch-2')
->then(function (\Illuminate\Bus\Batch $batch){
Bus::batch(new \App\Jobs\TestBatchJob2())
->allowFailures()
->name('test-batch-3')
->then(function (\Illuminate\Bus\Batch $batch){
Bus::batch(new \App\Jobs\TestBatchJob2())
->allowFailures()
->name('test-batch-4')
->then(function (\Illuminate\Bus\Batch $batch){
// Batch Chain Finishes Here.
\Illuminate\Support\Facades\Auth::user()->notify(...);
})
->dispatch();
})
->dispatch();
})
->dispatch();
})
->dispatch();这没什么..。但主要的缺点是,它会产生多个批次。

我不能再与批处理交互,即$batch->cancel(),而前面的任何进度指示器都不再起作用。
我需要确保第一批工作在开始第二批之前就完成了,但我想以某种方式将所有的批次联系在一起。像这样的事情(不起作用)也许能解释我的想法。
Bus::batch(new \App\Jobs\TestBatchJob())
->allowFailures()
->name('test-batch-1')
->then(function(\Illuminate\Bus\Batch $batch){
$batch->add(new \App\Jobs\TestBatchJob2());
})
->dispatch();更新1-总线::Chain()
感谢@boolfalse在这里的想法。我试了几样东西。
基本链-不工作
您可以在链中创建批处理,但它们仍然并行运行。总线链方法(带有批处理)只有在一组链式作业的末尾有一个批处理时才有用。
注意,链只在创建批处理时捕获失败,而不是在批处理中运行作业。
Bus::chain([
function(){
Bus::batch(new TestBatchJob())->name('batch-1')->dispatch();
},
function(){
Bus::batch(new TestBatchJob())->name('batch-2')->dispatch();
},
function(){
Bus::batch(new TestBatchJob())->name('batch-3')->dispatch();
},
])->dispatch();

嵌套批次链
使用嵌套批处理的链的一个用例是允许我们在运行批处理之前运行一些正常作业,然后处理处理过的数据。
要在链中运行批处理,必须在链上使用then方法。
$model = Model::create();
Bus::chain([
new TestBatchJob($model), // Run first on its own
function() use($model){
$model->doSomething(); // Amend the model prior to next batch
Bus::batch(new TestBatchJob($model)) // Run second on its own, using up-to-date model.
->name('batch-2')
->then(function(){
Bus::batch(new TestBatchJob())->name('batch-3')->dispatch(); // run third on its own
})
->dispatch();
}
])->dispatch();更新2-数据库解决方案
现在,我将回到嵌套then()的原始解决方案。为了将每个子作业链接回父作业(我们可以在UI上显示它并作为一个子程序进行跟踪),我将在数据库job_batches表中添加一个列来链接子批。
$batch = Bus::batch(new \App\Jobs\TestBatchJob())
->name('test-batch-1')
->then(function(Batch $parentBatch) {
$parent = $parentBatch->id;
$child = Bus::batch(new \App\Jobs\TestBatchJob2())
->name('test-batch-2')
->then(function (Batch $batch) use($parent){
$child = Bus::batch(new \App\Jobs\TestBatchJob2())
->name('test-batch-3')
->then(function(Batch $batch) use($parent){
$child = Bus::batch(new \App\Jobs\TestBatchJob2())
->name('test-batch-3')
->dispatch();
JobBatch::findByUuid($parent)->addChild($child);
})
->dispatch();
JobBatch::findByUuid($parent)->addChild($child);
})
->dispatch();
JobBatch::findByUuid($parent)->addChild($child);
})
->dispatch();
return JobBatch::query()->find($batch->id);发布于 2022-02-21 07:49:02
我将在这里分享我的代码片段,因此我认为它将是有用的:
<?php
// LARAVEL 8 QUEUES
// Official Docs: https://laravel.com/docs/8.x/queues
// Author: https://github.com/boolfalse
// include Laravel8-specific classes
use Illuminate\Bus\Batch;
use Illuminate\Support\Facades\Bus;
use \Throwable;
// include custom classes
use \App\Jobs\FirstBatchJob;
use \App\Jobs\SecondBatchJob;
// defining class with body
// ...
// method for adding Batch Jobs
public function runQueueJobs()
{
// some initial data
$data = [
'first_argument' => 111,
'second_argument' => 222,
];
// chaining synchronous batches
$batch = Bus::chain([
new FirstBatchJob($data['first_argument']), // common job in app/Jobs/FirstBatchJob.php
$second_batch = function () use ($data) {
$batch_unique_name = $this->getBatchName($data['second_argument']);
// CHUNKS USAGE EXAMPLE
// get ready for starting concurrent job instances
$chunk_intervals = [];
for ($i = 0; $i < 100; $i++) {
// common job in app/Jobs/SecondBatchJob.php
$chunk_intervals[] = new SecondBatchJob($data['second_argument'], [$i * 1000, ($i + 1) * 1000 - 1]);
}
// asynchronous/concurrent batch jobs
Bus::batch($chunk_intervals)
->then(function (Batch $concurrent_batch) use ($data) {
info("SecondBatchJob jobs have done.");
})
->catch(function (Batch $concurrent_batch, Throwable $e) use ($data) {
info("SecondBatchJob error! - " . $e->getMessage());
})
->finally(function (Batch $concurrent_batch) use ($data) {
info("SecondBatchJob batch has finished executing.");
})
->name($batch_unique_name)
->dispatch();
},
])->dispatch();
// response back to the client
return response()->json([
'success' => true,
'message' => "Batch Jobs successfully run.",
]);
}
// method for getting (a)synchronous/concurrent batch job(s) info
public function batchInfo(BatchInfoRequest $request)
{
// getting "batch_unique_name" from client
$batch_unique_name = $request->get('batch_unique_name');
// getting appropriate "batch_id" from somewhere, where we've stored that "batch_id"<->"batch_unique_name" relation
$batch_id = $this->getBatchIdFromUniqueName($batch_unique_name);
// getting appropriate batch_id
$batch = Bus::findBatch($batch_id);
// response back to the client
if ($batch) {
return response()->json($batch);
} else {
return response()->json([
'success' => false,
'message' => "Batch not found!",
]);
}
}下面是顺序(同步)和并发(异步)队列作业的示例。如您所见,总线中有一个数组::chain(. ),因此有顺序(有序)作业。因此,在数组的第二部分中,有一个并发作业使用块(在我的代码中,我有一些大数据,需要分离到块),它作为异步(并发)工作。
最后,您可以看到一个方法,它返回有关当前特定批处理作业的信息。
的。
https://stackoverflow.com/questions/71194147
复制相似问题