import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse

import requests


class AcumaticaODataClient:
    def __init__(self, base_url, username, password, page_size=1000, max_workers=8):
        self.base_url = self._add_format_and_top(base_url, page_size)
        self.page_size = page_size
        self.session = requests.Session()
        self.session.auth = (username, password)
        self.session.headers.update({"Accept": "application/json"})
        self.lock = threading.Lock()
        self.max_workers = max_workers
        self.results = []

    def _add_format_and_top(self, url, top):
        # Add or update $format and $top in the query string
        parts = list(urlparse(url))
        query = parse_qs(parts[4])
        query['$format'] = ['json']
        query['$top'] = [str(top)]
        parts[4] = urlencode(query, doseq=True)
        return urlunparse(parts)

    def _fetch_initial_links(self):
        """Walk the nextLink chain to collect every page URL before the
        multithreaded fetch. nextLink pagination is inherently sequential,
        so this discovery pass cannot itself be parallelized; note that it
        downloads each page body once just to read its nextLink."""
        next_url = self.base_url
        urls = []
        while next_url:
            print(f"[DISCOVER] {next_url}")
            resp = self.session.get(next_url, timeout=60)
            resp.raise_for_status()
            data = resp.json()
            urls.append(next_url)
            # OData v4 responses use '@odata.nextLink'; older v3 endpoints
            # use 'odata.nextLink'. Accept either.
            next_url = data.get('@odata.nextLink') or data.get('odata.nextLink')
        return urls

    def _fetch_page(self, url):
        try:
            resp = self.session.get(url, timeout=60)
            resp.raise_for_status()
            data = resp.json()
            records = data.get('value', [])
            # Serialize writes to the shared results list across threads.
            with self.lock:
                self.results.extend(records)
            print(f"[FETCHED] {len(records)} records from {url}")
        except Exception as e:
            print(f"[ERROR] {url} -> {e}")

    def fetch_all(self):
        page_urls = self._fetch_initial_links()
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            futures = [executor.submit(self._fetch_page, url) for url in page_urls]
            # Block until every page fetch has completed.
            for _ in as_completed(futures):
                pass
        return self.results
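
A minimal usage sketch. The endpoint URL and credentials below are placeholders for illustration, not a real Acumatica instance:

# Hypothetical OData endpoint and credentials -- substitute your own.
client = AcumaticaODataClient(
    base_url="https://example.acumatica.com/odata/Company/MyInquiry",
    username="api_user",
    password="api_password",
    page_size=1000,
    max_workers=8,
)
rows = client.fetch_all()
print(f"Fetched {len(rows)} total records")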