# Source code for decisionengine.framework.util.redis_stats

# SPDX-FileCopyrightText: 2017 Fermi Research Alliance, LLC
# SPDX-License-Identifier: Apache-2.0

import re

import redis

from kombu.transport.redis import Channel


def redis_stats(broker_url, exchange):
    """Report the backlog of every queue bound to a Kombu exchange in Redis.

    The bindings are read from the Redis set whose key is
    ``Channel.keyprefix_queue % exchange``.  For each set member, the bytes
    before the first ``\\x06`` are taken as the routing key and the bytes
    after the last ``\\x16`` as the queue name.  If the same queue name
    appears in several members, the one iterated last wins.

    Args:
        broker_url: Redis URL handed to ``redis.Redis.from_url``.
        exchange: Exchange name interpolated into ``Channel.keyprefix_queue``.

    Returns:
        A sorted list of ``[routing_key, queue_name, size]`` lists, where
        ``size`` is a string:

        * ``"None"`` when Redis reports the key type as ``none``,
        * ``"Unsupported"`` when the key exists but is not a list,
        * the list length (as decimal text) otherwise.
    """
    # Loop-invariant, so compile it once instead of per set member.
    binding_pattern = re.compile(b"^(.*?)\x06.*\x16(.*)$")

    r = redis.Redis.from_url(broker_url)
    try:
        queue_to_routing_key = {}
        # The exchange name is prefixed by a Kombu-provided pattern
        for member in r.smembers(Channel.keyprefix_queue % exchange):
            matches = binding_pattern.fullmatch(member)
            if matches is None:
                # Not in the expected binding layout; skip it rather than
                # failing the whole report on a None subscript.
                continue
            queue_to_routing_key[matches[2].decode()] = matches[1].decode()

        result = []
        for queue_name, routing_key in queue_to_routing_key.items():
            rtype = r.type(queue_name).decode()
            if rtype == "none":
                size = "None"
            elif rtype != "list":
                size = "Unsupported"
            else:
                size = f"{r.llen(queue_name)}"
            result.append([routing_key, queue_name, size])
    finally:
        # from_url() built a pool private to this call; release its sockets
        # deterministically, including when a Redis command raises.
        r.connection_pool.disconnect()

    # Rows compare element-wise, so this orders by routing key first and
    # falls back to queue name on ties.
    result.sort()
    return result