@@ -27,7 +27,75 @@ bool FConversationsManager::HasConversation(const Uint64 InConversationId)
2727 return (ConversationIdToConversationData.ContainsKey (InConversationId));
2828}
2929
30- void FConversationsManager::AddMessage (const Uint64 InConversationId, const Uint64 InSenderId, const std::string& InMessage)
30+ bool FConversationsManager::IsUserInConversation (const Uint64 InUserId, const Uint64 InConversationId)
31+ {
32+ const bool bConversationExists = HasConversation (InConversationId);
33+ if (bConversationExists)
34+ {
35+ std::shared_ptr<FConversationData> ConversationDataSharedPtr;
36+
37+ // Get conversation data with lock
38+ {
39+ std::shared_lock Lock (ConversationIdToConversationDataMutex);
40+ ConversationDataSharedPtr = ConversationIdToConversationData[InConversationId];
41+ }
42+
43+ // Access data with lock
44+ {
45+ std::shared_lock Lock (ConversationDataSharedPtr->Lock );
46+ return ConversationDataSharedPtr->UsersIds .Contains (InUserId);
47+ }
48+ }
49+
50+ return false ;
51+ }
52+
53+ bool FConversationsManager::IsMessageInConversation (Uint64 InMessageId, Uint64 InConversationId)
54+ {
55+ const bool bConversationExists = HasConversation (InConversationId);
56+ if (bConversationExists)
57+ {
58+ std::shared_ptr<FConversationData> ConversationDataSharedPtr;
59+
60+ // Get conversation data with lock
61+ {
62+ std::shared_lock Lock (ConversationIdToConversationDataMutex);
63+ ConversationDataSharedPtr = ConversationIdToConversationData[InConversationId];
64+ }
65+
66+ // Access data with lock
67+ // Linear search from the end
68+ {
69+ std::shared_lock Lock (ConversationDataSharedPtr->Lock );
70+ const std::deque<FConversationMessageData>& Deque = ConversationDataSharedPtr->MessagesDeque .Deque ;
71+
72+ uint16 CheckedCount = 0 ;
73+ const uint16 MaxLookback = 50 ;
74+
75+ // Go from latest messages
76+ for (auto It = Deque.begin (); It != Deque.end (); ++It)
77+ {
78+ if (It->MessageId == InMessageId)
79+ {
80+ return true ; // Found
81+ }
82+
83+ CheckedCount++;
84+
85+ // We only allow to edit last 50? Message
86+ // tbh 50 is generous anyway
87+ if (It->MessageId < InMessageId || CheckedCount > MaxLookback)
88+ {
89+ return false ;
90+ }
91+ }
92+ }
93+ }
94+
95+ return false ;
96+ }
97+
98+ Uint64 FConversationsManager::AddMessage (const Uint64 InConversationId, const Uint64 InSenderId, const std::string& InMessage)
3199{
32100 std::shared_ptr<FConversationData> ConversationPtr = GetConversation (InConversationId);
33101
@@ -41,13 +109,113 @@ void FConversationsManager::AddMessage(const Uint64 InConversationId, const Uint
41109 ConversationMessageData.Message = InMessage;
42110
43111 // Lock conversation for adding message
44- std::unique_lock Lock (ConversationIdToConversationDataMutex );
112+ std::unique_lock Lock (ConversationPtr-> Lock );
45113
46114 // Add message at begging of table as it's newest
47- ConversationPtr->MessagesMap .PushFront (ConversationMessageData);
115+ ConversationPtr->MessagesDeque .PushFront (ConversationMessageData);
48116 }
117+
118+ return OutId;
119+ }
120+
121+ void FConversationsManager::EditMessage (const Uint64 InRequesterId, const Uint64 InConversationId, const Uint64 InMessageId, const std::string& InNewMessage)
122+ {
123+ // 1. Database Update First
124+ const EDatabaseOperationResult Result = UpdateMessageEditInDB (InRequesterId, InConversationId, InMessageId, InNewMessage);
125+
126+ if (Result == EDatabaseOperationResult::Success)
127+ {
128+ std::shared_ptr<FConversationData> ConversationDataSharedPtr;
129+
130+ // 2. Find the conversation (Safe read from the global map)
131+ {
132+ std::shared_lock Lock (ConversationIdToConversationDataMutex);
133+ auto ConvIt = ConversationIdToConversationData.Map .find (InConversationId);
134+ if (ConvIt == ConversationIdToConversationData.end () || ConvIt->second == nullptr )
135+ {
136+ return ; // Conversation does not exist
137+ }
138+ ConversationDataSharedPtr = ConvIt->second ;
139+ }
140+
141+ // 3. Edit the message (WRITE lock at the conversation level)
142+ {
143+ // Using unique_lock! No other thread can read or write to this deque right now.
144+ std::unique_lock WriteLock (ConversationDataSharedPtr->Lock );
145+ std::deque<FConversationMessageData>& Deque = ConversationDataSharedPtr->MessagesDeque .Deque ;
146+
147+ int32 CurrentLoopback = 0 ;
148+ const int32 MaxLoopback = 50 ;
149+
150+ // Reverse linear search, because users usually edit recent messages.
151+ for (auto It = Deque.begin (); It != Deque.end (); ++It)
152+ {
153+ if (It->MessageId == InMessageId)
154+ {
155+ // Found it! Replace the content.
156+ It->Message = InNewMessage;
157+ It->Status = EConversationMessageStatus::Edited;
158+
159+ break ;
160+ }
161+
162+ // Optimization: If we hit a smaller ID, the message cannot be here anymore.
163+ if (It->MessageId < InMessageId || CurrentLoopback >= MaxLoopback)
164+ {
165+ break ;
166+ }
167+ }
168+ }
169+
170+ // Remember to broadcast change when using this function
171+ }
49172}
50173
174+ void FConversationsManager::DeleteMessage (const Uint64 InRequesterId, const Uint64 InConversationId, const Uint64 InMessageId)
175+ {
176+ // 1. Database Update First
177+ const EDatabaseOperationResult Result = UpdateMessageDeleteInDB (InRequesterId, InConversationId, InMessageId);
178+
179+ if (Result == EDatabaseOperationResult::Success)
180+ {
181+ std::shared_ptr<FConversationData> ConversationDataSharedPtr;
182+
183+ // 2. Find the conversation
184+ {
185+ std::shared_lock Lock (ConversationIdToConversationDataMutex);
186+ auto ConvIt = ConversationIdToConversationData.Map .find (InConversationId);
187+ if (ConvIt == ConversationIdToConversationData.end () || ConvIt->second == nullptr )
188+ {
189+ return ;
190+ }
191+ ConversationDataSharedPtr = ConvIt->second ;
192+ }
193+
194+ // 3. Delete the message (WRITE lock at the conversation level)
195+ {
196+ std::unique_lock WriteLock (ConversationDataSharedPtr->Lock );
197+ std::deque<FConversationMessageData>& Deque = ConversationDataSharedPtr->MessagesDeque .Deque ;
198+
199+ auto It = std::lower_bound (
200+ Deque.begin (),
201+ Deque.end (),
202+ InMessageId,
203+ [](const FConversationMessageData& Msg, const Uint64 Id) {
204+ return Msg.MessageId < Id;
205+ }
206+ );
207+
208+ if (It != Deque.end () && It->MessageId == InMessageId)
209+ {
210+ // std::deque::erase removes the element and shifts the rest.
211+ // This is a relatively fast operation for std::deque.
212+ Deque.erase (It);
213+ }
214+ }
215+
216+ // Remember to broadcast change when using this function
217+ }
218+ }
51219void FConversationsManager::GetLastConversationByUserId (const Uint64 InUserId, const int32 Offset, const int32 Limit, CArray<Uint64>& OutConversationIds)
52220{
53221 // @TODO We could for sure optimize this.
@@ -70,7 +238,7 @@ std::vector<FConversationMessageData> FConversationsManager::GetConversationMess
70238{
71239 std::vector<FConversationMessageData> ConversationMessage;
72240
73- const int32 CurrentMessagesCount = Conversation->MessagesMap .Size ();
241+ const int32 CurrentMessagesCount = Conversation->MessagesDeque .Size ();
74242 const int32 TargetMessagesCount = Offset + Count;
75243 if (CurrentMessagesCount < TargetMessagesCount)
76244 {
@@ -83,13 +251,13 @@ std::vector<FConversationMessageData> FConversationsManager::GetConversationMess
83251 // Add messages to cache
84252 for (const FConversationMessageData& Message : ConversationMessage)
85253 {
86- Conversation->MessagesMap .PushBack (Message);
254+ Conversation->MessagesDeque .PushBack (Message);
87255 }
88256
89257 // Add any present in memory but skipped in download
90258 if (TargetOffset != Offset)
91259 {
92- std::vector<FConversationMessageData> ConversationMessageInMemoryPart = Conversation->MessagesMap .GetRange (Offset, TargetOffset);
260+ std::vector<FConversationMessageData> ConversationMessageInMemoryPart = Conversation->MessagesDeque .GetRange (Offset, TargetOffset);
93261
94262 for (FConversationMessageData& MessageInMemoryPart : ConversationMessageInMemoryPart)
95263 {
@@ -99,7 +267,7 @@ std::vector<FConversationMessageData> FConversationsManager::GetConversationMess
99267 }
100268 else
101269 {
102- ConversationMessage = Conversation->MessagesMap .GetRange (Offset, Count);
270+ ConversationMessage = Conversation->MessagesDeque .GetRange (Offset, Count);
103271 }
104272
105273 return ConversationMessage;
@@ -175,10 +343,19 @@ void FConversationsManager::DownloadConversationsFromRange(const Uint64 UserId,
175343 AddConversationToCache (Conversation.ConversationId , Participants, LastReadMessageIds);
176344
177345 const std::shared_ptr<FConversationData> ConversationPtr = GetConversation (Conversation.ConversationId );
178- const std::vector<FConversationMessageData> Messages = DownloadConversationMessages (Conversation. ConversationId , 0 , 40 );
179- for ( const FConversationMessageData& Message : Messages)
346+
347+ std::vector< FConversationMessageData> Messages;
180348 {
181- ConversationPtr->MessagesMap .PushBack (Message);
349+ std::shared_lock<std::shared_mutex> Lock (ConversationPtr->Lock );
350+ Messages = DownloadConversationMessages (Conversation.ConversationId , 0 , 40 );
351+ }
352+
353+ {
354+ std::unique_lock Lock (ConversationPtr->Lock );
355+ for (const FConversationMessageData& Message : Messages)
356+ {
357+ ConversationPtr->MessagesDeque .PushBack (Message);
358+ }
182359 }
183360 }
184361}
@@ -287,6 +464,97 @@ std::vector<FConversationMessageData> FConversationsManager::DownloadConversatio
287464 return ConversationData;
288465}
289466
467+ EDatabaseOperationResult FConversationsManager::UpdateMessageEditInDB (const Uint64 RequesterUserId, const Uint64 InConversationId, const Uint64 InMessageId, const std::string& InNewMessage)
468+ {
469+ try
470+ {
471+ FDataBaseConnect Connect;
472+ if (Connect.IsConnected ())
473+ {
474+ soci::session& DataBaseSession = Connect.GetSession ();
475+
476+ // cast to int for safety
477+ int32 StatusInt = static_cast <int32>(EConversationMessageStatus::Edited);
478+
479+ soci::statement St = (DataBaseSession.prepare <<
480+ " UPDATE messages SET text = :text, text_status = :status "
481+ " WHERE id = :msgId AND conversation_id = :convId AND sender_id = :senderId" ,
482+ soci::use (InNewMessage, " text" ),
483+ soci::use (StatusInt, " status" ),
484+ soci::use (InMessageId, " msgId" ),
485+ soci::use (InConversationId, " convId" ),
486+ soci::use (RequesterUserId, " senderId" )
487+ );
488+
489+ St.execute ();
490+
491+ // Check how many rows were affected
492+ if (St.get_affected_rows () == 0 )
493+ {
494+ LOG_ERROR (" No rows affected" );
495+
496+ return EDatabaseOperationResult::DatabaseFailed;
497+ }
498+
499+ return EDatabaseOperationResult::Success;
500+ }
501+ }
502+ catch (const soci::soci_error& e)
503+ {
504+ LOG_ERROR (" Database error during message edit: " << e.what ());
505+
506+ return EDatabaseOperationResult::OperationFailed;
507+ }
508+
509+ return EDatabaseOperationResult::Unknown;
510+ }
511+
512+ EDatabaseOperationResult FConversationsManager::UpdateMessageDeleteInDB (const Uint64 RequesterUserId, const Uint64 InConversationId, const Uint64 InMessageId)
513+ {
514+ try
515+ {
516+ FDataBaseConnect Connect;
517+ if (Connect.IsConnected ())
518+ {
519+ soci::session& DataBaseSession = Connect.GetSession ();
520+
521+ int32 StatusInt = static_cast <int32>(EConversationMessageStatus::Deleted);
522+
523+ soci::indicator NullIndicator = soci::i_null;
524+ std::string EmptyText = " " ;
525+ int32 EmptyEncryptType = 0 ; // Ignored, using indicator
526+
527+ soci::statement St = (DataBaseSession.prepare <<
528+ " UPDATE messages SET "
529+ " text = :text, "
530+ " text_status = :status, "
531+ " text_encrypt_type = :encType, "
532+ " text_encryption_value = :encVal "
533+ " WHERE id = :msgId AND conversation_id = :convId AND sender_id = :senderId" ,
534+ soci::use (EmptyText, " text" ),
535+ soci::use (StatusInt, " status" ),
536+ soci::use (EmptyEncryptType, NullIndicator, " encType" ), // Set NULL
537+ soci::use (EmptyText, NullIndicator, " encVal" ), // Set NULL
538+ soci::use (InMessageId, " msgId" ),
539+ soci::use (InConversationId, " convId" ),
540+ soci::use (RequesterUserId, " senderId" )
541+ );
542+
543+ St.execute ();
544+
545+ return EDatabaseOperationResult::Success;
546+ }
547+ }
548+ catch (const soci::soci_error& e)
549+ {
550+ LOG_ERROR (" Database error during message deletion: " << e.what ());
551+
552+ return EDatabaseOperationResult::OperationFailed;
553+ }
554+
555+ return EDatabaseOperationResult::Unknown;
556+ }
557+
290558Uint64 FConversationsManager::UploadOrGetConversation (const std::vector<Uint64>& UserIds, bool & bIsNewConversation)
291559{
292560 // Declare ConversationId properly
0 commit comments