status = $status; }

    /**
     * Execute the job.
     *
     * Hands the status over to federation once its media is ready:
     *  - does nothing if the status row no longer exists;
     *  - federates immediately when cloud storage is disabled or fast
     *    media processing is enabled;
     *  - otherwise waits for every media item of the status to have a
     *    cdn_url, re-queueing itself every 30 seconds, and gives up waiting
     *    once the oldest pending media item is more than 10 minutes old.
     *
     * @return void
     */
    public function handle()
    {
        // Check if status still exists
        if (! Status::where('id', $this->status->id)->exists()) {
            if (config('federation.activitypub.delivery.logger.enabled')) {
                Log::info('Status ' . $this->status->id . ' was deleted before federation');
            }

            return;
        }

        // Skip media check if cloud storage isn't enabled or fast processing is on
        if (! config_cache('pixelfed.cloud_storage') || config('pixelfed.media_fast_process')) {
            $this->dispatchFederation();

            return;
        }

        // Oldest media item that has no cdn_url yet. A single lookup doubles
        // as the "is anything still processing?" check.
        $oldestProcessingMedia = Media::whereStatusId($this->status->id)
            ->whereNull('cdn_url')
            ->oldest()
            ->first();

        // All media processed, proceed with federation
        if (! $oldestProcessingMedia) {
            $this->dispatchFederation();

            return;
        }

        // If media has been processing for more than 10 minutes, proceed anyway.
        // The timestamps are compared directly instead of using diffInMinutes():
        // the sign of that diff depends on the Carbon major version, and a
        // negative result would mean this timeout never triggers.
        $createdAt = $oldestProcessingMedia->created_at;
        if ($createdAt && now()->subMinutes(10)->gt($createdAt)) {
            if (config('federation.activitypub.delivery.logger.enabled')) {
                Log::warning('Media processing timeout for status ' . $this->status->id . '. Proceeding with federation.');
            }

            $this->dispatchFederation();

            return;
        }

        // Release job back to queue with delay of 30 seconds
        $this->release(30);
    }

    /**
     * Dispatch the federation job.
     *
     * Queues StatusEntityLexer for the status. A dispatch failure is logged
     * (when delivery logging is enabled) and then rethrown to the caller.
     *
     * @return void
     *
     * @throws \Exception when dispatching StatusEntityLexer fails
     */
    protected function dispatchFederation()
    {
        try {
            StatusEntityLexer::dispatch($this->status);
        } catch (\Exception $e) {
            if (config('federation.activitypub.delivery.logger.enabled')) {
                Log::error('Federation dispatch failed for status ' . $this->status->id . ': ' . $e->getMessage());
            }

            throw $e;
        }
    }

    /**
     * Handle a job failure.
     *
     * Logs the exception message and stack trace when delivery logging is
     * enabled; otherwise does nothing.
     *
     * @param  \Throwable  $exception
     * @return void
     */
    public function failed(\Throwable $exception)
    {
        if (config('federation.activitypub.delivery.logger.enabled')) {
            Log::error('NewStatusPipeline failed for status ' . $this->status->id, [
                'exception' => $exception->getMessage(),
                'trace' => $exception->getTraceAsString()
            ]);
        }
    }
}