[feat] add device presence support

This commit is contained in:
dijunkun
2026-02-28 13:54:55 +08:00
parent 7d1b10ede4
commit b2fef4d688
9 changed files with 369 additions and 14 deletions

View File

@@ -54,6 +54,19 @@ void DeviceDBManager::InitDB() {
"INSERT INTO device_id_seq (next_id) "
"SELECT 1 WHERE NOT EXISTS (SELECT 1 FROM device_id_seq);";
const char* sql_presence =
"CREATE TABLE IF NOT EXISTS device_presence ("
"device_id TEXT PRIMARY KEY,"
"online INTEGER NOT NULL,"
"updated_at INTEGER NOT NULL"
");";
const char* sql_user_devices =
"CREATE TABLE IF NOT EXISTS user_devices ("
"user_id TEXT NOT NULL,"
"device_id TEXT NOT NULL,"
"PRIMARY KEY(user_id, device_id)"
");";
char* err_msg = nullptr;
if (sqlite3_exec(db_, sql_devices, nullptr, nullptr, &err_msg) != SQLITE_OK) {
@@ -74,6 +87,19 @@ void DeviceDBManager::InitDB() {
sqlite3_free(err_msg);
return;
}
if (sqlite3_exec(db_, sql_presence, nullptr, nullptr, &err_msg) !=
SQLITE_OK) {
LOG_ERROR("Failed to create device_presence table: {}", err_msg);
sqlite3_free(err_msg);
return;
}
if (sqlite3_exec(db_, sql_user_devices, nullptr, nullptr, &err_msg) !=
SQLITE_OK) {
LOG_ERROR("Failed to create user_devices table: {}", err_msg);
sqlite3_free(err_msg);
return;
}
}
std::string DeviceDBManager::Sha256(const std::string& str) {
@@ -384,6 +410,70 @@ bool DeviceDBManager::UpdatePassword(const std::string& device_id,
return success;
}
bool DeviceDBManager::SetUserDevices(
const std::string& user_id, const std::vector<std::string>& device_ids) {
if (db_ == nullptr) {
LOG_ERROR("Database is not initialized in SetUserDevices.");
return false;
}
sqlite3_exec(db_, "BEGIN TRANSACTION;", nullptr, nullptr, nullptr);
const char* del_sql = "DELETE FROM user_devices WHERE user_id = ?;";
sqlite3_stmt* del_stmt = nullptr;
if (sqlite3_prepare_v2(db_, del_sql, -1, &del_stmt, nullptr) != SQLITE_OK) {
sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr);
return false;
}
sqlite3_bind_text(del_stmt, 1, user_id.c_str(), -1, SQLITE_TRANSIENT);
if (sqlite3_step(del_stmt) != SQLITE_DONE) {
sqlite3_finalize(del_stmt);
sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr);
return false;
}
sqlite3_finalize(del_stmt);
const char* ins_sql =
"INSERT INTO user_devices (user_id, device_id) VALUES (?, ?);";
sqlite3_stmt* ins_stmt = nullptr;
if (sqlite3_prepare_v2(db_, ins_sql, -1, &ins_stmt, nullptr) != SQLITE_OK) {
sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr);
return false;
}
for (const auto& id : device_ids) {
sqlite3_bind_text(ins_stmt, 1, user_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(ins_stmt, 2, id.c_str(), -1, SQLITE_TRANSIENT);
if (sqlite3_step(ins_stmt) != SQLITE_DONE) {
sqlite3_finalize(ins_stmt);
sqlite3_exec(db_, "ROLLBACK;", nullptr, nullptr, nullptr);
return false;
}
sqlite3_reset(ins_stmt);
sqlite3_clear_bindings(ins_stmt);
}
sqlite3_finalize(ins_stmt);
sqlite3_exec(db_, "COMMIT;", nullptr, nullptr, nullptr);
return true;
}
std::vector<std::string> DeviceDBManager::GetUserDevices(
const std::string& user_id) {
std::vector<std::string> devices;
if (db_ == nullptr) {
LOG_ERROR("Database is not initialized in GetUserDevices.");
return devices;
}
const char* sql = "SELECT device_id FROM user_devices WHERE user_id = ?;";
sqlite3_stmt* stmt = nullptr;
if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
return devices;
}
sqlite3_bind_text(stmt, 1, user_id.c_str(), -1, SQLITE_TRANSIENT);
while (sqlite3_step(stmt) == SQLITE_ROW) {
devices.emplace_back(
reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0)));
}
sqlite3_finalize(stmt);
return devices;
}
bool DeviceDBManager::RemoveDevice(const std::string& device_id) {
if (db_ == nullptr) {
LOG_ERROR("Database is not initialized in RemoveDevice.");
@@ -402,3 +492,71 @@ bool DeviceDBManager::RemoveDevice(const std::string& device_id) {
sqlite3_finalize(stmt);
return success;
}
bool DeviceDBManager::SetDeviceOnline(const std::string& device_id,
bool online) {
if (db_ == nullptr) {
LOG_ERROR("Database is not initialized in SetDeviceOnline.");
return false;
}
const char* sql =
"INSERT INTO device_presence (device_id, online, updated_at) "
"VALUES (?, ?, strftime('%s','now')) "
"ON CONFLICT(device_id) DO UPDATE SET online=excluded.online, "
"updated_at=excluded.updated_at;";
sqlite3_stmt* stmt = nullptr;
if (sqlite3_prepare_v2(db_, sql, -1, &stmt, nullptr) != SQLITE_OK) {
return false;
}
sqlite3_bind_text(stmt, 1, device_id.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int(stmt, 2, online ? 1 : 0);
bool ok = (sqlite3_step(stmt) == SQLITE_DONE);
sqlite3_finalize(stmt);
return ok;
}
std::vector<std::pair<std::string, bool>> DeviceDBManager::BatchQueryOnline(
const std::vector<std::string>& device_ids) {
std::vector<std::pair<std::string, bool>> result;
if (db_ == nullptr) {
LOG_ERROR("Database is not initialized in BatchQueryOnline.");
return result;
}
if (device_ids.empty()) {
return result;
}
std::stringstream ss;
ss << "SELECT device_id, online FROM device_presence WHERE device_id IN (";
for (size_t i = 0; i < device_ids.size(); ++i) {
ss << (i == 0 ? "?" : ",?");
}
ss << ");";
std::string sql = ss.str();
sqlite3_stmt* stmt = nullptr;
if (sqlite3_prepare_v2(db_, sql.c_str(), -1, &stmt, nullptr) != SQLITE_OK) {
return result;
}
for (size_t i = 0; i < device_ids.size(); ++i) {
sqlite3_bind_text(stmt, static_cast<int>(i + 1), device_ids[i].c_str(), -1,
SQLITE_TRANSIENT);
}
std::unordered_map<std::string, bool> map;
while (sqlite3_step(stmt) == SQLITE_ROW) {
std::string id(reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0)));
int online = sqlite3_column_int(stmt, 1);
map[id] = (online != 0);
}
sqlite3_finalize(stmt);
for (const auto& id : device_ids) {
auto it = map.find(id);
bool online = (it != map.end()) ? it->second : false;
result.emplace_back(id, online);
}
return result;
}

View File

@@ -10,6 +10,7 @@
#include <sqlite3.h>
#include <string>
#include <vector>
struct DeviceCredential {
std::string device_id;
@@ -34,6 +35,13 @@ class DeviceDBManager {
int VerifyDevice(const std::string& device_id, const std::string& password);
bool RemoveDevice(const std::string& device_id);
bool SetDeviceOnline(const std::string& device_id, bool online);
std::vector<std::pair<std::string, bool>> BatchQueryOnline(
const std::vector<std::string>& device_ids);
bool SetUserDevices(const std::string& user_id,
const std::vector<std::string>& device_ids);
std::vector<std::string> GetUserDevices(const std::string& user_id);
private:
void InitDB();
std::string Sha256(const std::string& str);

70
src/presence_manager.cpp Normal file
View File

@@ -0,0 +1,70 @@
#include "presence_manager.h"
#include <nlohmann/json.hpp>
void PresenceManager::OnLogin(const std::string& user_id,
const std::string& device_id,
websocketpp::connection_hdl hdl) {
if (db_) {
db_->SetDeviceOnline(device_id, true);
}
NotifyUserDevices(user_id, device_id, true);
}
void PresenceManager::OnLogout(const std::string& device_id) {
std::string user_id;
user_id = device_id;
if (db_) {
db_->SetDeviceOnline(device_id, false);
}
if (!user_id.empty()) {
NotifyUserDevices(user_id, device_id, false);
}
}
bool PresenceManager::IsOnline(const std::string& device_id) const {
if (!db_) return false;
auto res = db_->BatchQueryOnline({device_id});
return !res.empty() && res[0].second;
}
std::vector<std::pair<std::string, bool>> PresenceManager::BatchQuery(
const std::vector<std::string>& device_ids) const {
std::vector<std::pair<std::string, bool>> result;
if (db_) {
return db_->BatchQueryOnline(device_ids);
}
return result;
}
void PresenceManager::NotifyUserDevices(const std::string& user_id,
const std::string& changed_device_id,
bool online) {
if (!db_ || !send_msg_) return;
auto devices = db_->GetUserDevices(user_id);
if (devices.empty()) return;
auto statuses = db_->BatchQueryOnline(devices);
std::vector<std::string> targets;
for (const auto& p : statuses) {
if (p.first == changed_device_id) continue;
if (p.second) {
targets.push_back(p.first);
}
}
nlohmann::json j = {
{"type", "presence_update"},
{"id", changed_device_id},
{"online", online},
};
for (auto& id : targets) {
if (send_to_device_) {
send_to_device_(id, j);
}
}
}
void PresenceManager::UpdateUserDevices(
const std::string& user_id, const std::vector<std::string>& device_ids) {
if (!db_) return;
db_->SetUserDevices(user_id, device_ids);
}

55
src/presence_manager.h Normal file
View File

@@ -0,0 +1,55 @@
/*
* @Author: DI JUNKUN
* @Date: 2026-02-28
* Copyright (c) 2026 by DI JUNKUN, All Rights Reserved.
*/
#ifndef _PRESENCE_MANAGER_H_
#define _PRESENCE_MANAGER_H_
#include <functional>
#include <nlohmann/json.hpp>
#include <string>
#include <vector>
#include <websocketpp/server.hpp>
#include "device_db_manager.h"
using nlohmann::json;
struct OnlineSession {
std::string user_id;
std::string device_id;
websocketpp::connection_hdl hdl;
};
class PresenceManager {
public:
void SetDeviceDB(DeviceDBManager* db) { db_ = db; }
void SetSendMsgCallback(
std::function<void(websocketpp::connection_hdl, json)> send_msg) {
send_msg_ = send_msg;
}
void SetSendToDeviceCallback(
std::function<void(const std::string&, json)> send_to_device) {
send_to_device_ = send_to_device;
}
void OnLogin(const std::string& user_id, const std::string& device_id,
websocketpp::connection_hdl hdl);
void OnLogout(const std::string& device_id);
bool IsOnline(const std::string& device_id) const;
std::vector<std::pair<std::string, bool>> BatchQuery(
const std::vector<std::string>& device_ids) const;
void NotifyUserDevices(const std::string& user_id,
const std::string& changed_device_id, bool online);
void UpdateUserDevices(const std::string& user_id,
const std::vector<std::string>& device_ids);
private:
std::function<void(websocketpp::connection_hdl, json)> send_msg_;
DeviceDBManager* db_ = nullptr;
std::function<void(const std::string&, json)> send_to_device_;
};
#endif

View File

@@ -16,10 +16,9 @@ bool GetStringField(const json& j, const char* key, std::string& value) {
SignalNegotiation::SignalNegotiation(
std::shared_ptr<TransmissionManager> transmission_manager,
std::string db_path)
: transmission_manager_(transmission_manager) {
device_db_manager_ = std::make_unique<DeviceDBManager>(db_path);
}
DeviceDBManager* device_db)
: transmission_manager_(transmission_manager),
device_db_manager_(device_db) {}
SignalNegotiation::~SignalNegotiation() {}
@@ -409,4 +408,4 @@ void SignalNegotiation::OnWebClientDisconnect(const std::string& user_id) {
pure_user_id);
}
}
}
}

