init
This commit is contained in:
+445
@@ -0,0 +1,445 @@
|
||||
#include "rpc.hpp"
|
||||
#include <Arduino.h>
|
||||
#include <WiFi.h>
|
||||
#include <LittleFS.h>
|
||||
#include <pb_encode.h>
|
||||
#include <pb_decode.h>
|
||||
#include <vector>
|
||||
#include <cstring>
|
||||
#include <HTTPClient.h>
|
||||
#include <control_communication.pb.h>
|
||||
#include <Stream.h>
|
||||
#include <logger.hpp>
|
||||
#include <hardware_rtc.hpp>
|
||||
|
||||
#define LAST_SYNC_TIME_FILE "/last_sync.time"
|
||||
|
||||
// ---- Constructor / Destructor ----
|
||||
|
||||
RPC::RPC() {
|
||||
// NOTE: Initialization is done here. Ensure this global object is created
|
||||
// AFTER core systems like LittleFS are initialized in setup().
|
||||
apiKey_[0] = '\0';
|
||||
baseUrl_[0] = '\0';
|
||||
|
||||
if (!accessLogQueue) {
|
||||
accessLogQueue = xQueueCreate(100, sizeof(RpcAccessLogEntry));
|
||||
if (!accessLogQueue) {
|
||||
LOG_ERROR("RPC: FATAL: Failed to create access log queue!");
|
||||
}
|
||||
}
|
||||
|
||||
syncTriggerSemaphore_ = xSemaphoreCreateBinary();
|
||||
if (!syncTriggerSemaphore_) {
|
||||
LOG_ERROR("RPC: FATAL: Failed to create sync trigger semaphore!");
|
||||
}
|
||||
|
||||
// Load the last sync time from persistent storage on startup.
|
||||
this->lastSyncTime = loadLastSyncTime();
|
||||
LOG_INFO("RPC: Instance created. Loaded last sync timestamp: %s", HardwareRTC::toDateString(this->lastSyncTime).c_str());
|
||||
}
|
||||
|
||||
RPC::~RPC() {
|
||||
stopAutoSync();
|
||||
if (accessLogQueue) {
|
||||
vQueueDelete(accessLogQueue);
|
||||
accessLogQueue = nullptr;
|
||||
}
|
||||
if (syncTriggerSemaphore_) {
|
||||
vSemaphoreDelete(syncTriggerSemaphore_);
|
||||
syncTriggerSemaphore_ = nullptr;
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Public Methods ----
|
||||
|
||||
bool RPC::sync(bool forceDownload) {
|
||||
if (isSyncing_) {
|
||||
logger.warn("RPC: Sync aborted, another sync is already in progress.");
|
||||
return false;
|
||||
}
|
||||
if (baseUrl_[0] == '\0') {
|
||||
LOG_ERROR("RPC: Sync aborted (Base URL not set). Call startAutoSync first.");
|
||||
return false;
|
||||
}
|
||||
if (WiFi.status() != WL_CONNECTED) {
|
||||
LOG_ERROR("RPC: Sync aborted (no WiFi connection).");
|
||||
return false;
|
||||
}
|
||||
|
||||
isSyncing_ = true;
|
||||
LOG_INFO("RPC: Starting synchronization process...");
|
||||
|
||||
bool pendingChanges = false;
|
||||
bool syncSuccess = false;
|
||||
|
||||
if (!syncLogsAndCheckForUpdates(&pendingChanges)) {
|
||||
LOG_ERROR("RPC: Log synchronization failed.");
|
||||
syncSuccess = false;
|
||||
} else {
|
||||
// Log sync was successful, now check if a DB download is needed.
|
||||
if (pendingChanges || forceDownload) {
|
||||
const char* dbPath = "/rfid.db";
|
||||
LOG_DEBUG("RPC: Pending changes: %d, forceDownload: %d", pendingChanges, forceDownload);
|
||||
if (!downloadDatabaseToFile(dbPath)) {
|
||||
LOG_ERROR("RPC: Failed to download database.");
|
||||
syncSuccess = false;
|
||||
} else {
|
||||
syncSuccess = true;
|
||||
}
|
||||
} else {
|
||||
LOG_DEBUG("RPC: No pending changes reported by server, skipping database download.");
|
||||
syncSuccess = true;
|
||||
}
|
||||
}
|
||||
|
||||
isSyncing_ = false;
|
||||
if(syncSuccess) {
|
||||
LOG_INFO("RPC: Synchronization completed successfully.");
|
||||
} else {
|
||||
LOG_ERROR("RPC: Synchronization failed.");
|
||||
}
|
||||
return syncSuccess;
|
||||
}
|
||||
|
||||
void RPC::addAccessLog(uint32_t rfidId) {
|
||||
if (!accessLogQueue) {
|
||||
return; // Queue not created, do nothing.
|
||||
}
|
||||
struct timeval tv;
|
||||
RpcAccessLogEntry entry;
|
||||
|
||||
entry.timestamp = HardwareRTC::getSystemTime();
|
||||
entry.rfidId = rfidId;
|
||||
|
||||
if (xQueueSend(accessLogQueue, &entry, 0) != pdPASS) {
|
||||
LOG_WARN("RPC: Access log queue is full, dropping entry.");
|
||||
return;
|
||||
}
|
||||
|
||||
// --- TRIGGER LOGIC ---
|
||||
// If the queue has reached the threshold and we can sync, trigger it.
|
||||
if (uxQueueMessagesWaiting(accessLogQueue) >= LOG_SYNC_THRESHOLD && baseUrl_[0] != '\0') {
|
||||
LOG_INFO("RPC: Queue size: %d. Triggering immediate sync.", uxQueueMessagesWaiting(accessLogQueue));
|
||||
xSemaphoreGive(syncTriggerSemaphore_);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Auto-Sync Task Management ----
|
||||
|
||||
void RPC::startAutoSync(uint32_t intervalMs) {
|
||||
stopAutoSync(); // Always stop any existing task to ensure a clean restart.
|
||||
|
||||
if (baseUrl_[0] == '\0') {
|
||||
LOG_ERROR("RPC: Cannot start auto-sync. Base URL is not configured.");
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_INFO("RPC: Starting auto-sync");
|
||||
syncIntervalMs = intervalMs;
|
||||
autoSyncRunning = true;
|
||||
|
||||
BaseType_t result = xTaskCreate(
|
||||
autoSyncTask, "AutoSyncTask", 8192, this, 1, &autoSyncTaskHandle
|
||||
);
|
||||
|
||||
if (result != pdPASS) {
|
||||
LOG_ERROR("RPC: Failed to create auto-sync task.");
|
||||
autoSyncRunning = false;
|
||||
autoSyncTaskHandle = nullptr;
|
||||
} else {
|
||||
LOG_INFO("RPC: Auto-sync task started successfully.");
|
||||
}
|
||||
}
|
||||
|
||||
void RPC::stopAutoSync() {
|
||||
if (!autoSyncTaskHandle) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG_INFO("RPC: Attempting to stop auto-sync task...");
|
||||
autoSyncRunning = false; // Signal the task to exit its loop
|
||||
|
||||
// Wake the task if it's waiting on the semaphore
|
||||
if (syncTriggerSemaphore_) {
|
||||
xSemaphoreGive(syncTriggerSemaphore_);
|
||||
}
|
||||
|
||||
// Wait for the task to terminate itself (indicated by handle becoming nullptr)
|
||||
uint32_t startTime = millis();
|
||||
while (autoSyncTaskHandle != nullptr && millis() - startTime < 1000) {
|
||||
vTaskDelay(pdMS_TO_TICKS(50));
|
||||
}
|
||||
|
||||
// If the task did not stop gracefully, force its deletion
|
||||
if (autoSyncTaskHandle != nullptr) {
|
||||
LOG_ERROR("RPC: Task did not stop gracefully. Forcing deletion.");
|
||||
vTaskDelete(autoSyncTaskHandle);
|
||||
autoSyncTaskHandle = nullptr;
|
||||
} else {
|
||||
LOG_INFO("RPC: Auto-sync task stopped gracefully.");
|
||||
}
|
||||
}
|
||||
|
||||
void RPC::setApiKeyAndBaseUrl(const char* apiKey, const char* baseUrl) {
|
||||
if (apiKey) {
|
||||
strncpy(apiKey_, apiKey, sizeof(apiKey_) - 1);
|
||||
apiKey_[sizeof(apiKey_) - 1] = '\0'; // Ensure null-termination
|
||||
} else {
|
||||
apiKey_[0] = '\0'; // Set to empty string
|
||||
}
|
||||
|
||||
if (baseUrl) {
|
||||
strncpy(baseUrl_, baseUrl, sizeof(baseUrl_) - 1);
|
||||
baseUrl_[sizeof(baseUrl_) - 1] = '\0';
|
||||
} else {
|
||||
baseUrl_[0] = '\0';
|
||||
}
|
||||
}
|
||||
|
||||
// ---- Private Helper Methods ----
|
||||
|
||||
HttpResponse RPC::sendRequest(const String& url, const String& method, const uint8_t* payload, size_t payloadLen, const String& contentType) {
|
||||
HttpResponse response; // Default httpCode is 0
|
||||
if (WiFi.status() != WL_CONNECTED) {
|
||||
response.httpCode = -1; // Custom code for no WiFi
|
||||
return response;
|
||||
}
|
||||
|
||||
HTTPClient http;
|
||||
http.begin(url);
|
||||
http.setTimeout(10000);
|
||||
|
||||
if (apiKey_[0] != '\0') {
|
||||
http.addHeader("x-device-key", apiKey_);
|
||||
}
|
||||
|
||||
if (method.equalsIgnoreCase("POST")) {
|
||||
http.addHeader("Content-Type", contentType);
|
||||
response.httpCode = http.POST(const_cast<uint8_t*>(payload), payloadLen);
|
||||
} else {
|
||||
response.httpCode = http.GET();
|
||||
}
|
||||
|
||||
if (response.httpCode > 0) {
|
||||
int len = http.getSize();
|
||||
if (len > 0) {
|
||||
response.payload.resize(len);
|
||||
WiFiClient* stream = http.getStreamPtr();
|
||||
stream->readBytes(response.payload.data(), len);
|
||||
}
|
||||
} else {
|
||||
LOG_ERROR(("RPC: HTTP " + method + " " + url + " failed: " + http.errorToString(response.httpCode)).c_str());
|
||||
}
|
||||
|
||||
http.end(); // Encapsulated resource management
|
||||
return response;
|
||||
}
|
||||
|
||||
|
||||
bool RPC::syncLogsAndCheckForUpdates(bool* pendingChanges) {
|
||||
// NOTE: This implementation assumes the logs should be sent with the sync request.
|
||||
// The previous code collected logs but did not send them. This version does.
|
||||
std::vector<RpcAccessLogEntry> logBatch;
|
||||
RpcAccessLogEntry entry;
|
||||
|
||||
// Drain the queue to send all pending logs.
|
||||
// To be more memory efficient on larger queues, this could be done in batches.
|
||||
while (xQueueReceive(accessLogQueue, &entry, 0) == pdPASS) {
|
||||
logBatch.push_back(entry);
|
||||
}
|
||||
|
||||
control_communication_SyncRequest request = control_communication_SyncRequest_init_zero;
|
||||
request.has_lastSync = (lastSyncTime > 0);
|
||||
request.lastSync = lastSyncTime;
|
||||
|
||||
request.accessLogs.arg = &logBatch;
|
||||
request.accessLogs.funcs.encode = [](pb_ostream_t* stream, const pb_field_t* field, void* const* arg) -> bool {
|
||||
auto logs = static_cast<std::vector<RpcAccessLogEntry>*>(*arg);
|
||||
if (!logs || logs->empty()) {
|
||||
return true; // No logs to encode, return success
|
||||
}
|
||||
for (const auto& log : *logs) {
|
||||
control_communication_SyncRequest_AccessLogsEntry entry = { log.timestamp, log.rfidId };
|
||||
if (!pb_encode_tag_for_field(stream, field)) {
|
||||
return false;
|
||||
}
|
||||
if (!pb_encode_submessage(stream, control_communication_SyncRequest_AccessLogsEntry_fields, &entry)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
uint8_t requestBuffer[512];
|
||||
pb_ostream_t ostream = pb_ostream_from_buffer(requestBuffer, sizeof(requestBuffer));
|
||||
if (!pb_encode(&ostream, control_communication_SyncRequest_fields, &request)) {
|
||||
LOG_ERROR("RPC: Failed to encode sync request.");
|
||||
return false;
|
||||
}
|
||||
|
||||
|
||||
String url = String(baseUrl_) + "device-communication/sync-binary";
|
||||
HttpResponse httpResponse = sendRequest(url, "POST", requestBuffer, ostream.bytes_written, "application/octet-stream");
|
||||
|
||||
if (httpResponse.httpCode != HTTP_CODE_OK) {
|
||||
LOG_ERROR("RPC: Sync logs request failed with HTTP code %d", httpResponse.httpCode)
|
||||
return false;
|
||||
}
|
||||
|
||||
// A successful connection was made.
|
||||
lastSyncConnection = millis();
|
||||
|
||||
control_communication_SyncResponse response = control_communication_SyncResponse_init_zero;
|
||||
pb_istream_t istream = pb_istream_from_buffer(httpResponse.payload.data(), httpResponse.payload.size());
|
||||
|
||||
if (!pb_decode(&istream, control_communication_SyncResponse_fields, &response)) {
|
||||
LOG_ERROR("RPC: Failed to decode sync response.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// As per your design, only update lastSyncTime if the server reports pending changes.
|
||||
if (response.pendingChanges) {
|
||||
this->lastSyncTime = response.currentTime;
|
||||
saveLastSyncTime(this->lastSyncTime);
|
||||
}
|
||||
|
||||
// Pass pendingChanges flag back to the caller.
|
||||
if (pendingChanges) {
|
||||
*pendingChanges = response.pendingChanges;
|
||||
}
|
||||
|
||||
if(newTimeCallback_){
|
||||
newTimeCallback_(response.currentTime);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
bool RPC::downloadDatabaseToFile(const char* filePath) {
|
||||
String url = String(baseUrl_) + "device-communication/db";
|
||||
|
||||
if (WiFi.status() != WL_CONNECTED) {
|
||||
return false;
|
||||
}
|
||||
|
||||
HTTPClient http;
|
||||
http.begin(url);
|
||||
http.setTimeout(10000);
|
||||
if (apiKey_[0] != '\0') {
|
||||
http.addHeader("x-device-key", apiKey_);
|
||||
}
|
||||
|
||||
int httpCode = http.GET();
|
||||
if (httpCode != HTTP_CODE_OK) {
|
||||
LOG_ERROR(("RPC: DB download failed with HTTP code " + String(httpCode)).c_str());
|
||||
http.end();
|
||||
return false;
|
||||
}
|
||||
|
||||
lastSyncConnection = millis();
|
||||
|
||||
int len = http.getSize();
|
||||
if (len <= 0) { // Removed the multiple of 4 check, as it might be too restrictive.
|
||||
LOG_ERROR(("RPC: Invalid database size: " + String(len) + " bytes.").c_str());
|
||||
http.end();
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG_INFO(("RPC: Downloading database (" + String(len) + " bytes) to file '" + String(filePath) + "'...").c_str());
|
||||
|
||||
File dbFile = LittleFS.open(filePath, "w");
|
||||
if (!dbFile) {
|
||||
LOG_ERROR(("RPC: Failed to open " + String(filePath) + " for writing.").c_str());
|
||||
http.end();
|
||||
return false;
|
||||
}
|
||||
|
||||
// Write the file content by streaming it from the WiFi client.
|
||||
WiFiClient* stream = http.getStreamPtr();
|
||||
const size_t bufferSize = 1024;
|
||||
uint8_t buffer[bufferSize];
|
||||
size_t written = 0;
|
||||
|
||||
// Read from the stream in chunks until all bytes are received
|
||||
while (http.connected() && (written < (size_t)len)) {
|
||||
// Get available data size
|
||||
size_t available = stream->available();
|
||||
if (available) {
|
||||
// Read up to bufferSize bytes
|
||||
int bytesRead = stream->read(buffer, std::min(bufferSize, available));
|
||||
// Write the chunk to the file
|
||||
dbFile.write(buffer, bytesRead);
|
||||
written += bytesRead;
|
||||
}
|
||||
}
|
||||
|
||||
dbFile.close();
|
||||
http.end();
|
||||
|
||||
if (written != (size_t)len) {
|
||||
LOG_ERROR("RPC: File download incomplete. Wrote %d of %d bytes.", (unsigned long)written, len);
|
||||
LittleFS.remove(filePath); // Delete partial file
|
||||
return false;
|
||||
}
|
||||
|
||||
LOG_INFO("RPC: Database downloaded successfully.");
|
||||
return true;
|
||||
}
|
||||
|
||||
// ---- Persistence Methods ----
|
||||
|
||||
void RPC::saveLastSyncTime(time_t timestamp) {
|
||||
File file = LittleFS.open(LAST_SYNC_TIME_FILE, "w");
|
||||
if (!file) {
|
||||
LOG_ERROR("RPC: Failed to open last_sync.time for writing.");
|
||||
return;
|
||||
}
|
||||
file.write(reinterpret_cast<const uint8_t*>(×tamp), sizeof(timestamp));
|
||||
file.close();
|
||||
}
|
||||
|
||||
time_t RPC::loadLastSyncTime() {
|
||||
if (!LittleFS.exists(LAST_SYNC_TIME_FILE)) {
|
||||
return 0;
|
||||
}
|
||||
File file = LittleFS.open(LAST_SYNC_TIME_FILE, "r");
|
||||
if (!file || file.size() != sizeof(time_t)) {
|
||||
LOG_ERROR("RPC: Corrupt or unreadable timestamp file. Resetting to 0.");
|
||||
if(file) file.close();
|
||||
LittleFS.remove(LAST_SYNC_TIME_FILE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
time_t timestamp = 0;
|
||||
file.read(reinterpret_cast<uint8_t*>(×tamp), sizeof(timestamp));
|
||||
file.close();
|
||||
return timestamp;
|
||||
}
|
||||
|
||||
// ---- FreeRTOS Task ----
|
||||
|
||||
void RPC::autoSyncTask(void* pvParameters) {
|
||||
RPC* rpc = static_cast<RPC*>(pvParameters);
|
||||
|
||||
while (rpc->autoSyncRunning) {
|
||||
// Wait for either the timer to expire OR the trigger semaphore to be given.
|
||||
if (xSemaphoreTake(rpc->syncTriggerSemaphore_, pdMS_TO_TICKS(rpc->syncIntervalMs)) == pdTRUE) {
|
||||
LOG_INFO("RPC: Sync triggered by log queue.");
|
||||
} else {
|
||||
LOG_INFO("RPC: Sync triggered by timer.");
|
||||
}
|
||||
|
||||
if (!rpc->autoSyncRunning) {
|
||||
break; // Exit immediately if stop was called while waiting
|
||||
}
|
||||
|
||||
rpc->sync(); // Call sync without forcing a download
|
||||
}
|
||||
|
||||
// Safer handshake to signal graceful shutdown
|
||||
rpc->autoSyncTaskHandle = nullptr;
|
||||
|
||||
vTaskDelete(NULL); // Task deletes itself
|
||||
}
|
||||
|
||||
RPC rpc;
|
||||
Reference in New Issue
Block a user