[PhalApi实战篇(1)]Redis队列处理异步任务 前言 先在这里感谢phalapi框架创始人@dogstar,为我们提供了这样一个优秀的开源框架. 哈喽大家好呀! 队列处理异步任务 大家希望喵咪在PhalApi实战推出一些什么样的内容? ,但是需要使用它们的成本对相对高一些需要搭建很多复杂的组件,但是相对redis,redis虽然没有那么多丰富的功能工具但是它也是队列软件中的不二之选 2.理解Redis处理队列特点 对于PHP来说对Redis $msg ){ break; } // 处理逻辑 ..... } 然后通过crontab进行定时任务即可 用法二 第二种用法是通过redis队列的另外一种机制来解决这类问题,相对于 如果阻塞时间设置的是5秒等待了2秒有消息进来了就里面会进入处理模式 上述方式可以使用Supervisor进行常驻内存执行 总结 本次实战篇为大家讲述了怎么使用Redis来处理队列来处理异步任务,以及队列有什么特点为什么使用
Redis实现任务队列 1.任务队列 松耦合性 生产者和消费者无需知道彼此的实现细节,只需要约定好任务的描述格式,这使得生产者和消费者可以由不同的团队使用不同的编程语言编写。 2.Redis实现任务队列 redis中实现任务队列我们可以通过List中的LPUSH和RPOP命令来实现。 ,但是还有点不完善,当任务队列中没有任务时消费者每秒都会调用RPOP命令查看是否有新任务,我们想要实现的是如果有新的任务添加进来我们能够立马知道,这时可以使用BRPOP命令来实现,BRPOP命令的作用和 打开两个redis-cli实例测试如下: 127.0.0.1:6379> brpop queue 0 进入等待状态。 实际环境中我们可能需要监听多个任务队列,有些队列的优先级比较高,需要优先执行,面对这种情况怎么办呢?
而在众多的技术选型中,Redis 凭借其高性能和简单易用性,成为了任务队列的理想选择。 我们通过 Redis 的 LPUSH 和 RPOP 操作来实现一个简单的异步队列。LPUSH 用于将任务添加到队列的左侧,而 RPOP 则用于从队列的右侧取出任务。 Redis 提供了有序集合(Sorted Set)的数据结构,非常适合实现延迟队列。我们可以将任务的执行时间作为 Sorted Set 的分数,当任务被取出时,只处理那些分数小于当前时间的任务。 Redis 会根据这个时间戳来排序任务,确保任务在正确的时间被取出。 而在消费者测试中,我们循环检查队列,只有当任务的时间戳小于当前时间时,才会取出任务并执行。4. 总结通过本文的讲解,我们从 Redis 的基础连接开始,逐步构建了异步队列和延迟队列的实现。
简介 顾名思义,延迟队列就是进入该队列的消息会被延迟消费的队列。而一般的队列,消息一旦入队了之后就会被消费者马上消费。 延时任务和定时任务区别 延时任务有别于定时任务,定时任务往往是固定周期的,有明确的触发时间。 延时队列的实现 选用了基于Redis的有序集合Sorted Set和Crontab短轮询进行实现。 php /** * @desc Redis 延迟任务队列 * @author Tinywan(ShaoBo Wan) * @date 2024/05/02 11:36 */ declare( 消费延迟队列消息后(zset结构中扫描到期的消息),不及时消费 把读取的消息放入一个 redis stream 队列,同时加入消费组 通过消费组消费 redis stream 消费,处理业务逻辑 Redis
import time from queue import Queue # 用于创建队列任务 import threading # 多线程 import redis # redis 模块 q=Queue () # 队列列表 red=redis.ConnectionPool(host="localhost",port=6379) # redis 连接池 r=redis.Redis(connection_pool =red) # redis 连接池实例 def gup(): # 生产者,产生任务丢到远程redis 维护一个队列,用于替代queue for x in range(100): th.join() if __name__ == '__main__': gup() time.sleep(1) ma() 代码很少,原理也挺简单的, 先用一个方法,生成任务 通过redis 来代替本地的queue队列,实现分布式,实现不复杂, 配合requests就可以实现自己diy的分布式爬虫。
我们创建两个队列,一个专门用于存储邮件任务队列和图像处理,一个用来存储文件上传任务队列。 @celery_app.task def my_task7(): print("my_task7任务正在执行....") 通过配置,将send_email和upload_file任务发送到queue1队列中,将image_process发送到queue2队列中。 通过apply_aynsc()方法动态划分任务至队列中 可以通过**apply_aynsc()**方法来设置任务发送到那个队列中 In [6]: my_task1.apply_async(args=(10,20 worker的运行日志,如下: 可以从日志中看出,两个队列的任务该worker都可以执行。
一、配置文件 首先我们需要在配置文件中配置默认队列驱动为Redis,队列配置文件是config/queue.php: return [ 'default' => env('QUEUE_DRIVER ,这里我们将其值改为redis(实际上是修改.env中的QUEUE_DRIVER)。 二、编写队列任务 首先我们通过如下Artisan命令创建任务类: php artisan make:job SendReminderEmail 运行成功后会在app/Jobs目录下生成一个SendReminderEmail.php message) use ($user){ $message->to($user->email)->subject('新功能发布'); }); } } 三、推送队列任务 在浏览器中访问http://laravel.app:8000/mail/sendReminderEmail/1,此时任务被推送到Redis队列中,我们还需要在命令行中运行Artisan命令执行队列中的任务
Redis类: <? config$name); }catch (Exception $exception){ self::$$name = false; } return self::$$name; } }; 定时任务 /domain_order.log", time()); try { //防止长时间无任务导致MySQL超时 $db->query("select 1"); //出列 $order_info $order_info['id'], json_encode($order_info)); } Redis常用队列方法: //队列第一个 =>出列 $Redis->lPop($key); //入到 =>队列最后 $Redis->rPush($key); //队列最后一个 =>出列 $Redis->rPop($key); //入到 =>队列第一个 $Redis->rPop($key
首先是配置类 分为Redis配置类和Jackson配置类,主要是用于收发消息时序列化 Jackson的 package com.ruben.config; import java.text.SimpleDateFormat jackson2JsonRedisSerializer.setObjectMapper(objectMapper); return jackson2JsonRedisSerializer; } } redis ; import org.springframework.data.redis.listener.ChannelTopic; import org.springframework.data.redis.listener.RedisMessageListenerContainer ; import org.springframework.data.redis.listener.adapter.MessageListenerAdapter; import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer private ObjectMapper objectMapper; /** * Callback for processing received objects through Redis
下面讲一个新手容易犯的错误,在这个示例中把队列的入队、出队和Redis存储节点的主从关系给混淆了,示例如下 存储: Redis主节点M, 使用数据List类型做为队列,列表名称M (标记为M.L,意为主节点上的 image.png 上图显然不是我们想要的结果,这种设计导致的问题是Redis主节点使用的内存会不断增长直至触发Redis的LRU策略导致数据丢失或者无法入队。 Redis的从节点S仅仅是做为主节点M的一个备份节点存在的。 ? 可靠队列 在Redis的列表(List)实现的队列中,一般一个客户端通过LPUSH命令将消息放入队列中,而另一个客户端通过RPOP/BRPOP 命令有顺序的取出队列中的消息进行消费。 使用Redis数据结构实现的方式是按照优先权建队列(列表),相同优先权的元素在同一个队列中,客户端在使用BRPOP/RPOP命令使队列中的元素出队的时候参数按照优先权从高到低的顺序进行。
延迟队列,顾名思义它是一种带有延迟功能的消息队列。那么,是在什么场景下我才需要这样的队列呢? 1. 使用Redis的zset、list的特性,我们可以利用redis来实现一个延迟队列RedisDelayQueue 2. 设计方案 设计主要包含以下几点 将整个Redis当做消息池,以KV形式存储消息 使用ZSET做优先队列,按照Score维持优先级 使用LIST结构,以先进先出的方式消费 搬运线程会去ZING:DELAY_QUEUE:BUCKET中查找哪些执行时间戳的RunTimeMillis比现在的时间小,将这些记录全部删除;同时会解析出每个任务的Topic是什么,然后将这些任务PUSH 3.4 设计要点 3.4.1 基本概念 JOB:需要异步处理的任务,是延迟队列里的基本单元 Topic:一组相同类型Job的集合(队列)。
lprm命令的英文全称是“Remove jobs from the print queue”,意为用于删除打印队列中的打印任务。 语法格式:lprm [参数] [任务编号] 常用参数: -E 与打印服务器连接时强制使用加密 -P 指定接受打印任务的目标打印机 -U 指定可选的用户名 参考实例 将打印机hpprint中的第102号任务移除 : [root@linuxcool ~]# lprm -Phpprint 102 将第101号任务由预设打印机中移除: [root@linuxcool ~]# lprm 101
def func_a(a, b): return a + b def func_b(): pass def func_c(a, b, c): return a, b, c 异步任务队列 callback, 'args': args, 'kwargs': kwargs }) def _task_queue_consumer(): """ 异步任务队列消费者
def func_a(a, b): return a + b def func_b(): pass def func_c(a, b, c): return a, b, c 异步任务队列 callback, 'args': args, 'kwargs': kwargs }) def _task_queue_consumer(): """ 异步任务队列消费者
使用redis做任何事情都是基于redis提供的数据结构,那么消息队列有哪几种类型?之前rabbitmq咋说有简单的队列、优先级队列、延迟队列等等。但是那时候咋也没说栈这东西。 那么redis如何做这些事,根据之前的学习。肯定使用list了。 Redis队列(先进先出) 队列中我们说redis提供了很多操作队列的方法。可以从左边添加、右边添加、左边获取、右边获取等等等。 所以说有了这些方法,用redis做个简单的队列简直是太容易了。比如我们要做一个先进先出的队列。 Redis优先级队列(按优先级高低进行排序) 我们的任务发送到redis中,然后任务要具有一定的顺序,这个顺序是优先级。 Riedis做延迟队列(指定时间执行) Redis做延迟队列其实还是用zset去做,我们用当前的时间+需要延迟的时间作为zset的score,然后我们按照score的增序来获取对应的元素,通过判断时间是否小于当前时间然后执行相关的动作
消息队列的特征 消息队列在存取消息时,必须要满足三个需求,分别是 消息保序 处理重复的消息 保证消息可靠性 消息保序 对于 单队列,单进程的queue, 是满足先入先出的特点的,本身是有序的,但是如果有多个队列或多个消费者线程的时候 stream的缺点就是在redis内部,stream就是一个单一的key,如果不对key进行分片,那么stream的容量被限制在单个redis的实例, 当然我们可以使用redis cluster对stream 的key进行分片,实现类似kafka多partition的概念,但是由于redis cluster的一些限制,需要解决redis原生命令不支持跨slot操作的问题, 当然经过合理的设计,这并不是一个很大的问题 XPENDING 命令可以用来查询每个消费组内所有消费者已读取但尚未确认的消息,而 XACK 命令用于向消息队列确认消息处理已完成 在用Redis当作队列或存储数据时,是有可能丢失数据的:一个场景是, 总的来说,Redis不保证严格的数据完整性和主从切换时的一致性。我们在使用Redis时需要注意。 而采用RabbitMQ和Kafka这些专业的队列中间件时,就没有这个问题了。
Redis 作为高性能的内存数据库,具备非常灵活的 Sorted Set(有序集合) 数据结构,可以很容易地实现延迟队列,满足限时任务的需求。 在本篇文章中,我们将介绍如何通过 Redis 和 Spring Boot 3 来实现 限时任务(也称为延迟任务或延迟队列),让你能够轻松管理任务的延时执行。 1. 延迟队列的任务存储和处理 接下来,我们通过 Redis 的 Sorted Set 来存储任务,并定时检查任务是否到期。 持久化 延迟队列可以与持久化存储结合起来,确保任务在 Redis 失败或重启时不会丢失。可以使用 Redis 持久化功能或将任务信息存储在数据库中。 4.3. Redis 的高性能和有序集合特性为我们提供了实现延迟队列的基础,而 Spring Boot 的定时任务调度则帮助我们定期处理这些任务。
只要没有其他JavaScript在执行中间,微任务队列就会在回调之后进行处理,并且在每个任务结束时进行处理。在微任务期间排队的所有其他微任务都将添加到队列的末尾并进行处理。 ECMAScript具有类似于微型任务的“任务”概念,但是除了模糊的邮件列表讨论之外,这种关系并没有明确。但是,普遍的共识是,应将诺言作为微任务队列的一部分,这是有充分理由的。 此规则来自HTML规范,用于调用回调: 如果脚本设置对象堆栈现在为空,请执行微任务检查点 — HTML:在回调步骤3 之后进行清理 …并且微任务检查点涉及遍历微任务队列,除非我们已经在处理微任务队列。 Firefox和Safari正确耗尽了点击侦听器之间的微任务队列,如突变回调所示,但承诺的排队似乎不同。鉴于工作和微任务之间的联系模糊,这是可以原谅的,但我仍然希望它们在侦听器回调之间执行。 使用Edge,我们已经看到它的队列承诺不正确,但是它也无法耗尽点击侦听器之间的微任务队列,相反,它是在调用所有侦听器之后执行的,这mutate在两个click日志之后占单个日志。错误票。
开发环境.NET 7+vue3.0,下面是对应安装和使用教程: 二、Hangfire使用 1、安装nuget包 由于我使用的mysql,对应包为Hangfire.MySqlStorage,大家根据自己的数据库选择安装对应的包 同时,在UseHangfireServer时,我使用了自定义的队列名称,并将同时执行的任务数设置为1,以实现任务队列中的任务唯一,且任务依次执行。 ")] public async Task BackServiceCreateImg(GraphGenerationRequest request) { //...代码逻辑省略 } 3、查询队列等待任务数 JobStorage.Current.GetMonitoringApi() .EnqueuedCount("img-queue");//指定的队列类型的队列等待任务数 三、SignalR使用 后端SignalR使用 由于我使用的.NET 7,微软自带SignalR,我们使用时只需要添加引用 using Microsoft.AspNetCore.SignalR;
欢迎关注微信公众号:数据科学与艺术 作者WX:superhe199 获取ExecutorService队列中的任务数量,可以使用java.util.concurrent.ThreadPoolExecutor 类提供的getQueue()方法获取BlockingQueue对象,然后使用size()方法获取队列中的任务数量。 (int i = 0; i < 10; i++) { executorService.submit(new Task()); } // 获取队列中的任务数量 threadPoolExecutor.getQueue(); int taskCount = queue.size(); System.out.println("队列中的任务数量 } } } 运行上述代码将输出队列中的任务数量。