Fixed broadcast to all deadlock and safer pushAll API

This commit is contained in:
2026-04-09 21:01:22 +02:00
parent d57baa3aff
commit 8a9dd8f27a
12 changed files with 94 additions and 65 deletions

0
.codex Normal file
View File

View File

@@ -5,22 +5,31 @@
#include "router.h"
namespace router {
int sendAll(module::Id source, task::Type type, uint32_t data) {
Task task{module::All, type, data};
int ret = 0;
for (size_t index = 0; index < module::Count; index++) {
if (source < module::Count && index == source) {
continue;
}
ModuleBase *module_ptr = module_registry[index];
if (module_ptr == nullptr) {
continue;
}
if (module_ptr->push(task) != 0) {
ret = 1;
}
}
return ret;
}
int send(const Task &task) {
if (task.target_ == module::All) {
int ret = 0;
for (size_t index = 0; index < module::Count; index++) {
ModuleBase *module_ptr = module_registry[index];
if (module_ptr == nullptr) {
continue;
}
if (module_ptr->push(task) != 0) {
ret = 1;
}
}
return ret;
return sendAll(module::Null, task.type_, task.data_);
}
if (task.target_ >= module::Count) {
return 1;

View File

@@ -10,4 +10,5 @@
namespace router {
int send(const Task& task);
int send(module::Id target, task::Type type, uint32_t data = 0);
int sendAll(module::Id source, task::Type type, uint32_t data = 0);
}

View File

@@ -446,8 +446,13 @@ int Cmd::handleBatterySetLow(unsigned short argc, char* argv[]) {
logger_->info("Setting warning level for VBAT");
}
#endif
router::send(module::Config, task::ConfigVbatSetLow, task_data);
return 0;
int result = router::send(module::Config, task::ConfigVbatSetLow, task_data);
#ifdef ERROR
if (result != 0 && logger_ != nullptr) {
logger_->error("Failed to queue BATTERY_SET_LOW config update");
}
#endif
return result;
}
int Cmd::handleThermoSetLow(unsigned short argc, char* argv[]) {
@@ -467,8 +472,13 @@ int Cmd::handleThermoSetLow(unsigned short argc, char* argv[]) {
logger_->info("Setting low level for TENG");
}
#endif
router::send(module::Config, task::ConfigTengSetLow, task_data);
return 0;
int result = router::send(module::Config, task::ConfigTengSetLow, task_data);
#ifdef ERROR
if (result != 0 && logger_ != nullptr) {
logger_->error("Failed to queue THERMO_SET_LOW config update");
}
#endif
return result;
}
int Cmd::handleThermoSetHigh(unsigned short argc, char* argv[]) {
@@ -480,16 +490,21 @@ int Cmd::handleThermoSetHigh(unsigned short argc, char* argv[]) {
#endif
return 1;
}
float low = strtod(argv[1], nullptr);
float high = strtod(argv[1], nullptr);
uint32_t task_data;
memcpy(&task_data, &low, sizeof(uint32_t));
memcpy(&task_data, &high, sizeof(uint32_t));
#ifdef INFO
if (logger_ != nullptr) {
logger_->info("Setting high level for TENG");
}
#endif
router::send(module::Config, task::ConfigTengSetHigh, task_data);
return 0;
int result = router::send(module::Config, task::ConfigTengSetHigh, task_data);
#ifdef ERROR
if (result != 0 && logger_ != nullptr) {
logger_->error("Failed to queue THERMO_SET_HIGH config update");
}
#endif
return result;
}
int Cmd::handleUnknownCommand(unsigned short argc, char *argv[]) {
@@ -579,7 +594,8 @@ int Cmd::tryParse() {
}
int Cmd::push(const Task &task) {
return queue_.push(task);
(void)task;
return 0;
}
Cmd::Cmd(HardwareSerial *data_stream)

View File

@@ -7,7 +7,6 @@
#include <avr/wdt.h>
#include "base/module_base.h"
#include "base/ring_buffer.h"
#include "base/task.h"
#include "custom_types.h"
#include "modules/logger/system_logger.h"
@@ -39,7 +38,6 @@ private:
char buffer_[256];
unsigned int index_ = 0;
bool buffer_open_ = false;
RingBuffer<Task, 16> queue_;
static const unsigned short MAX_ARGS = 10;

View File

@@ -145,7 +145,7 @@ int Config::writeConfig() {
logger_->info("Config updated and saved to EEPROM");
}
#endif
router::send(module::All, task::AllConfigUpdated);
router::sendAll(module::Config, task::AllConfigUpdated);
return 0;
}
@@ -303,9 +303,11 @@ int Config::handleActiveTask(unsigned long timeout_ms) {
}
default:
break;
// Broadcasts such as AllConfigUpdated may be queued back into Config.
// Unsupported tasks must still be completed so the module does not stall.
this->taskComplete();
return 1;
}
return 0;
}
int Config::autoInit() {
@@ -391,6 +393,6 @@ int Config::loadTrack(unsigned int idx) {
track.root_ = track_data;
trackGlobalWrite(track);
is_track_loaded_ = true;
router::send(module::All, task::AllTrackLoaded);
router::sendAll(module::Config, task::AllTrackLoaded);
return 0;
}

View File

@@ -43,7 +43,7 @@ int Gps::loop(unsigned long timeout_ms) {
if (last_fix_value_ == 0 && current_fix_value > 0) {
router::send(module::Lcd, task::DisplayMsgGpsFix, 2000);
router::send(module::Config, task::ConfigTrackDetect);
router::send(module::All, task::AllGpsFixOk);
router::sendAll(module::Gps, task::AllGpsFixOk);
}
last_fix_value_ = current_fix_value;
}
@@ -129,7 +129,7 @@ int Gps::loop(unsigned long timeout_ms) {
gpsTriggerGlobalWrite(start_line_trigger_);
arm_sign_ = 0;
state_changed_at_ = now;
router::send(module::All, task::AllStartLineTriggered);
router::sendAll(module::Gps, task::AllStartLineTriggered);
}
break;
};

