pubmed_client/pubmed/client/history.rs
1//! History server operations (EPost, fetch from history, streaming search)
2
3use crate::common::PubMedId;
4use crate::error::{PubMedError, Result};
5use crate::pubmed::models::{EPostResult, HistorySession, PubMedArticle, SearchResult};
6use crate::pubmed::parser::parse_articles_from_xml;
7use crate::pubmed::query::SortOrder;
8use crate::pubmed::responses::{EPostResponse, ESearchResult};
9use crate::retry::with_retry;
10use tracing::{debug, info, instrument, warn};
11
12use super::PubMedClient;
13
14/// State machine for streaming search results
/// State machine for streaming search results
///
/// Used as the unfold state for the stream returned by
/// `PubMedClient::search_all` (see `futures_util::stream::unfold` usage there).
#[cfg(not(target_arch = "wasm32"))]
enum SearchAllState {
    /// Initial state before search
    Initial { query: String, batch_size: usize },
    /// Fetching articles from history server
    Fetching {
        /// WebEnv/query_key pair addressing the server-side result set.
        session: HistorySession,
        /// Total hit count reported by the initial ESearch.
        total: usize,
        /// Number of articles requested per EFetch batch.
        batch_size: usize,
        /// `retstart` offset to use for the next EFetch request.
        current_offset: usize,
        /// Articles of the current batch not yet yielded to the stream.
        pending_articles: Vec<PubMedArticle>,
        /// Index of the next article in `pending_articles` to yield.
        article_index: usize,
    },
    /// All articles have been fetched
    Done,
}
31
32impl PubMedClient {
33 /// Search for articles with history server support
34 ///
35 /// This method enables NCBI's history server feature, which stores search results
36 /// on the server and returns WebEnv/query_key identifiers. These can be used
37 /// with `fetch_from_history()` to efficiently paginate through large result sets.
38 ///
39 /// # Arguments
40 ///
41 /// * `query` - Search query string
42 /// * `limit` - Maximum number of PMIDs to return in the initial response
43 ///
44 /// # Returns
45 ///
46 /// Returns a `Result<SearchResult>` containing PMIDs and history session information
47 ///
48 /// # Example
49 ///
50 /// ```no_run
51 /// use pubmed_client::PubMedClient;
52 ///
53 /// #[tokio::main]
54 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
55 /// let client = PubMedClient::new();
56 /// let result = client.search_with_history("covid-19", 100).await?;
57 ///
58 /// println!("Total results: {}", result.total_count);
59 /// println!("First batch: {} PMIDs", result.pmids.len());
60 ///
61 /// // Use history session to fetch more results
62 /// if let Some(session) = result.history_session() {
63 /// let next_batch = client.fetch_from_history(&session, 100, 100).await?;
64 /// println!("Next batch: {} articles", next_batch.len());
65 /// }
66 ///
67 /// Ok(())
68 /// }
69 /// ```
70 #[instrument(skip(self), fields(query = %query, limit = limit))]
71 pub async fn search_with_history(&self, query: &str, limit: usize) -> Result<SearchResult> {
72 self.search_with_history_and_options(query, limit, None)
73 .await
74 }
75
76 /// Search for articles with history server support and sort options
77 ///
78 /// This method enables NCBI's history server feature, which stores search results
79 /// on the server and returns WebEnv/query_key identifiers. These can be used
80 /// with `fetch_from_history()` to efficiently paginate through large result sets.
81 ///
82 /// Also returns query translation showing how PubMed interpreted the query.
83 ///
84 /// # Arguments
85 ///
86 /// * `query` - Search query string
87 /// * `limit` - Maximum number of PMIDs to return in the initial response
88 /// * `sort` - Optional sort order for results
89 ///
90 /// # Returns
91 ///
92 /// Returns a `Result<SearchResult>` containing PMIDs, history session, and query translation
93 ///
94 /// # Example
95 ///
96 /// ```no_run
97 /// use pubmed_client::{PubMedClient, pubmed::SortOrder};
98 ///
99 /// #[tokio::main]
100 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
101 /// let client = PubMedClient::new();
102 /// let result = client
103 /// .search_with_history_and_options("asthma", 100, Some(&SortOrder::PublicationDate))
104 /// .await?;
105 ///
106 /// println!("Total results: {}", result.total_count);
107 /// if let Some(translation) = &result.query_translation {
108 /// println!("Query interpreted as: {}", translation);
109 /// }
110 /// Ok(())
111 /// }
112 /// ```
113 #[instrument(skip(self, sort), fields(query = %query, limit = limit))]
114 pub async fn search_with_history_and_options(
115 &self,
116 query: &str,
117 limit: usize,
118 sort: Option<&SortOrder>,
119 ) -> Result<SearchResult> {
120 if query.trim().is_empty() {
121 debug!("Empty query provided, returning empty results");
122 return Ok(SearchResult {
123 pmids: Vec::new(),
124 total_count: 0,
125 webenv: None,
126 query_key: None,
127 query_translation: None,
128 });
129 }
130
131 // Use usehistory=y to enable history server
132 let mut url = format!(
133 "{}/esearch.fcgi?db=pubmed&term={}&retmax={}&retstart={}&retmode=json&usehistory=y",
134 self.base_url,
135 urlencoding::encode(query),
136 limit,
137 0
138 );
139
140 if let Some(sort_order) = sort {
141 url.push_str(&format!("&sort={}", sort_order.as_api_param()));
142 }
143
144 debug!("Making ESearch API request with history");
145 let response = self.make_request(&url).await?;
146
147 let search_result: ESearchResult = response.json().await?;
148
149 // Check for API error response
150 if let Some(error_msg) = &search_result.esearchresult.error {
151 return Err(PubMedError::ApiError {
152 status: 200,
153 message: format!("NCBI ESearch API error: {}", error_msg),
154 });
155 }
156
157 let total_count: usize = search_result
158 .esearchresult
159 .count
160 .as_ref()
161 .and_then(|c| c.parse().ok())
162 .unwrap_or(0);
163
164 info!(
165 total_count = total_count,
166 returned_count = search_result.esearchresult.idlist.len(),
167 has_webenv = search_result.esearchresult.webenv.is_some(),
168 query_translation = ?search_result.esearchresult.querytranslation,
169 "Search with history completed"
170 );
171
172 Ok(SearchResult {
173 pmids: search_result.esearchresult.idlist,
174 total_count,
175 webenv: search_result.esearchresult.webenv,
176 query_key: search_result.esearchresult.query_key,
177 query_translation: search_result.esearchresult.querytranslation,
178 })
179 }
180
181 /// Upload a list of PMIDs to the NCBI History server using EPost
182 ///
183 /// This stores the UIDs on the server and returns WebEnv/query_key identifiers
184 /// that can be used with `fetch_from_history()` to retrieve article metadata.
185 ///
186 /// This is useful when you have a pre-existing list of PMIDs (e.g., from a file,
187 /// database, or external source) and want to use them with history server features
188 /// like batch fetching.
189 ///
190 /// # Arguments
191 ///
192 /// * `pmids` - Slice of PubMed IDs as strings
193 ///
194 /// # Returns
195 ///
196 /// Returns a `Result<EPostResult>` containing WebEnv and query_key
197 ///
198 /// # Errors
199 ///
200 /// * `ParseError::InvalidPmid` - If any PMID is invalid
201 /// * `PubMedError::RequestError` - If the HTTP request fails
202 /// * `PubMedError::ApiError` - If the NCBI API returns an error
203 ///
204 /// # Example
205 ///
206 /// ```no_run
207 /// use pubmed_client::PubMedClient;
208 ///
209 /// #[tokio::main]
210 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
211 /// let client = PubMedClient::new();
212 ///
213 /// // Upload PMIDs to the history server
214 /// let result = client.epost(&["31978945", "33515491", "25760099"]).await?;
215 ///
216 /// println!("WebEnv: {}", result.webenv);
217 /// println!("Query Key: {}", result.query_key);
218 ///
219 /// // Use the session to fetch articles
220 /// let session = result.history_session();
221 /// let articles = client.fetch_from_history(&session, 0, 100).await?;
222 /// println!("Fetched {} articles", articles.len());
223 ///
224 /// Ok(())
225 /// }
226 /// ```
227 #[instrument(skip(self), fields(pmids_count = pmids.len()))]
228 pub async fn epost(&self, pmids: &[&str]) -> Result<EPostResult> {
229 self.epost_internal(pmids, None).await
230 }
231
232 /// Upload PMIDs to an existing History server session using EPost
233 ///
234 /// This appends UIDs to an existing WebEnv session, allowing you to combine
235 /// multiple sets of IDs into a single session for subsequent operations.
236 ///
237 /// # Arguments
238 ///
239 /// * `pmids` - Slice of PubMed IDs as strings
240 /// * `session` - Existing history session to append to
241 ///
242 /// # Returns
243 ///
244 /// Returns a `Result<EPostResult>` with the updated session information.
245 /// The returned `webenv` will be the same as the input session, and a new
246 /// `query_key` will be assigned for the uploaded IDs.
247 ///
248 #[instrument(skip(self), fields(pmids_count = pmids.len()))]
249 pub async fn epost_to_session(
250 &self,
251 pmids: &[&str],
252 session: &HistorySession,
253 ) -> Result<EPostResult> {
254 self.epost_internal(pmids, Some(session)).await
255 }
256
257 /// Internal implementation for EPost
258 async fn epost_internal(
259 &self,
260 pmids: &[&str],
261 session: Option<&HistorySession>,
262 ) -> Result<EPostResult> {
263 if pmids.is_empty() {
264 return Err(PubMedError::InvalidQuery(
265 "PMID list cannot be empty for EPost".to_string(),
266 ));
267 }
268
269 // Validate all PMIDs upfront
270 let validated: Vec<u32> = pmids
271 .iter()
272 .map(|pmid| {
273 PubMedId::parse(pmid)
274 .map(|p| p.as_u32())
275 .map_err(PubMedError::from)
276 })
277 .collect::<Result<Vec<_>>>()?;
278
279 let id_list: String = validated
280 .iter()
281 .map(|id| id.to_string())
282 .collect::<Vec<_>>()
283 .join(",");
284
285 // Build form data for POST request
286 let mut params = vec![
287 ("db".to_string(), "pubmed".to_string()),
288 ("id".to_string(), id_list),
289 ("retmode".to_string(), "json".to_string()),
290 ];
291
292 if let Some(session) = session {
293 params.push(("WebEnv".to_string(), session.webenv.clone()));
294 }
295
296 // Append API parameters (api_key, email, tool)
297 params.extend(self.config().build_api_params());
298
299 let url = format!("{}/epost.fcgi", self.base_url);
300
301 debug!(pmids_count = pmids.len(), "Making EPost API request");
302
303 let response = with_retry(
304 || async {
305 self.rate_limiter().acquire().await?;
306 debug!("Making POST request to: {}", url);
307 let response = self
308 .http_client()
309 .post(&url)
310 .form(¶ms)
311 .send()
312 .await
313 .map_err(PubMedError::from)?;
314
315 if response.status().is_server_error() || response.status().as_u16() == 429 {
316 return Err(PubMedError::ApiError {
317 status: response.status().as_u16(),
318 message: response
319 .status()
320 .canonical_reason()
321 .unwrap_or("Unknown error")
322 .to_string(),
323 });
324 }
325
326 Ok(response)
327 },
328 &self.config().retry_config,
329 "NCBI EPost API request",
330 )
331 .await?;
332
333 if !response.status().is_success() {
334 warn!("EPost request failed with status: {}", response.status());
335 return Err(PubMedError::ApiError {
336 status: response.status().as_u16(),
337 message: response
338 .status()
339 .canonical_reason()
340 .unwrap_or("Unknown error")
341 .to_string(),
342 });
343 }
344
345 let epost_response: EPostResponse = response.json().await?;
346
347 // Check for API error
348 if let Some(error_msg) = &epost_response.epostresult.error {
349 return Err(PubMedError::ApiError {
350 status: 200,
351 message: format!("NCBI EPost API error: {}", error_msg),
352 });
353 }
354
355 let webenv = epost_response
356 .epostresult
357 .webenv
358 .ok_or_else(|| PubMedError::WebEnvNotAvailable)?;
359
360 let query_key = epost_response
361 .epostresult
362 .query_key
363 .ok_or_else(|| PubMedError::WebEnvNotAvailable)?;
364
365 info!(
366 pmids_count = pmids.len(),
367 query_key = %query_key,
368 "EPost completed successfully"
369 );
370
371 Ok(EPostResult { webenv, query_key })
372 }
373
374 /// Fetch articles from history server using WebEnv session
375 ///
376 /// This method retrieves articles from a previously executed search using
377 /// the history server. It's useful for paginating through large result sets
378 /// without re-running the search query.
379 ///
380 /// # Arguments
381 ///
382 /// * `session` - History session containing WebEnv and query_key
383 /// * `start` - Starting index (0-based) for pagination
384 /// * `max` - Maximum number of articles to fetch
385 ///
386 /// # Returns
387 ///
388 /// Returns a `Result<Vec<PubMedArticle>>` containing the fetched articles
389 ///
390 /// # Note
391 ///
392 /// WebEnv sessions typically expire after 1 hour of inactivity.
393 ///
394 /// # Example
395 ///
396 /// ```no_run
397 /// use pubmed_client::PubMedClient;
398 ///
399 /// #[tokio::main]
400 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
401 /// let client = PubMedClient::new();
402 ///
403 /// // First, search with history
404 /// let result = client.search_with_history("cancer treatment", 100).await?;
405 ///
406 /// if let Some(session) = result.history_session() {
407 /// // Fetch articles 100-199
408 /// let batch2 = client.fetch_from_history(&session, 100, 100).await?;
409 /// println!("Fetched {} articles", batch2.len());
410 ///
411 /// // Fetch articles 200-299
412 /// let batch3 = client.fetch_from_history(&session, 200, 100).await?;
413 /// println!("Fetched {} more articles", batch3.len());
414 /// }
415 ///
416 /// Ok(())
417 /// }
418 /// ```
419 #[instrument(skip(self), fields(start = start, max = max))]
420 pub async fn fetch_from_history(
421 &self,
422 session: &HistorySession,
423 start: usize,
424 max: usize,
425 ) -> Result<Vec<PubMedArticle>> {
426 // Use WebEnv and query_key to fetch from history server
427 let url = format!(
428 "{}/efetch.fcgi?db=pubmed&query_key={}&WebEnv={}&retstart={}&retmax={}&retmode=xml&rettype=abstract",
429 self.base_url,
430 urlencoding::encode(&session.query_key),
431 urlencoding::encode(&session.webenv),
432 start,
433 max
434 );
435
436 debug!("Making EFetch API request from history");
437 let response = self.make_request(&url).await?;
438
439 let xml_text = response.text().await?;
440
441 // Check for empty response or error
442 if xml_text.trim().is_empty() {
443 return Ok(Vec::new());
444 }
445
446 // Check for NCBI error response
447 if xml_text.contains("<ERROR>") {
448 let error_msg = xml_text
449 .split("<ERROR>")
450 .nth(1)
451 .and_then(|s| s.split("</ERROR>").next())
452 .unwrap_or("Unknown error");
453
454 return Err(PubMedError::HistorySessionError(error_msg.to_string()));
455 }
456
457 // Parse multiple articles from XML using serde-based parser
458 let articles = parse_articles_from_xml(&xml_text)?;
459
460 info!(
461 fetched_count = articles.len(),
462 start = start,
463 "Fetched articles from history"
464 );
465
466 Ok(articles)
467 }
468
469 /// Fetch all articles for a list of PMIDs using EPost and the History server
470 ///
471 /// This is the recommended method for fetching large numbers of articles by PMID.
472 /// It uploads the PMID list to the History server via EPost (using HTTP POST to
473 /// avoid URL length limits), then fetches articles in batches using pagination.
474 ///
475 /// For small lists (up to ~200 PMIDs), `fetch_articles()` works fine. Use this
476 /// method when you have hundreds or thousands of PMIDs.
477 ///
478 /// # Arguments
479 ///
480 /// * `pmids` - Slice of PubMed IDs as strings
481 ///
482 /// # Returns
483 ///
484 /// Returns a `Result<Vec<PubMedArticle>>` containing all fetched articles
485 ///
486 /// # Example
487 ///
488 /// ```no_run
489 /// use pubmed_client::PubMedClient;
490 ///
491 /// #[tokio::main]
492 /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
493 /// let client = PubMedClient::new();
494 ///
495 /// // Works efficiently even with thousands of PMIDs
496 /// let pmids: Vec<&str> = vec!["31978945", "33515491", "25760099"];
497 /// let articles = client.fetch_all_by_pmids(&pmids).await?;
498 /// println!("Fetched {} articles", articles.len());
499 ///
500 /// Ok(())
501 /// }
502 /// ```
503 #[instrument(skip(self), fields(pmids_count = pmids.len()))]
504 pub async fn fetch_all_by_pmids(&self, pmids: &[&str]) -> Result<Vec<PubMedArticle>> {
505 if pmids.is_empty() {
506 return Ok(Vec::new());
507 }
508
509 // Upload PMIDs to History server
510 let epost_result = self.epost(pmids).await?;
511 let session = epost_result.history_session();
512
513 const BATCH_SIZE: usize = 200;
514 let total = pmids.len();
515 let mut all_articles = Vec::with_capacity(total);
516 let mut offset = 0;
517
518 while offset < total {
519 let articles = self
520 .fetch_from_history(&session, offset, BATCH_SIZE)
521 .await?;
522
523 if articles.is_empty() {
524 break;
525 }
526
527 info!(
528 offset = offset,
529 fetched = articles.len(),
530 total = total,
531 "Fetched batch from history"
532 );
533
534 offset += articles.len();
535 all_articles.extend(articles);
536 }
537
538 info!(
539 total_fetched = all_articles.len(),
540 requested = pmids.len(),
541 "fetch_all_by_pmids completed"
542 );
543
544 Ok(all_articles)
545 }
546
    /// Search and stream all matching articles using history server
    ///
    /// This method performs a search and returns a stream that automatically
    /// paginates through all results using the NCBI history server. It's ideal
    /// for processing large result sets without loading all articles into memory.
    ///
    /// # Arguments
    ///
    /// * `query` - Search query string
    /// * `batch_size` - Number of articles to fetch per batch (recommended: 100-500)
    ///
    /// # Returns
    ///
    /// Returns a `Stream` that yields `Result<PubMedArticle>` for each article
    ///
    /// # Example
    ///
    /// ```no_run
    /// use pubmed_client::PubMedClient;
    /// use futures_util::StreamExt;
    /// use std::pin::pin;
    ///
    /// #[tokio::main]
    /// async fn main() -> Result<(), Box<dyn std::error::Error>> {
    ///     let client = PubMedClient::new();
    ///
    ///     let stream = client.search_all("cancer biomarker", 100);
    ///     let mut stream = pin!(stream);
    ///     let mut count = 0;
    ///
    ///     while let Some(result) = stream.next().await {
    ///         match result {
    ///             Ok(article) => {
    ///                 count += 1;
    ///                 println!("{}: {}", article.pmid, article.title);
    ///             }
    ///             Err(e) => eprintln!("Error: {}", e),
    ///         }
    ///
    ///         // Stop after 1000 articles
    ///         if count >= 1000 {
    ///             break;
    ///         }
    ///     }
    ///
    ///     println!("Processed {} articles", count);
    ///     Ok(())
    /// }
    /// ```
    #[cfg(not(target_arch = "wasm32"))]
    pub fn search_all(
        &self,
        query: &str,
        batch_size: usize,
    ) -> impl futures_util::Stream<Item = Result<PubMedArticle>> + '_ {
        use futures_util::stream;

        // Own the query so the state can live inside the unfold future.
        let query = query.to_string();
        let batch_size = batch_size.max(1); // Ensure at least 1 (retmax=0 would stall the stream)

        // Drive the SearchAllState machine: each unfold step either yields one
        // article (with a successor state) or returns None to end the stream.
        stream::unfold(
            SearchAllState::Initial { query, batch_size },
            move |state| async move {
                match state {
                    SearchAllState::Initial { query, batch_size } => {
                        // Perform initial search with history
                        match self.search_with_history(&query, batch_size).await {
                            Ok(result) => {
                                let session = result.history_session();
                                let total = result.total_count;

                                // No hits at all: end the stream immediately.
                                if result.pmids.is_empty() {
                                    return None;
                                }

                                // Fetch first batch of articles
                                match session {
                                    Some(session) => {
                                        match self.fetch_from_history(&session, 0, batch_size).await
                                        {
                                            Ok(articles) => {
                                                // Next fetch starts after this batch.
                                                let next_state = SearchAllState::Fetching {
                                                    session,
                                                    total,
                                                    batch_size,
                                                    current_offset: batch_size,
                                                    pending_articles: articles,
                                                    article_index: 0,
                                                };
                                                self.next_article_from_state(next_state)
                                            }
                                            // Surface the error once, then terminate.
                                            Err(e) => Some((Err(e), SearchAllState::Done)),
                                        }
                                    }
                                    None => {
                                        // No history session, can't stream
                                        Some((
                                            Err(PubMedError::WebEnvNotAvailable),
                                            SearchAllState::Done,
                                        ))
                                    }
                                }
                            }
                            Err(e) => Some((Err(e), SearchAllState::Done)),
                        }
                    }
                    SearchAllState::Fetching {
                        session,
                        total,
                        batch_size,
                        current_offset,
                        pending_articles,
                        article_index,
                    } => {
                        if article_index < pending_articles.len() {
                            // Return next article from current batch
                            let article = pending_articles[article_index].clone();
                            Some((
                                Ok(article),
                                SearchAllState::Fetching {
                                    session,
                                    total,
                                    batch_size,
                                    current_offset,
                                    pending_articles,
                                    article_index: article_index + 1,
                                },
                            ))
                        } else if current_offset < total {
                            // Batch exhausted but more records remain: fetch next batch
                            match self
                                .fetch_from_history(&session, current_offset, batch_size)
                                .await
                            {
                                Ok(articles) => {
                                    // Server returned nothing despite offset < total:
                                    // treat as end of stream rather than looping.
                                    if articles.is_empty() {
                                        return None;
                                    }
                                    let next_state = SearchAllState::Fetching {
                                        session,
                                        total,
                                        batch_size,
                                        current_offset: current_offset + batch_size,
                                        pending_articles: articles,
                                        article_index: 0,
                                    };
                                    self.next_article_from_state(next_state)
                                }
                                Err(e) => Some((Err(e), SearchAllState::Done)),
                            }
                        } else {
                            // All done
                            None
                        }
                    }
                    SearchAllState::Done => None,
                }
            },
        )
    }