View File

@@ -17,7 +17,7 @@ using nlohmann::json;
class SignalNegotiation {
public:
SignalNegotiation(std::shared_ptr<TransmissionManager> transmission_manager,
std::string db_path);
DeviceDBManager* device_db);
~SignalNegotiation();
void SetSendMsgCallback(
@@ -37,8 +37,8 @@ class SignalNegotiation {
private:
std::shared_ptr<TransmissionManager> transmission_manager_;
std::unique_ptr<DeviceDBManager> device_db_manager_;
DeviceDBManager* device_db_manager_;
std::function<void(websocketpp::connection_hdl, json)> send_msg_;
};
#endif
#endif

View File

@@ -31,11 +31,21 @@ SignalServer::SignalServer() {
std::placeholders::_2));
transmission_manager_ = std::make_shared<TransmissionManager>();
signal_negotiation_ =
std::make_unique<SignalNegotiation>(transmission_manager_, db_path_);
device_db_manager_ = std::make_unique<DeviceDBManager>(db_path_);
signal_negotiation_ = std::make_unique<SignalNegotiation>(
transmission_manager_, device_db_manager_.get());
signal_negotiation_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg,
this, std::placeholders::_1,
std::placeholders::_2));
presence_manager_ = std::make_unique<PresenceManager>();
presence_manager_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg, this,
std::placeholders::_1,
std::placeholders::_2));
presence_manager_->SetDeviceDB(device_db_manager_.get());
presence_manager_->SetSendToDeviceCallback(
[this](const std::string& id, json msg) {
SendMsg(transmission_manager_->GetWsHandle(id), msg);
});
}
SignalServer::SignalServer(uint16_t port, std::string certs_dir,
@@ -69,11 +79,21 @@ SignalServer::SignalServer(uint16_t port, std::string certs_dir,
std::placeholders::_2));
transmission_manager_ = std::make_shared<TransmissionManager>();
signal_negotiation_ =
std::make_unique<SignalNegotiation>(transmission_manager_, db_path_);
device_db_manager_ = std::make_unique<DeviceDBManager>(db_path_);
signal_negotiation_ = std::make_unique<SignalNegotiation>(
transmission_manager_, device_db_manager_.get());
signal_negotiation_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg,
this, std::placeholders::_1,
std::placeholders::_2));
presence_manager_ = std::make_unique<PresenceManager>();
presence_manager_->SetSendMsgCallback(std::bind(&SignalServer::SendMsg, this,
std::placeholders::_1,
std::placeholders::_2));
presence_manager_->SetDeviceDB(device_db_manager_.get());
presence_manager_->SetSendToDeviceCallback(
[this](const std::string& id, json msg) {
SendMsg(transmission_manager_->GetWsHandle(id), msg);
});
}
SignalServer::~SignalServer() {}
@@ -95,6 +115,9 @@ bool SignalServer::OnClose(websocketpp::connection_hdl hdl) {
signal_negotiation_->OnWebClientDisconnect(user_id);
}
}
if (presence_manager_ && !user_id.empty()) {
presence_manager_->OnLogout(user_id);
}
ws_connections_.erase(hdl);
return true;
}
@@ -111,6 +134,9 @@ bool SignalServer::OnFail(websocketpp::connection_hdl hdl) {
signal_negotiation_->OnWebClientDisconnect(user_id);
}
}
if (presence_manager_ && !user_id.empty()) {
presence_manager_->OnLogout(user_id);
}
ws_connections_.erase(hdl);
return true;
}
@@ -271,6 +297,12 @@ void SignalServer::OnMessage(websocketpp::connection_hdl hdl,
}
case "login"_H:
signal_negotiation_->login_user(hdl, j);
if (presence_manager_) {
std::string id = transmission_manager_->GetUserId(hdl);
if (!id.empty()) {
presence_manager_->OnLogin(id, id, hdl);
}
}
break;
case "user_leave_transmission"_H:
signal_negotiation_->leave_transmission(hdl, j);
@@ -293,6 +325,30 @@ void SignalServer::OnMessage(websocketpp::connection_hdl hdl,
case "new_candidate_mid"_H:
signal_negotiation_->new_candidate_mid(hdl, j);
break;
case "recent_connections_presence"_H: {
std::string user_id;
if (!j.contains("user_id") || !j["user_id"].is_string()) {
LOG_ERROR("recent_connections missing field: user_id");
break;
}
user_id = j["user_id"].get<std::string>();
std::vector<std::string> device_ids;
if (j.contains("devices") && j["devices"].is_array()) {
for (auto& v : j["devices"]) {
if (v.is_string()) device_ids.push_back(v.get<std::string>());
}
}
if (presence_manager_) {
presence_manager_->UpdateUserDevices(user_id, device_ids);
auto statuses = presence_manager_->BatchQuery(device_ids);
json resp = {{"type", "presence"}, {"devices", json::array()}};
for (const auto& p : statuses) {
resp["devices"].push_back({{"id", p.first}, {"online", p.second}});
}
server_.send(hdl, resp.dump(), websocketpp::frame::opcode::text);
}
break;
}
default:
LOG_WARN("Unknown message type: {}", type);
break;

View File

@@ -14,6 +14,8 @@
#include <websocketpp/config/asio.hpp>
#include <websocketpp/server.hpp>
#include "device_db_manager.h"
#include "presence_manager.h"
#include "signal_negotiation.h"
using nlohmann::json;
@@ -51,7 +53,9 @@ class SignalServer {
unsigned int ws_connection_id_ = 0;
std::shared_ptr<TransmissionManager> transmission_manager_;
std::unique_ptr<DeviceDBManager> device_db_manager_;
std::unique_ptr<SignalNegotiation> signal_negotiation_;
std::unique_ptr<PresenceManager> presence_manager_;
};
#endif
#endif

View File

@@ -52,9 +52,14 @@ target("negotiation")
add_files("src/signal_negotiation.cpp")
add_includedirs("src", {public = true})
target("server")
target("presence")
set_kind("object")
add_deps("log", "common", "negotiation")
add_files("src/presence_manager.cpp")
target("server")
set_kind("object")
add_deps("log", "common", "negotiation", "presence")
add_files("src/signal_server.cpp")
target("crossdesk_server")