View File

@@ -5,7 +5,10 @@
#include "base/router.h"
#include "data/general_store.h"
int InjectionCounter::push(const Task &task) { return queue_.push(task); }
int InjectionCounter::push(const Task &task) {
(void)task;
return 0;
}
InjectionCounter::InjectionCounter() : logger_(nullptr) {};
@@ -13,7 +16,10 @@ InjectionCounter::InjectionCounter(SystemLogger *logger) : logger_(logger) {};
InjectionCounter::~InjectionCounter() {};
int InjectionCounter::init() { pinMode(INJ_GPIO, INPUT); }
int InjectionCounter::init() {
pinMode(INJ_GPIO, INPUT);
return 0;
}
int InjectionCounter::loop() {
unsigned long now = millis();

View File

@@ -3,7 +3,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#pragma once
#include "base/module_base.h"
#include "base/ring_buffer.h"
#include "modules/logger/system_logger.h"
#define INJ_GPIO 37
@@ -11,7 +10,6 @@
class InjectionCounter : public ModuleBase {
private:
SystemLogger *logger_;
RingBuffer<Task, 16> queue_;
unsigned long last_check_;
unsigned long check_interval_ = 100;
unsigned long debounce_ = 100;

View File

@@ -5,7 +5,10 @@
#include "base/router.h"
#include "data/general_store.h"
int Telemetry::push(const Task &task) { return queue_.push(task); }
int Telemetry::push(const Task &task) {
(void)task;
return 0;
}
Telemetry::Telemetry(HardwareSerial* data_stream) : logger_(nullptr), data_stream_(data_stream) {}

View File

@@ -3,7 +3,6 @@
// SPDX-License-Identifier: GPL-3.0-or-later
#pragma once
#include "base/module_base.h"
#include "base/ring_buffer.h"
#include "modules/logger/system_logger.h"
#include "telemetry_common/telemetry_common.h"
#include <Arduino.h>
@@ -13,7 +12,6 @@
class Telemetry : public ModuleBase {
private:
SystemLogger *logger_;
RingBuffer<Task, 16> queue_;
HardwareSerial *data_stream_;
unsigned long last_sent_ = 0;
unsigned long update_interval_ = 1000;

View File

@@ -26,6 +26,25 @@ int Thermocouple::init() {
int Thermocouple::loop(unsigned long timeout_ms) {
(void)timeout_ms;
Task active_task;
int res = queue_.pop(active_task);
if (res == 0) {
if (active_task.target_ == module::All) {
switch (active_task.type_) {
case task::AllConfigUpdated: {
VehicleConfig config;
configGlobalRead(config);
low_ = config.teng_low_;
high_ = config.teng_high_;
break;
}
default:
break;
}
}
}
if (millis() > last_read_at_ + update_interval_) {
temperature_ = thermocouple_->readCelsius();
tengGlobalWrite(temperature_);
@@ -43,28 +62,7 @@ int Thermocouple::loop(unsigned long timeout_ms) {
warning_sent_at_ = millis();
}
}
Task active_task;
int res = queue_.pop(active_task);
if (res == 0) {
if (active_task.target_ == module::Thermocouple) {
} else if (active_task.target_ == module::All) {
switch (active_task.type_) {
case task::AllConfigUpdated: {
VehicleConfig config;
configGlobalRead(config);
low_ = config.teng_low_;
high_ = config.teng_high_;
break;
}
default:
break;
}
};
}
return 0;
}
return 0;
}