Skip to content

output_queue

output_queue

GlobalQueue

A global queue for storing data.

This class provides methods for putting data into the queue, getting data from the queue, checking the length of the queue, and checking if the queue is empty.

Attributes:

Name Type Description
queue Queue

The underlying queue object.

Source code in redesign_pipeline/flask_utils/output_queue.py
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
class GlobalQueue:
    """
    A global queue for storing data.

    This class provides methods for putting data into the queue, getting data from the queue,
    checking the length of the queue, and checking if the queue is empty.

    Attributes:
        queue (Queue): The underlying queue object.

    """

    def __init__(self) -> None:
        self.queue = queue.Queue()

    def put_to_queue(self, data):
        """
        Put data into the queue.

        This method puts the provided data into the queue and performs additional operations,
        such as creating an output JSON string, storing the data in a database, and sending
        the data via MQTT.

        Args:
            data (dict): The data to be put into the queue.

        """
        self.queue.put(data)
        data = create_output_json(data)

        db = DB(db="nervotec",collection="estimates")
        db.put_to_db(json.loads(data))
        measure_time.checkpoint("until enter the estimates to the db")
        mqtt_client = Mqtt()
        logger.debug('client connected')
        mqtt_client.send_estimates(data)
        measure_time.checkpoint("until enter the first estimates via mqtt")
        try:
            logger.debug(measure_time.report(from_start=True))
        except:
            pass
        logger.debug('data sent')

    def get_from_queue(self):
        """
        Get data from the queue.

        Returns:
            object: The data retrieved from the queue.

        """
        return self.queue.get()

    def __len__(self):
        """
        Get the length of the queue.

        Returns:
            int: The length of the queue.

        """
        return self.queue.qsize()

    def is_empty(self):
        """
        Check if the queue is empty.

        Returns:
            bool: True if the queue is empty, False otherwise.

        """
        return self.queue.empty()

__len__()

Get the length of the queue.

Returns:

Name Type Description
int

The length of the queue.

Source code in redesign_pipeline/flask_utils/output_queue.py
62
63
64
65
66
67
68
69
70
def __len__(self):
    """
    Get the length of the queue.

    Returns:
        int: The length of the queue.

    """
    return self.queue.qsize()

get_from_queue()

Get data from the queue.

Returns:

Name Type Description
object

The data retrieved from the queue.

Source code in redesign_pipeline/flask_utils/output_queue.py
52
53
54
55
56
57
58
59
60
def get_from_queue(self):
    """
    Get data from the queue.

    Returns:
        object: The data retrieved from the queue.

    """
    return self.queue.get()

is_empty()

Check if the queue is empty.

Returns:

Name Type Description
bool

True if the queue is empty, False otherwise.

Source code in redesign_pipeline/flask_utils/output_queue.py
72
73
74
75
76
77
78
79
80
def is_empty(self):
    """
    Check if the queue is empty.

    Returns:
        bool: True if the queue is empty, False otherwise.

    """
    return self.queue.empty()

put_to_queue(data)

Put data into the queue.

This method puts the provided data into the queue and performs additional operations, such as creating an output JSON string, storing the data in a database, and sending the data via MQTT.

Parameters:

Name Type Description Default
data dict

The data to be put into the queue.

required
Source code in redesign_pipeline/flask_utils/output_queue.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
def put_to_queue(self, data):
    """
    Put data into the queue.

    This method puts the provided data into the queue and performs additional operations,
    such as creating an output JSON string, storing the data in a database, and sending
    the data via MQTT.

    Args:
        data (dict): The data to be put into the queue.

    """
    self.queue.put(data)
    data = create_output_json(data)

    db = DB(db="nervotec",collection="estimates")
    db.put_to_db(json.loads(data))
    measure_time.checkpoint("until enter the estimates to the db")
    mqtt_client = Mqtt()
    logger.debug('client connected')
    mqtt_client.send_estimates(data)
    measure_time.checkpoint("until enter the first estimates via mqtt")
    try:
        logger.debug(measure_time.report(from_start=True))
    except:
        pass
    logger.debug('data sent')

Mqtt

MQTT client for sending data.

This class provides methods for connecting to an MQTT broker and sending data.

Attributes:

Name Type Description
client mqtt.Client

The MQTT client instance.

Source code in redesign_pipeline/flask_utils/output_queue.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
class Mqtt:
    """
    MQTT client for sending data.

    This class provides methods for connecting to an MQTT broker and sending data.

    Attributes:
        client (mqtt.Client): The MQTT client instance.

    """

    def __init__(self):
        """
        Send estimates data via MQTT.

        Args:
            payload (str): The payload data to be sent.

        """
        self.client = mqtt.Client("nervoscan")
        host = "test.mosquitto.org"
        port=1883
        self.client.connect(host=host,port=port)

    def send_estimates(self, payload):
        """
        Send estimates data via MQTT.

        Args:
            payload (str): The payload data to be sent.

        """
        logger.debug(type(payload))
        logger.debug(payload)
        self.client.publish("nervoscan/get_estimates",payload=payload)

__init__()

Send estimates data via MQTT.

Parameters:

Name Type Description Default
payload str

The payload data to be sent.

required
Source code in redesign_pipeline/flask_utils/output_queue.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def __init__(self):
    """
    Send estimates data via MQTT.

    Args:
        payload (str): The payload data to be sent.

    """
    self.client = mqtt.Client("nervoscan")
    host = "test.mosquitto.org"
    port=1883
    self.client.connect(host=host,port=port)

send_estimates(payload)

Send estimates data via MQTT.

Parameters:

Name Type Description Default
payload str

The payload data to be sent.

required
Source code in redesign_pipeline/flask_utils/output_queue.py
108
109
110
111
112
113
114
115
116
117
118
def send_estimates(self, payload):
    """
    Send estimates data via MQTT.

    Args:
        payload (str): The payload data to be sent.

    """
    logger.debug(type(payload))
    logger.debug(payload)
    self.client.publish("nervoscan/get_estimates",payload=payload)