# acumatica_odata_client.py
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

import requests
  5. class AcumaticaODataClient:
  6. def __init__(self, base_url, username, password, page_size=1000, max_workers=8):
  7. self.base_url = self._add_format_and_top(base_url, page_size)
  8. self.page_size = page_size
  9. self.session = requests.Session()
  10. self.session.auth = (username, password)
  11. self.session.headers.update({
  12. "Accept": "application/json"
  13. })
  14. self.lock = threading.Lock()
  15. self.max_workers = max_workers
  16. self.results = []
  17. def _add_format_and_top(self, url, top):
  18. # Add or update $format and $top in query string
  19. parts = list(urlparse(url))
  20. query = parse_qs(parts[4])
  21. query['$format'] = ['json']
  22. query['$top'] = [str(top)]
  23. parts[4] = urlencode(query, doseq=True)
  24. return urlunparse(parts)
  25. def _fetch_initial_links(self):
  26. """Collect all paginated URLs before multithreaded fetch."""
  27. next_url = self.base_url
  28. urls = []
  29. while next_url:
  30. print(f"[DISCOVER] {next_url}")
  31. resp = self.session.get(next_url)
  32. resp.raise_for_status()
  33. data = resp.json()
  34. urls.append(next_url)
  35. next_url = data.get('@odata.nextLink')
  36. return urls
  37. def _fetch_page(self, url):
  38. try:
  39. resp = self.session.get(url)
  40. resp.raise_for_status()
  41. data = resp.json()
  42. records = data.get('value', [])
  43. with self.lock:
  44. self.results.extend(records)
  45. print(f"[FETCHED] {len(records)} records from {url}")
  46. except Exception as e:
  47. print(f"[ERROR] {url} -> {e}")
  48. def fetch_all(self):
  49. page_urls = self._fetch_initial_links()
  50. with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
  51. futures = [executor.submit(self._fetch_page, url) for url in page_urls]
  52. for _ in as_completed(futures):
  53. pass
  54. return self.results