import smbus
import time
import threading
import RPi.GPIO as GPIO
# MLX90614 Sensor Class
class MLX90614:
    """Small SMBus wrapper for reading temperatures from an MLX90614 sensor.

    Temperatures are read as 16-bit words with ``read_word_data`` and
    converted by ``data_to_temp``.  Only ``MLX90614_TA`` and
    ``MLX90614_TOBJ1`` are used by this class; the remaining register
    constants are kept for callers that pass them to ``read_reg``.
    """

    # Register addresses accepted by read_reg().
    MLX90614_RAWIR1 = 0x04
    MLX90614_RAWIR2 = 0x05
    MLX90614_TA = 0x06
    MLX90614_TOBJ1 = 0x07
    MLX90614_TOBJ2 = 0x08
    MLX90614_TOMAX = 0x20
    MLX90614_TOMIN = 0x21
    MLX90614_PWMCTRL = 0x22
    MLX90614_TARANGE = 0x23
    MLX90614_EMISS = 0x24
    MLX90614_CONFIG = 0x25
    MLX90614_ADDR = 0x0E
    MLX90614_ID1 = 0x3C
    MLX90614_ID2 = 0x3D
    MLX90614_ID3 = 0x3E
    MLX90614_ID4 = 0x3F

    # Bus-read retry policy: number of attempts and pause between them (s).
    comm_retries = 5
    comm_sleep_amount = 0.1

    def __init__(self, address=0x5a, bus_num=1):
        """Open SMBus ``bus_num`` and remember the device ``address``."""
        self.bus_num = bus_num
        self.address = address
        self.bus = smbus.SMBus(bus=bus_num)

    def read_reg(self, reg_addr):
        """Read one word from ``reg_addr``, retrying on I/O errors.

        At least one attempt is always made, even if ``comm_retries`` has
        been set to zero or a negative value (previously that case raised
        ``TypeError`` from ``raise None`` without touching the bus).

        Returns:
            The value returned by ``bus.read_word_data``.

        Raises:
            OSError: the error from the final attempt when every attempt
                fails (``IOError`` is the same class on Python 3).
        """
        attempts = max(1, self.comm_retries)
        for attempt in range(1, attempts + 1):
            try:
                return self.bus.read_word_data(self.address, reg_addr)
            except OSError:
                if attempt == attempts:
                    # Out of retries: surface the last error immediately
                    # instead of sleeping once more first.
                    raise
                time.sleep(self.comm_sleep_amount)

    def data_to_temp(self, data):
        """Convert a raw register word: ``data * 0.02 - 273.15``."""
        return (data * 0.02) - 273.15

    def get_amb_temp(self):
        """Return the converted reading of the ``MLX90614_TA`` register."""
        return self.data_to_temp(self.read_reg(self.MLX90614_TA))

    def get_obj_temp(self):
        """Return the converted reading of the ``MLX90614_TOBJ1`` register."""
        return self.data_to_temp(self.read_reg(self.MLX90614_TOBJ1))
# GPIO pin assignments (BCM numbering)
VIBRATION_SENSOR_PIN = 17
RELAY_PIN_20 = 20
RELAY_PIN_21 = 21

GPIO.setmode(GPIO.BCM)
# Vibration input uses the internal pull-down resistor.
GPIO.setup(VIBRATION_SENSOR_PIN, GPIO.IN, pull_up_down=GPIO.PUD_DOWN)
# Both relay pins are outputs; configured together in pin order.
GPIO.setup([RELAY_PIN_20, RELAY_PIN_21], GPIO.OUT)

# Set to True to make every worker loop exit (emergency stop or shutdown).
stop_threads = False
def trigger_emergency_stop():
    """Stop the worker loops and pulse GPIO 21 HIGH for 3 seconds.

    Sets the shared ``stop_threads`` flag, drives GPIO 20 LOW, then holds
    GPIO 21 HIGH for 3 seconds before returning it to LOW.  Blocks the
    calling thread for the duration of the pulse.
    """
    global stop_threads
    stop_threads = True

    # alternate_relays() never drives both relays HIGH at the same time.
    # If the stop fires during its "unwinding" phase GPIO 20 is still HIGH,
    # so release it before energizing GPIO 21 to keep that invariant.
    # NOTE(review): assumes the two relays must not be on together —
    # confirm against the winch wiring.
    GPIO.output(RELAY_PIN_20, GPIO.LOW)
    GPIO.output(RELAY_PIN_21, GPIO.HIGH)
    print("Emergency stop activated. GPIO 21 HIGH for 3 seconds.")
    try:
        time.sleep(3)  # Keep GPIO 21 HIGH for 3 seconds
    finally:
        # Always return GPIO 21 to LOW, even if the wait is interrupted.
        GPIO.output(RELAY_PIN_21, GPIO.LOW)
