2424import java .util .ArrayList ;
2525import java .util .Base64 ;
2626import java .util .List ;
27+ import java .util .regex .Matcher ;
28+ import java .util .regex .Pattern ;
2729
2830@ Service
2931@ Slf4j
@@ -50,19 +52,14 @@ public GmailWebhookService(GeminiService geminiService, JobService jobService, U
5052 @ Async ("taskExecutor" )
5153 public void processHistorySync (String userEmail ) {
5254 final String email = userEmail .toLowerCase ();
53-
55+
5456 LocalDateTime now = LocalDateTime .now ();
5557 LocalDateTime expiryThreshold = now .minusMinutes (15 );
5658
57- int updatedRows = userRepository .claimSyncLock (email , now , expiryThreshold );
58- if (updatedRows == 0 ) return ;
59-
60- cacheEvictService .evictAllForUser (email );
59+ if (userRepository .claimSyncLock (email , now , expiryThreshold ) == 0 ) return ;
6160
6261 try {
63- User user = userRepository .findByEmail (email )
64- .orElseThrow (() -> new RuntimeException ("User not found after lock" ));
65-
62+ User user = userRepository .findByEmail (email ).orElseThrow ();
6663 if (user .getGmailRefreshToken () == null ) return ;
6764
6865 String accessToken = getFreshAccessToken (user .getGmailRefreshToken ());
@@ -87,15 +84,12 @@ public void processHistorySync(String userEmail) {
8784 List <EmailBatchItem > batchItems = collectMessages (service , historyResponse .getHistory ());
8885
8986 if (!batchItems .isEmpty ()) {
90-
91- List <JobDTO > extractedJobs = geminiService .extractJobsFromBatch (batchItems );
92-
93- log .info ("Ingesting batch of {} emails via Gemini for {}" , batchItems .size (), email );
94-
95- jobService .saveBatchResults (email , batchItems , extractedJobs );
96- }
87+ log .info ("Ingesting batch of {} emails for {}" , batchItems .size (), email );
88+ List <JobDTO > extractedJobs = geminiService .extractJobsFromBatch (batchItems );
89+ jobService .saveBatchResults (email , batchItems , extractedJobs );
90+ }
9791 } catch (Exception e ) {
98- log .error ("High-Performance Sync failed for {}: " , email , e );
92+ log .error ("Sync failed for {}: " , email , e );
9993 } finally {
10094 userRepository .releaseSyncLock (email );
10195 cacheEvictService .evictAllForUser (email );
@@ -110,45 +104,105 @@ private List<EmailBatchItem> collectMessages(Gmail service, List<History> histor
110104 if (history .getMessagesAdded () == null ) continue ;
111105 for (HistoryMessageAdded added : history .getMessagesAdded ()) {
112106 try {
113- Message m = service .users ().messages ().get ("me" , added .getMessage ().getId ())
114- .setFormat ("full" ).execute ();
115-
116- long millisecondTimestamp = m .getInternalDate ();
117- LocalDateTime emailDate = LocalDateTime .ofInstant (
118- Instant .ofEpochMilli (millisecondTimestamp ), ZoneOffset .UTC );
107+ Message m = service .users ().messages ().get ("me" , added .getMessage ().getId ()).setFormat ("full" ).execute ();
108+ LocalDateTime emailDate = LocalDateTime .ofInstant (Instant .ofEpochMilli (m .getInternalDate ()), ZoneOffset .UTC );
119109
120- String from = "" , subj = "" , replyTo = "" ;
110+ String from = "" , subj = "" , replyTo = "" ;
121111 for (var h : m .getPayload ().getHeaders ()) {
122112 if ("From" .equalsIgnoreCase (h .getName ())) from = h .getValue ();
123113 if ("Subject" .equalsIgnoreCase (h .getName ())) subj = h .getValue ();
124114 if ("Reply-To" .equalsIgnoreCase (h .getName ())) replyTo = h .getValue ();
125115 }
126116
127117 if (!isSystemNoise (subj )) {
128- String body = extractTextFromBody (m .getPayload ());
118+ String body = extractProcessedBody (m .getPayload ());
129119 items .add (new EmailBatchItem (from , subj , replyTo , body , emailDate ));
130120 }
131121 } catch (Exception e ) {
132- log .warn ("Failed to fetch message {} : {}" , added . getMessage (). getId () , e .getMessage ());
122+ log .warn ("Failed message fetch : {}" , e .getMessage ());
133123 }
134124 }
135125 }
136126 return items ;
137127 }
138128
139- private String extractTextFromBody (MessagePart part ) {
129+ private String extractProcessedBody (MessagePart payload ) {
130+ StringBuilder rawBuffer = new StringBuilder ();
131+ recursiveRawCollect (payload , rawBuffer );
132+
133+ String cleaned = surgicalClean (rawBuffer .toString ());
134+
135+ return cleaned ;
136+ }
137+
138+ private void recursiveRawCollect (MessagePart part , StringBuilder buffer ) {
139+ if (part .getParts () != null ) {
140+ for (MessagePart subPart : part .getParts ()) recursiveRawCollect (subPart , buffer );
141+ }
140142 if (part .getBody () != null && part .getBody ().getData () != null ) {
141- String content = new String (Base64 .getUrlDecoder ().decode (part .getBody ().getData ()));
142- if (part .getMimeType ().contains ("text/plain" )) return content ;
143- if (part .getMimeType ().contains ("text/html" )) return content .replaceAll ("<[^>]*>" , " " );
143+ buffer .append (new String (Base64 .getUrlDecoder ().decode (part .getBody ().getData ()))).append ("\n " );
144144 }
145- if (part .getParts () != null ) {
146- for (MessagePart subPart : part .getParts ()) {
147- String text = extractTextFromBody (subPart );
148- if (text != null && !text .isBlank ()) return text ;
145+ }
146+
147+ private String surgicalClean (String rawHtml ) {
148+ if (rawHtml == null || rawHtml .isBlank ()) return "" ;
149+
150+ String content = rawHtml .replaceAll ("(?is)<style.*?>.*?</style>" , "" )
151+ .replaceAll ("(?is)<script.*?>.*?</script>" , "" );
152+
153+ StringBuilder sb = new StringBuilder ();
154+ Matcher m = Pattern .compile ("(?is)<a\\ s+[^>]*?href\\ s*=\\ s*[\" ']([^\" ']+)[\" '][^>]*?>(.*?)</a>" ).matcher (content );
155+
156+ int lastEnd = 0 ;
157+ while (m .find ()) {
158+ sb .append (content , lastEnd , m .start ());
159+
160+ String rawUrl = m .group (1 ).replace ("&" , "&" );
161+ String linkText = m .group (2 ).replaceAll ("<[^>]*>" , "" ).trim ();
162+
163+ String processedUrl = processUrlByDomain (rawUrl );
164+
165+ boolean isJobLink = processedUrl .contains ("viewjob" ) || processedUrl .contains ("confirmemail" ) ||
166+ processedUrl .contains ("linkedin.com/jobs" ) || processedUrl .contains ("careers" ) ||
167+ processedUrl .contains ("apply" );
168+
169+ if (isJobLink && processedUrl .length () > 15 ) {
170+ sb .append (" [LINK_START]" ).append (linkText ).append ("[LINK_URL]" ).append (processedUrl ).append ("[LINK_END] " );
171+ } else {
172+ sb .append (" " ).append (linkText ).append (" " );
149173 }
174+
175+ lastEnd = m .end ();
176+ }
177+ sb .append (content .substring (lastEnd ));
178+
179+ return sb .toString ()
180+ .replaceAll ("(?i)<br\\ s*/?>" , "\n " )
181+ .replaceAll ("(?i)</td>" , " " )
182+ .replaceAll ("<[^>]*>" , " " )
183+ .replaceAll (" " , " " )
184+ .replaceAll ("\\ s+" , " " )
185+ .trim ();
186+ }
187+
188+ private String processUrlByDomain (String url ) {
189+ if (url == null ) return "" ;
190+ String lowerUrl = url .toLowerCase ();
191+
192+ if (lowerUrl .contains ("linkedin.com/jobs" ) || lowerUrl .contains ("linkedin.com/comm/jobs" )) {
193+ int queryIndex = url .indexOf ("?" );
194+ return queryIndex > 0 ? url .substring (0 , queryIndex ) : url ;
150195 }
151- return "" ;
196+
197+ if (lowerUrl .contains ("indeed.com" )) {
198+ return url ;
199+ }
200+
201+ if (url .contains ("utm_" ) || url .contains ("ref=" )) {
202+ return url .replaceAll ("[?&]utm_[^&]+" , "" ).replaceAll ("[?&]ref=[^&]+" , "" );
203+ }
204+
205+ return url ;
152206 }
153207
154208 private void bootstrapUserHistory (Gmail service , User user ) throws Exception {
@@ -164,13 +218,6 @@ private boolean isSystemNoise(String subject) {
164218 return s .contains ("security alert" ) || s .contains ("sign-in" ) || s .contains ("verification code" );
165219 }
166220
167- // private void evictUserCaches(String email) {
168- // Cache userCache = cacheManager.getCache("users");
169- // Cache entityCache = cacheManager.getCache("userEntities");
170- // if (userCache != null) userCache.evict(email);
171- // if (entityCache != null) entityCache.evict(email);
172- // }
173-
174221 public String getFreshAccessToken (String refreshToken ) throws Exception {
175222 return new GoogleRefreshTokenRequest (GoogleNetHttpTransport .newTrustedTransport (), GsonFactory .getDefaultInstance (),
176223 refreshToken , clientId , clientSecret ).execute ().getAccessToken ();
0 commit comments