Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Batched Unique Jobs Can Be Dispatched With The Same UniqueId Values #48882

Closed
deanPerkins opened this issue Nov 1, 2023 · 3 comments
Closed

Comments

@deanPerkins
Copy link

Laravel Version

9.19

PHP Version

8.0.2

Database Driver & Version

MariaDB 10.5.19

Description

I've come across this interesting scenario that may potentially be a bug, we have a ShouldBeUnique Job that is kicked off in two scenarios:

  1. From an endpoint directly, into a 'high-priority' queue.
  2. From a job, into a 'low-priority' queue.

This job uses the interface via the uniqueId() function outlined in the documentation and works great in the first scenario, spamming that endpoint with the same "unique" value we've determined does not allow multiples of that same job to be put into queue. As is intended.

The second scenario however has brought up the issue where that "batched unique jobs" seemingly are ignoring the uniqueId() returned value?

This was noticed when a job with the uniqueId() value of '1' was dispatched from the batched jobs and then from the endpoint, resulting in two "Unique Jobs" with the same uniqueId() being processed.

Messing with it I've been unable to determine why this is the case until I had the thought of just putting multiple "unique jobs" with the same uniqueId() into a single batch. Every job ends up being queued and processed concurrently even with the same uniqueId() returned value.

I'm able to confirm this in Horizon showing the same batch id for each job, the same property that is used in the uniqueId() function and different job ids for each job. Below are two of the jobs with the same value, I've cut off type names for privacy:
image
image

Here's the "slightly edited for privacy" code that performed the above problem.

Class that is the job that performs the "bulk batch" dispatching causing the problem:

class BatchedUserJobsJob implements ShouldQueue, ShouldBeUnique
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 1200;

    /**
     * Indicate if the job should be marked as failed on timeout.
     *
     * @var bool
     */
    public $failOnTimeout = true;

    /**
     * The requested date for the job.
     *
     * @var DateTime
     */
    public $requestedDate;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct(DateTime $requestedDate)
    {
        $this->requestedDate = $requestedDate;
    }

    /**
     * @return void
     */
    public function handle()
    {
        Log::debug("handle: Starting BatchedJobs... ");

        // Grab the user ids (the triple 7's should actually only result in one job).
        $userIds = [1,4,7,7,7];

        // Create a job for each of the user ids.
        $userJobs = [];
        foreach ($userIds as $userId) {
            $userJobs[] = new UniqueUserJob($userId);
        }
        Log::debug("handle: UserJob's created... ", ['requestedDate' => $this->requestedDate, 'jobCount' => count($userJobs)]);

        // Use the batching library to chain the report job for after the jobs have successfully run. If a error occurs we will not run the final report.
        $requestedDate = $this->requestedDate;
        $batch = Bus::batch($userJobs)
            ->onQueue('low-priority')
            ->then(function (Batch $batch) use ($requestedDate) {
                Log::debug("handle: All jobs completed successfully... ", ['batch' => $batch]);

                // Success means we'll dispatch the report job to run on it's own.
                FinalReportJob::dispatch($requestedDate)->onQueue('low-priority');
            })->finally(function (Batch $batch) {
                Log::debug("handle: The batch has finished executing... ", ['batch' => $batch]);
            })->allowFailures()->dispatch();

        Log::debug("handle: After dispatch() called... ", [$batch]);
    }

    /**
     * The unique ID of the job.
     *
     * @return string
     */
    public function uniqueId()
    {
        return $this->requestedDate->format('Y-m-d');
    }

    /**
     * Handle a job failure.
     *
     * @param  \Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // If something fails we'll still attempt to set the processing flag back to zero just in case
        Log::error($exception);
        $this->fail($exception);
    }
}

Controller Endpoint With Single Function that performs the one off dispatch:

class UserRefreshController extends Controller
{
    public function refresh_user_values($id, UserServiceInterface $userService)
    {
        try {
            // Dispatch job on the high-priority queue.
            UniqueUserJob::dispatch($id, $userService)->onQueue('high-priority');
        } catch (Exception $ex) {
            return response()->json([
                "success" => false,
                "job_status" => "FailedDispatch",
                "message" => $ex->getMessage()
            ])->setEncodingOptions(JSON_NUMERIC_CHECK);
        }

        // Return a response that the job successfully dispatched.
        return response()->json([
            "success" => true,
            "job_status" => "SuccessfulDispatch",
            "message" => "MemberCollectionRefreshValuesJob Successfully Dispatched"
        ])->setEncodingOptions(JSON_NUMERIC_CHECK);
    }
}

Unique Job Class code:

class UniqueUserJob implements ShouldQueue, ShouldBeUnique
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 1500;

    /**
     * Indicate if the job should be marked as failed on timeout.
     *
     * @var bool
     */
    public $failOnTimeout = true;

    /**
     * The user id.
     *
     * @var int
     */
    public $userId;

    /**
     * Create a new job instance.
     *
     * @return void
     */
    public function __construct(int $userId)
    {
        $this->userId = $userId;

        Log::debug("__construct: Job has been created successfully... ", ['userId' => $userId]);
    }

    /**
     * @return void
     */
    public function handle(UserServiceInterface $userService)
    {
        try {
            if ($this->batch() && $this->batch()->cancelled()) {
                Log::error("Issue with Batch Processing", ['batch' => json_encode($this->batch())]);
                $this->fail("A batching issue has occurred.");
            }
            
            $userService->RefreshUsersValue($this->userId);
        } catch (Exception $exception) {
            $this->fail($exception);
        }
    }

    /**
     * Handle a job failure.
     *
     * @param  \Exception  $exception
     * @return void
     */
    public function failed(Exception $exception)
    {
        // Capture if an exception is thrown.
        $this->fail($exception);
    }

    /**
     * The unique ID of the job.
     *
     * @return string
     */
    public function uniqueId()
    {
        Log::debug("uniqueId() ... ", ['$this->userId' => $this->userId, 'strval($this->userId)' => strval($this->userId)]);
        return strval($this->userId);
    }
}