707
708 /// Helper to get next article from state
709 #[cfg(not(target_arch = "wasm32"))]
710 fn next_article_from_state(
711 &self,
712 state: SearchAllState,
713 ) -> Option<(Result<PubMedArticle>, SearchAllState)> {
714 match state {
715 SearchAllState::Fetching {
716 ref pending_articles,
717 article_index,
718 ..
719 } if article_index < pending_articles.len() => {
720 let article = pending_articles[article_index].clone();
721 let SearchAllState::Fetching {
722 session,
723 total,
724 batch_size,
725 current_offset,
726 pending_articles,
727 article_index,
728 } = state
729 else {
730 unreachable!()
731 };
732 Some((
733 Ok(article),
734 SearchAllState::Fetching {
735 session,
736 total,
737 batch_size,
738 current_offset,
739 pending_articles,
740 article_index: article_index + 1,
741 },
742 ))
743 }
744 _ => None,
745 }
746 }
747}
748
#[cfg(test)]
mod tests {
    use std::time::{Duration, Instant};

    use super::*;

    /// An empty PMID slice must be rejected before any network activity.
    #[tokio::test]
    async fn test_epost_empty_input() {
        let client = PubMedClient::new();
        match client.epost(&[]).await {
            Err(e) => assert!(e.to_string().contains("empty")),
            Ok(_) => panic!("epost with no PMIDs should fail"),
        }
    }

    /// A non-numeric PMID must fail validation.
    #[tokio::test]
    async fn test_epost_invalid_pmid() {
        let client = PubMedClient::new();
        assert!(client.epost(&["not_a_number"]).await.is_err());
    }

    /// Validation covers the whole list up front, so one bad PMID fails fast
    /// without issuing an HTTP request.
    #[tokio::test]
    async fn test_epost_validates_all_pmids_before_request() {
        let client = PubMedClient::new();

        let start = Instant::now();
        let outcome = client.epost(&["31978945", "invalid", "33515491"]).await;
        assert!(outcome.is_err());
        let elapsed = start.elapsed();
        assert!(elapsed < Duration::from_millis(100));
    }

    /// Empty input short-circuits to an empty article list without I/O.
    #[tokio::test]
    async fn test_fetch_all_by_pmids_empty_input() {
        let client = PubMedClient::new();
        let outcome = client.fetch_all_by_pmids(&[]).await;
        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_empty());
    }

    /// Invalid PMIDs propagate the validation error from EPost.
    #[tokio::test]
    async fn test_fetch_all_by_pmids_invalid_pmid() {
        let client = PubMedClient::new();
        assert!(client.fetch_all_by_pmids(&["not_a_number"]).await.is_err());
    }
}