diff options
-rw-r--r-- | packages/pipeline/src/scripts/pull_missing_events.ts | 35 |
1 files changed, 26 insertions, 9 deletions
diff --git a/packages/pipeline/src/scripts/pull_missing_events.ts b/packages/pipeline/src/scripts/pull_missing_events.ts index 2dc81f205..bceed299c 100644 --- a/packages/pipeline/src/scripts/pull_missing_events.ts +++ b/packages/pipeline/src/scripts/pull_missing_events.ts @@ -8,12 +8,12 @@ import { Connection, ConnectionOptions, createConnection, Repository } from 'typ import { ExchangeEventsSource } from '../data_sources/contract-wrappers/exchange_events'; import { ExchangeFillEvent } from '../entities'; import * as ormConfig from '../ormconfig'; -import { parseExchangeEvents } from '../parsers/events'; +import { ExchangeEventEntity, parseExchangeEvents } from '../parsers/events'; import { handleError } from '../utils'; const EXCHANGE_START_BLOCK = 6271590; // Block number when the Exchange contract was deployed to mainnet. -const START_BLOCK_OFFSET = 1000; // Number of blocks before the last known block to consider when updating fill events. -const BATCH_SAVE_SIZE = 10000; // Number of events to save at once. +const START_BLOCK_OFFSET = 100; // Number of blocks before the last known block to consider when updating fill events. +const BATCH_SAVE_SIZE = 1000; // Number of events to save at once. let connection: Connection; @@ -38,17 +38,36 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise<voi const events = parseExchangeEvents(eventLogs); console.log(`Retrieved and parsed ${events.length} total events.`); console.log('Saving events...'); + if (startBlock === EXCHANGE_START_BLOCK) { + // Split data into numChunks pieces of maximum size BATCH_SAVE_SIZE + // each. + for (const eventsBatch of R.splitEvery(BATCH_SAVE_SIZE, events)) { + await eventsRepository.insert(eventsBatch); + } + } else { + // If we possibly have some overlap where we need to update some + // existing events, we need to use our workaround/fallback. + await saveIndividuallyWithFallbackAsync(eventsRepository, events); + } + const totalEvents = await eventsRepository.count(); + console.log(`Done saving events. There are now ${totalEvents} total events.`); +} + +async function saveIndividuallyWithFallbackAsync( + eventsRepository: Repository<ExchangeFillEvent>, + events: ExchangeEventEntity[], +): Promise<void> { // Note(albrow): This is a temporary hack because `save` is not working as // documented and is causing a foreign key constraint violation. Hopefully // can remove later because this "poor man's upsert" implementation operates // on one event at a time and is therefore much slower. - // await eventsRepository.save(events, { chunk: Math.ceil(events.length / BATCH_SAVE_SIZE) }); for (const event of events) { try { - await eventsRepository.save(event); + // First try and insert. + await eventsRepository.insert(event); } catch { - // Assume this is a foreign key constraint error and try doing an - // update instead. + // If it fails, assume it was a foreign key constraint error and try + // doing an update instead. await eventsRepository.update( { contractAddress: event.contractAddress, @@ -59,8 +78,6 @@ async function getExchangeEventsAsync(provider: Web3ProviderEngine): Promise<voi ); } } - const totalEvents = await eventsRepository.count(); - console.log(`Done saving events. There are now ${totalEvents} total events.`); } async function getStartBlockAsync(eventsRepository: Repository<ExchangeFillEvent>): Promise<number> { |