def monitor_temperature(sensor, threshold_c=27, interval=0.1):
    """Poll the sensor and trigger the emergency stop on high object temp.

    Runs until ``stop_threads`` becomes true, the threshold is reached, or
    a read fails.  Intended as a thread target.

    Args:
        sensor: object providing ``get_amb_temp()`` and ``get_obj_temp()``
            (e.g. an ``MLX90614`` instance).
        threshold_c: object temperature at or above which
            ``trigger_emergency_stop()`` is called.  The comparison uses
            the reading rounded to two decimals.  Defaults to 27, the
            previously hard-coded limit.
        interval: seconds to sleep between polls.  Defaults to 0.1.
    """
    try:
        while not stop_threads:
            amb_temp = round(sensor.get_amb_temp(), 2)
            obj_temp = round(sensor.get_obj_temp(), 2)
            print(f"Ambient Temp: {amb_temp} C")
            print(f"Object Temp: {obj_temp} C")
            if obj_temp >= threshold_c:
                print(f"High Temperature Detected: {obj_temp} C")
                trigger_emergency_stop()
                break
            time.sleep(interval)
    except Exception as e:
        # Thread boundary: report the failure rather than die silently.
        # NOTE(review): after an error this loop ends without setting
        # stop_threads, so the other threads keep running with no
        # temperature protection — confirm that is acceptable.
        print(f"Temperature monitoring error: {e}")
def monitor_vibration():
    """Poll the vibration input and trigger the emergency stop when HIGH.

    Loops until ``stop_threads`` is set.  While the input is not HIGH the
    loop sleeps 50 ms between samples; on the first HIGH sample it calls
    ``trigger_emergency_stop()`` and exits.  Any exception is printed and
    ends the loop.
    """
    try:
        while not stop_threads:
            if GPIO.input(VIBRATION_SENSOR_PIN) != GPIO.HIGH:
                # Nothing detected: wait 50 ms and sample again.
                time.sleep(0.05)
                continue

            print("Vibration detected")
            trigger_emergency_stop()
            break
    except Exception as exc:
        print(f"Vibration sensor monitoring error: {exc}")
def alternate_relays():
    """Cycle the two relays: 3 s "unwinding", then 3 s "winding".

    Intended as a thread target.  ``stop_threads`` is checked only at the
    start of each full unwind/wind cycle, so a cycle that has begun always
    runs both phases.  Any exception is printed and ends the loop.
    """
    try:
        while not stop_threads:
            # (GPIO 20 level, GPIO 21 level, status message) per phase.
            phases = (
                (GPIO.HIGH, GPIO.LOW, "Winch unwinding"),
                (GPIO.LOW, GPIO.HIGH, "Winch winding"),
            )
            for level_20, level_21, status in phases:
                GPIO.output(RELAY_PIN_20, level_20)
                GPIO.output(RELAY_PIN_21, level_21)
                print(status)
                time.sleep(3)
    except Exception as exc:
        print(f"Relay control error: {exc}")
if __name__ == "__main__":
    # Script entry point: start the three worker threads, wait for them,
    # and release the GPIO pins only after every worker has stopped.
    workers = []
    try:
        # Created inside the try so a failure to open the sensor still
        # reaches GPIO.cleanup() below.
        sensor = MLX90614()
        print("Monitoring temperature, vibration, and relays...")
        temp_thread = threading.Thread(target=monitor_temperature, args=(sensor,))
        vibration_thread = threading.Thread(target=monitor_vibration)
        relay_thread = threading.Thread(target=alternate_relays)
        workers = [temp_thread, vibration_thread, relay_thread]
        for worker in workers:
            worker.start()
        for worker in workers:
            worker.join()
    except KeyboardInterrupt:
        print("Exiting program")
    finally:
        # Ask any still-running worker to exit, then wait for it.  Cleaning
        # up while workers are alive would let them keep calling GPIO
        # functions on pins that have already been released.
        stop_threads = True
        try:
            for worker in workers:
                # is_alive() is False for threads that were never started,
                # which join() would reject.
                if worker.is_alive():
                    # May take a few seconds: workers only notice the flag
                    # between sleeps.
                    worker.join()
        finally:
            GPIO.cleanup()
            print("GPIO cleanup completed.")