#include "rpc.hpp" #include #include #include #include #include #include #include #include #include #include #include #include #define LAST_SYNC_TIME_FILE "/last_sync.time" // ---- Constructor / Destructor ---- RPC::RPC() { // NOTE: Initialization is done here. Ensure this global object is created // AFTER core systems like LittleFS are initialized in setup(). apiKey_[0] = '\0'; baseUrl_[0] = '\0'; if (!accessLogQueue) { accessLogQueue = xQueueCreate(100, sizeof(RpcAccessLogEntry)); if (!accessLogQueue) { LOG_ERROR("RPC: FATAL: Failed to create access log queue!"); } } syncTriggerSemaphore_ = xSemaphoreCreateBinary(); if (!syncTriggerSemaphore_) { LOG_ERROR("RPC: FATAL: Failed to create sync trigger semaphore!"); } // Load the last sync time from persistent storage on startup. this->lastSyncTime = loadLastSyncTime(); LOG_INFO("RPC: Instance created. Loaded last sync timestamp: %s", HardwareRTC::toDateString(this->lastSyncTime).c_str()); } RPC::~RPC() { stopAutoSync(); if (accessLogQueue) { vQueueDelete(accessLogQueue); accessLogQueue = nullptr; } if (syncTriggerSemaphore_) { vSemaphoreDelete(syncTriggerSemaphore_); syncTriggerSemaphore_ = nullptr; } } // ---- Public Methods ---- bool RPC::sync(bool forceDownload) { if (isSyncing_) { logger.warn("RPC: Sync aborted, another sync is already in progress."); return false; } if (baseUrl_[0] == '\0') { LOG_ERROR("RPC: Sync aborted (Base URL not set). Call startAutoSync first."); return false; } if (WiFi.status() != WL_CONNECTED) { LOG_ERROR("RPC: Sync aborted (no WiFi connection)."); return false; } isSyncing_ = true; LOG_INFO("RPC: Starting synchronization process..."); bool pendingChanges = false; bool syncSuccess = false; if (!syncLogsAndCheckForUpdates(&pendingChanges)) { LOG_ERROR("RPC: Log synchronization failed."); syncSuccess = false; } else { // Log sync was successful, now check if a DB download is needed. if (pendingChanges || forceDownload) { const char* dbPath = "/rfid.db"; LOG_DEBUG("RPC: Pending changes: %d, forceDownload: %d", pendingChanges, forceDownload); if (!downloadDatabaseToFile(dbPath)) { LOG_ERROR("RPC: Failed to download database."); syncSuccess = false; } else { syncSuccess = true; } } else { LOG_DEBUG("RPC: No pending changes reported by server, skipping database download."); syncSuccess = true; } } isSyncing_ = false; if(syncSuccess) { LOG_INFO("RPC: Synchronization completed successfully."); } else { LOG_ERROR("RPC: Synchronization failed."); } return syncSuccess; } void RPC::addAccessLog(uint32_t rfidId) { if (!accessLogQueue) { return; // Queue not created, do nothing. } struct timeval tv; RpcAccessLogEntry entry; entry.timestamp = HardwareRTC::getSystemTime(); entry.rfidId = rfidId; if (xQueueSend(accessLogQueue, &entry, 0) != pdPASS) { LOG_WARN("RPC: Access log queue is full, dropping entry."); return; } // --- TRIGGER LOGIC --- // If the queue has reached the threshold and we can sync, trigger it. if (uxQueueMessagesWaiting(accessLogQueue) >= LOG_SYNC_THRESHOLD && baseUrl_[0] != '\0') { LOG_INFO("RPC: Queue size: %d. Triggering immediate sync.", uxQueueMessagesWaiting(accessLogQueue)); xSemaphoreGive(syncTriggerSemaphore_); } } // ---- Auto-Sync Task Management ---- void RPC::startAutoSync(uint32_t intervalMs) { stopAutoSync(); // Always stop any existing task to ensure a clean restart. if (baseUrl_[0] == '\0') { LOG_ERROR("RPC: Cannot start auto-sync. Base URL is not configured."); return; } LOG_INFO("RPC: Starting auto-sync"); syncIntervalMs = intervalMs; autoSyncRunning = true; BaseType_t result = xTaskCreate( autoSyncTask, "AutoSyncTask", 8192, this, 1, &autoSyncTaskHandle ); if (result != pdPASS) { LOG_ERROR("RPC: Failed to create auto-sync task."); autoSyncRunning = false; autoSyncTaskHandle = nullptr; } else { LOG_INFO("RPC: Auto-sync task started successfully."); } } void RPC::stopAutoSync() { if (!autoSyncTaskHandle) { return; } LOG_INFO("RPC: Attempting to stop auto-sync task..."); autoSyncRunning = false; // Signal the task to exit its loop // Wake the task if it's waiting on the semaphore if (syncTriggerSemaphore_) { xSemaphoreGive(syncTriggerSemaphore_); } // Wait for the task to terminate itself (indicated by handle becoming nullptr) uint32_t startTime = millis(); while (autoSyncTaskHandle != nullptr && millis() - startTime < 1000) { vTaskDelay(pdMS_TO_TICKS(50)); } // If the task did not stop gracefully, force its deletion if (autoSyncTaskHandle != nullptr) { LOG_ERROR("RPC: Task did not stop gracefully. Forcing deletion."); vTaskDelete(autoSyncTaskHandle); autoSyncTaskHandle = nullptr; } else { LOG_INFO("RPC: Auto-sync task stopped gracefully."); } } void RPC::setApiKeyAndBaseUrl(const char* apiKey, const char* baseUrl) { if (apiKey) { strncpy(apiKey_, apiKey, sizeof(apiKey_) - 1); apiKey_[sizeof(apiKey_) - 1] = '\0'; // Ensure null-termination } else { apiKey_[0] = '\0'; // Set to empty string } if (baseUrl) { strncpy(baseUrl_, baseUrl, sizeof(baseUrl_) - 1); baseUrl_[sizeof(baseUrl_) - 1] = '\0'; } else { baseUrl_[0] = '\0'; } } // ---- Private Helper Methods ---- HttpResponse RPC::sendRequest(const String& url, const String& method, const uint8_t* payload, size_t payloadLen, const String& contentType) { HttpResponse response; // Default httpCode is 0 if (WiFi.status() != WL_CONNECTED) { response.httpCode = -1; // Custom code for no WiFi return response; } HTTPClient http; http.begin(url); http.setTimeout(10000); if (apiKey_[0] != '\0') { http.addHeader("x-device-key", apiKey_); } if (method.equalsIgnoreCase("POST")) { http.addHeader("Content-Type", contentType); response.httpCode = http.POST(const_cast(payload), payloadLen); } else { response.httpCode = http.GET(); } if (response.httpCode > 0) { int len = http.getSize(); if (len > 0) { response.payload.resize(len); WiFiClient* stream = http.getStreamPtr(); stream->readBytes(response.payload.data(), len); } } else { LOG_ERROR(("RPC: HTTP " + method + " " + url + " failed: " + http.errorToString(response.httpCode)).c_str()); } http.end(); // Encapsulated resource management return response; } bool RPC::syncLogsAndCheckForUpdates(bool* pendingChanges) { // NOTE: This implementation assumes the logs should be sent with the sync request. // The previous code collected logs but did not send them. This version does. std::vector logBatch; RpcAccessLogEntry entry; // Drain the queue to send all pending logs. // To be more memory efficient on larger queues, this could be done in batches. while (xQueueReceive(accessLogQueue, &entry, 0) == pdPASS) { logBatch.push_back(entry); } control_communication_SyncRequest request = control_communication_SyncRequest_init_zero; request.has_lastSync = (lastSyncTime > 0); request.lastSync = lastSyncTime; request.accessLogs.arg = &logBatch; request.accessLogs.funcs.encode = [](pb_ostream_t* stream, const pb_field_t* field, void* const* arg) -> bool { auto logs = static_cast*>(*arg); if (!logs || logs->empty()) { return true; // No logs to encode, return success } for (const auto& log : *logs) { control_communication_SyncRequest_AccessLogsEntry entry = { log.timestamp, log.rfidId }; if (!pb_encode_tag_for_field(stream, field)) { return false; } if (!pb_encode_submessage(stream, control_communication_SyncRequest_AccessLogsEntry_fields, &entry)) { return false; } } return true; }; uint8_t requestBuffer[512]; pb_ostream_t ostream = pb_ostream_from_buffer(requestBuffer, sizeof(requestBuffer)); if (!pb_encode(&ostream, control_communication_SyncRequest_fields, &request)) { LOG_ERROR("RPC: Failed to encode sync request."); return false; } String url = String(baseUrl_) + "device-communication/sync-binary"; HttpResponse httpResponse = sendRequest(url, "POST", requestBuffer, ostream.bytes_written, "application/octet-stream"); if (httpResponse.httpCode != HTTP_CODE_OK) { LOG_ERROR("RPC: Sync logs request failed with HTTP code %d", httpResponse.httpCode) return false; } // A successful connection was made. lastSyncConnection = millis(); control_communication_SyncResponse response = control_communication_SyncResponse_init_zero; pb_istream_t istream = pb_istream_from_buffer(httpResponse.payload.data(), httpResponse.payload.size()); if (!pb_decode(&istream, control_communication_SyncResponse_fields, &response)) { LOG_ERROR("RPC: Failed to decode sync response."); return false; } // As per your design, only update lastSyncTime if the server reports pending changes. if (response.pendingChanges) { this->lastSyncTime = response.currentTime; saveLastSyncTime(this->lastSyncTime); } // Pass pendingChanges flag back to the caller. if (pendingChanges) { *pendingChanges = response.pendingChanges; } if(newTimeCallback_){ newTimeCallback_(response.currentTime); } return true; } bool RPC::downloadDatabaseToFile(const char* filePath) { String url = String(baseUrl_) + "device-communication/db"; if (WiFi.status() != WL_CONNECTED) { return false; } HTTPClient http; http.begin(url); http.setTimeout(10000); if (apiKey_[0] != '\0') { http.addHeader("x-device-key", apiKey_); } int httpCode = http.GET(); if (httpCode != HTTP_CODE_OK) { LOG_ERROR(("RPC: DB download failed with HTTP code " + String(httpCode)).c_str()); http.end(); return false; } lastSyncConnection = millis(); int len = http.getSize(); if (len <= 0) { // Removed the multiple of 4 check, as it might be too restrictive. LOG_ERROR(("RPC: Invalid database size: " + String(len) + " bytes.").c_str()); http.end(); return false; } LOG_INFO(("RPC: Downloading database (" + String(len) + " bytes) to file '" + String(filePath) + "'...").c_str()); File dbFile = LittleFS.open(filePath, "w"); if (!dbFile) { LOG_ERROR(("RPC: Failed to open " + String(filePath) + " for writing.").c_str()); http.end(); return false; } // Write the file content by streaming it from the WiFi client. WiFiClient* stream = http.getStreamPtr(); const size_t bufferSize = 1024; uint8_t buffer[bufferSize]; size_t written = 0; // Read from the stream in chunks until all bytes are received while (http.connected() && (written < (size_t)len)) { // Get available data size size_t available = stream->available(); if (available) { // Read up to bufferSize bytes int bytesRead = stream->read(buffer, std::min(bufferSize, available)); // Write the chunk to the file dbFile.write(buffer, bytesRead); written += bytesRead; } } dbFile.close(); http.end(); if (written != (size_t)len) { LOG_ERROR("RPC: File download incomplete. Wrote %d of %d bytes.", (unsigned long)written, len); LittleFS.remove(filePath); // Delete partial file return false; } LOG_INFO("RPC: Database downloaded successfully."); return true; } // ---- Persistence Methods ---- void RPC::saveLastSyncTime(time_t timestamp) { File file = LittleFS.open(LAST_SYNC_TIME_FILE, "w"); if (!file) { LOG_ERROR("RPC: Failed to open last_sync.time for writing."); return; } file.write(reinterpret_cast(×tamp), sizeof(timestamp)); file.close(); } time_t RPC::loadLastSyncTime() { if (!LittleFS.exists(LAST_SYNC_TIME_FILE)) { return 0; } File file = LittleFS.open(LAST_SYNC_TIME_FILE, "r"); if (!file || file.size() != sizeof(time_t)) { LOG_ERROR("RPC: Corrupt or unreadable timestamp file. Resetting to 0."); if(file) file.close(); LittleFS.remove(LAST_SYNC_TIME_FILE); return 0; } time_t timestamp = 0; file.read(reinterpret_cast(×tamp), sizeof(timestamp)); file.close(); return timestamp; } // ---- FreeRTOS Task ---- void RPC::autoSyncTask(void* pvParameters) { RPC* rpc = static_cast(pvParameters); while (rpc->autoSyncRunning) { // Wait for either the timer to expire OR the trigger semaphore to be given. if (xSemaphoreTake(rpc->syncTriggerSemaphore_, pdMS_TO_TICKS(rpc->syncIntervalMs)) == pdTRUE) { LOG_INFO("RPC: Sync triggered by log queue."); } else { LOG_INFO("RPC: Sync triggered by timer."); } if (!rpc->autoSyncRunning) { break; // Exit immediately if stop was called while waiting } rpc->sync(); // Call sync without forcing a download } // Safer handshake to signal graceful shutdown rpc->autoSyncTaskHandle = nullptr; vTaskDelete(NULL); // Task deletes itself } RPC rpc;