Sorry to be reporting this outside of the bug fix window for this version of Laravel. Again I'm uncertain if this is a bug or intended behavior, we just need insight on it.

Steps To Reproduce

Unfortunately I'm unable to share the code directly with a branch of a github repository for ease of replication.

In general I'd assume that quickly copying my code from the description and using both avenues in however you see fit would replicate the problem.

If not please let me know what I can provide to further assist.

@crynobone
Copy link
Member

Hey there,

Unfortunately we don't support this version of the library anymore. Please check out our support policy on which versions we are currently supporting. Can you please try to upgrade to the latest version and see if your problem persists? If so, please open up a new issue and we'll help you out.

Thanks!

@dubocr
Copy link

dubocr commented Feb 2, 2024

Issue is always present in latest version (10.8).

When dispatching 2 batches with same job in each, both jobs are executed concurently.

class ShouldBeUniqueJob implements ShouldQueue, ShouldBeUnique
{
    use Batchable, Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    /**
     * The number of seconds the job can run before timing out.
     *
     * @var int
     */
    public $timeout = 30;

    /**
     * Execute the job.
     */
    public function handle(FsvService $fsv): void
    {
        sleep(20);
    }
}
$batch1 = Bus::batch([
            new ShouldBeUniqueJob,
        ])->dispatch();

$batch2 = Bus::batch([
            new ShouldBeUniqueJob,
        ])->dispatch();

ShouldBeUniqueJob is executed twice and concurently :

Worker 1:
2024-02-02 12:57:07 App\Jobs\ShouldBeUniqueJob ................................................................................. RUNNING
2024-02-02 12:57:27 App\Jobs\ShouldBeUniqueJob ............................................................................... 20 s DONE

Worker 2:
2024-02-02 12:57:09 App\Jobs\ShouldBeUniqueJob ................................................................................ RUNNING
2024-02-02 12:57:29 App\Jobs\ShouldBeUniqueJob ............................................................................... 20 s DONE

@ahoiroman
Copy link

I am facing this behavior with Bus::chain, too.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants