forked from equinor/isar
-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
99 lines (83 loc) · 3.27 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
import logging
import time
from logging import Logger
from threading import Thread
from typing import List
from injector import Injector
from isar.apis.api import API
from isar.config.keyvault.keyvault_service import Keyvault
from isar.config.log import setup_loggers
from isar.config.settings import settings
from isar.models.communication.queues.queues import Queues
from isar.modules import get_injector
from isar.services.service_connections.mqtt.mqtt_client import MqttClient
from isar.services.service_connections.mqtt.robot_heartbeat_publisher import (
RobotHeartbeatPublisher,
)
from isar.services.service_connections.mqtt.robot_info_publisher import (
RobotInfoPublisher,
)
from isar.state_machine.state_machine import StateMachine, main
from isar.storage.uploader import Uploader
from robot_interface.robot_interface import RobotInterface
if __name__ == "__main__":
injector: Injector = get_injector()
keyvault_client = injector.get(Keyvault)
setup_loggers(keyvault=keyvault_client)
logger: Logger = logging.getLogger("main")
state_machine: StateMachine = injector.get(StateMachine)
uploader: Uploader = injector.get(Uploader)
robot: RobotInterface = injector.get(RobotInterface)
queues: Queues = injector.get(Queues)
threads: List[Thread] = []
state_machine_thread: Thread = Thread(
target=main, name="ISAR State Machine", args=[state_machine], daemon=True
)
threads.append(state_machine_thread)
uploader_thread: Thread = Thread(
target=uploader.run, name="ISAR Uploader", daemon=True
)
threads.append(uploader_thread)
if settings.MQTT_ENABLED:
mqtt_client: MqttClient = MqttClient(mqtt_queue=queues.mqtt_queue)
mqtt_thread: Thread = Thread(
target=mqtt_client.run, name="ISAR MQTT Client", daemon=True
)
threads.append(mqtt_thread)
robot_info_publisher: RobotInfoPublisher = RobotInfoPublisher(
mqtt_queue=queues.mqtt_queue
)
robot_info_thread: Thread = Thread(
target=robot_info_publisher.run,
name="ISAR Robot Info Publisher",
daemon=True,
)
threads.append(robot_info_thread)
robot_heartbeat_publisher: RobotHeartbeatPublisher = RobotHeartbeatPublisher(
mqtt_queue=queues.mqtt_queue
)
robot_heartbeat_thread: Thread = Thread(
target=robot_heartbeat_publisher.run,
name="ISAR Robot Heartbeat Publisher",
daemon=True,
)
threads.append(robot_heartbeat_thread)
publishers: List[Thread] = robot.get_telemetry_publishers(
queue=queues.mqtt_queue,
robot_name=settings.ROBOT_NAME,
isar_id=settings.ISAR_ID,
)
if publishers:
threads.extend(publishers)
api: API = injector.get(API)
api_thread: Thread = Thread(target=api.run_app, name="ISAR API", daemon=True)
threads.append(api_thread)
for thread in threads:
thread.start()
logger.info(f"Started thread: {thread.name}")
while True:
for thread in threads:
if not thread.is_alive():
logger.critical("Thread '%s' failed - ISAR shutting down", thread.name)
exit(1)
time.sleep(state_machine.sleep_time)