diff options
author | Steve Klebanoff <steve.klebanoff@gmail.com> | 2019-01-10 00:54:55 +0800 |
---|---|---|
committer | Steve Klebanoff <steve.klebanoff@gmail.com> | 2019-01-10 00:54:55 +0800 |
commit | fb3605026ef63ef6897010a52bf3f4c116cdf271 (patch) | |
tree | af3887e5f785e049daeaf795e0f3e4a9ad370d93 /packages/pipeline/src | |
parent | 76dde294f10bb3d5c1074cd6599668bcb1cdd8ec (diff) | |
parent | 5b8c9122a292f6558c45440b053ceae52d36d495 (diff) | |
download | dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.tar dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.tar.gz dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.tar.bz2 dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.tar.lz dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.tar.xz dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.tar.zst dexon-sol-tools-fb3605026ef63ef6897010a52bf3f4c116cdf271.zip |
Merge branch 'development' into feature/instant/tell-amount-available
Diffstat (limited to 'packages/pipeline/src')
22 files changed, 801 insertions, 57 deletions
diff --git a/packages/pipeline/src/data_sources/copper/index.ts b/packages/pipeline/src/data_sources/copper/index.ts new file mode 100644 index 000000000..15df2fd7d --- /dev/null +++ b/packages/pipeline/src/data_sources/copper/index.ts @@ -0,0 +1,126 @@ +import { fetchAsync } from '@0x/utils'; +import Bottleneck from 'bottleneck'; + +import { + CopperActivityTypeCategory, + CopperActivityTypeResponse, + CopperCustomFieldResponse, + CopperSearchResponse, +} from '../../parsers/copper'; + +const HTTP_OK_STATUS = 200; +const COPPER_URI = 'https://api.prosperworks.com/developer_api/v1'; + +const DEFAULT_PAGINATION_PARAMS = { + page_size: 200, + sort_by: 'date_modified', + sort_direction: 'desc', +}; + +export type CopperSearchParams = CopperLeadSearchParams | CopperActivitySearchParams | CopperOpportunitySearchParams; +export interface CopperLeadSearchParams { + page_number?: number; +} + +export interface CopperActivitySearchParams { + minimum_activity_date: number; + page_number?: number; +} + +export interface CopperOpportunitySearchParams { + sort_by: string; // must override the default 'date_modified' for this endpoint + page_number?: number; +} +export enum CopperEndpoint { + Leads = '/leads/search', + Opportunities = '/opportunities/search', + Activities = '/activities/search', +} +const ONE_SECOND = 1000; + +function httpErrorCheck(response: Response): void { + if (response.status !== HTTP_OK_STATUS) { + throw new Error(`HTTP error while scraping Copper: [${JSON.stringify(response)}]`); + } +} +export class CopperSource { + private readonly _accessToken: string; + private readonly _userEmail: string; + private readonly _defaultHeaders: any; + private readonly _limiter: Bottleneck; + + constructor(maxConcurrentRequests: number, accessToken: string, userEmail: string) { + this._accessToken = accessToken; + this._userEmail = userEmail; + this._defaultHeaders = { + 'Content-Type': 'application/json', + 'X-PW-AccessToken': this._accessToken, + 'X-PW-Application': 'developer_api', + 'X-PW-UserEmail': this._userEmail, + }; + this._limiter = new Bottleneck({ + minTime: ONE_SECOND / maxConcurrentRequests, + reservoir: 30, + reservoirRefreshAmount: 30, + reservoirRefreshInterval: maxConcurrentRequests, + }); + } + + public async fetchNumberOfPagesAsync(endpoint: CopperEndpoint, searchParams?: CopperSearchParams): Promise<number> { + const resp = await this._limiter.schedule(() => + fetchAsync(COPPER_URI + endpoint, { + method: 'POST', + body: JSON.stringify({ ...DEFAULT_PAGINATION_PARAMS, ...searchParams }), + headers: this._defaultHeaders, + }), + ); + + httpErrorCheck(resp); + + // total number of records that match the request parameters + if (resp.headers.has('X-Pw-Total')) { + const totalRecords: number = parseInt(resp.headers.get('X-Pw-Total') as string, 10); // tslint:disable-line:custom-no-magic-numbers + return Math.ceil(totalRecords / DEFAULT_PAGINATION_PARAMS.page_size); + } else { + return 1; + } + } + public async fetchSearchResultsAsync<T extends CopperSearchResponse>( + endpoint: CopperEndpoint, + searchParams?: CopperSearchParams, + ): Promise<T[]> { + const request = { ...DEFAULT_PAGINATION_PARAMS, ...searchParams }; + const response = await this._limiter.schedule(() => + fetchAsync(COPPER_URI + endpoint, { + method: 'POST', + body: JSON.stringify(request), + headers: this._defaultHeaders, + }), + ); + httpErrorCheck(response); + const json: T[] = await response.json(); + return json; + } + + public async fetchActivityTypesAsync(): Promise<Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>> { + const response = await this._limiter.schedule(() => + fetchAsync(`${COPPER_URI}/activity_types`, { + method: 'GET', + headers: this._defaultHeaders, + }), + ); + httpErrorCheck(response); + return response.json(); + } + + public async fetchCustomFieldsAsync(): Promise<CopperCustomFieldResponse[]> { + const response = await this._limiter.schedule(() => + fetchAsync(`${COPPER_URI}/custom_field_definitions`, { + method: 'GET', + headers: this._defaultHeaders, + }), + ); + httpErrorCheck(response); + return response.json(); + } +} diff --git a/packages/pipeline/src/entities/copper_activity.ts b/packages/pipeline/src/entities/copper_activity.ts new file mode 100644 index 000000000..cbc034285 --- /dev/null +++ b/packages/pipeline/src/entities/copper_activity.ts @@ -0,0 +1,41 @@ +import { Column, Entity, Index, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'copper_activities', schema: 'raw' }) +export class CopperActivity { + @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) + public id!: number; + + @Index() + @Column({ name: 'parent_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public parentId!: number; + @Column({ name: 'parent_type', type: 'varchar' }) + public parentType!: string; + + // join with CopperActivityType + @Index() + @Column({ name: 'type_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public typeId!: number; + @Column({ name: 'type_category', type: 'varchar' }) + public typeCategory!: string; + @Column({ name: 'type_name', type: 'varchar', nullable: true }) + public typeName?: string; + + @Column({ name: 'user_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public userId!: number; + @Column({ name: 'old_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer }) + public oldValueId?: number; + @Column({ name: 'old_value_name', type: 'varchar', nullable: true }) + public oldValueName?: string; + @Column({ name: 'new_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer }) + public newValueId?: number; + @Column({ name: 'new_value_name', type: 'varchar', nullable: true }) + public newValueName?: string; + + @Index() + @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer }) + public dateCreated!: number; + @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer }) + public dateModified!: number; +} diff --git a/packages/pipeline/src/entities/copper_activity_type.ts b/packages/pipeline/src/entities/copper_activity_type.ts new file mode 100644 index 000000000..8fb2dcf70 --- /dev/null +++ b/packages/pipeline/src/entities/copper_activity_type.ts @@ -0,0 +1,17 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'copper_activity_types', schema: 'raw' }) +export class CopperActivityType { + @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) + public id!: number; + @Column({ name: 'category', type: 'varchar' }) + public category!: string; + @Column({ name: 'name', type: 'varchar' }) + public name!: string; + @Column({ name: 'is_disabled', type: 'boolean', nullable: true }) + public isDisabled?: boolean; + @Column({ name: 'count_as_interaction', type: 'boolean', nullable: true }) + public countAsInteraction?: boolean; +} diff --git a/packages/pipeline/src/entities/copper_custom_field.ts b/packages/pipeline/src/entities/copper_custom_field.ts new file mode 100644 index 000000000..f23f6ab22 --- /dev/null +++ b/packages/pipeline/src/entities/copper_custom_field.ts @@ -0,0 +1,15 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'copper_custom_fields', schema: 'raw' }) +export class CopperCustomField { + @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) + public id!: number; + @Column({ name: 'data_type', type: 'varchar' }) + public dataType!: string; + @Column({ name: 'field_type', type: 'varchar', nullable: true }) + public fieldType?: string; + @Column({ name: 'name', type: 'varchar' }) + public name!: string; +} diff --git a/packages/pipeline/src/entities/copper_lead.ts b/packages/pipeline/src/entities/copper_lead.ts new file mode 100644 index 000000000..c51ccd761 --- /dev/null +++ b/packages/pipeline/src/entities/copper_lead.ts @@ -0,0 +1,38 @@ +import { Column, Entity, Index, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'copper_leads', schema: 'raw' }) +export class CopperLead { + @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer }) + public id!: number; + + @Column({ name: 'name', type: 'varchar', nullable: true }) + public name?: string; + @Column({ name: 'first_name', type: 'varchar', nullable: true }) + public firstName?: string; + @Column({ name: 'last_name', type: 'varchar', nullable: true }) + public lastName?: string; + @Column({ name: 'middle_name', type: 'varchar', nullable: true }) + public middleName?: string; + @Column({ name: 'assignee_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true }) + public assigneeId?: number; + @Column({ name: 'company_name', type: 'varchar', nullable: true }) + public companyName?: string; + @Column({ name: 'customer_source_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true }) + public customerSourceId?: number; + @Column({ name: 'monetary_value', type: 'integer', nullable: true }) + public monetaryValue?: number; + @Column({ name: 'status', type: 'varchar' }) + public status!: string; + @Column({ name: 'status_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public statusId!: number; + @Column({ name: 'title', type: 'varchar', nullable: true }) + public title?: string; + + @Index() + @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer }) + public dateCreated!: number; + @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer }) + public dateModified!: number; +} diff --git a/packages/pipeline/src/entities/copper_opportunity.ts b/packages/pipeline/src/entities/copper_opportunity.ts new file mode 100644 index 000000000..e12bd69ce --- /dev/null +++ b/packages/pipeline/src/entities/copper_opportunity.ts @@ -0,0 +1,45 @@ +import { Column, Entity, PrimaryColumn } from 'typeorm'; + +import { numberToBigIntTransformer } from '../utils'; + +@Entity({ name: 'copper_opportunities', schema: 'raw' }) +export class CopperOpportunity { + @PrimaryColumn({ name: 'id', type: 'bigint', transformer: numberToBigIntTransformer }) + public id!: number; + @Column({ name: 'name', type: 'varchar' }) + public name!: string; + @Column({ name: 'assignee_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) + public assigneeId?: number; + @Column({ name: 'close_date', nullable: true, type: 'varchar' }) + public closeDate?: string; + @Column({ name: 'company_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) + public companyId?: number; + @Column({ name: 'company_name', nullable: true, type: 'varchar' }) + public companyName?: string; + @Column({ name: 'customer_source_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) + public customerSourceId?: number; + @Column({ name: 'loss_reason_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) + public lossReasonId?: number; + @Column({ name: 'pipeline_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public pipelineId!: number; + @Column({ name: 'pipeline_stage_id', type: 'bigint', transformer: numberToBigIntTransformer }) + public pipelineStageId!: number; + @Column({ name: 'primary_contact_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer }) + public primaryContactId?: number; + @Column({ name: 'priority', nullable: true, type: 'varchar' }) + public priority?: string; + @Column({ name: 'status', type: 'varchar' }) + public status!: string; + @Column({ name: 'interaction_count', type: 'bigint', transformer: numberToBigIntTransformer }) + public interactionCount!: number; + @Column({ name: 'monetary_value', nullable: true, type: 'integer' }) + public monetaryValue?: number; + @Column({ name: 'win_probability', nullable: true, type: 'integer' }) + public winProbability?: number; + @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer }) + public dateCreated!: number; + @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer }) + public dateModified!: number; + @Column({ name: 'custom_fields', type: 'jsonb' }) + public customFields!: { [key: number]: number }; +} diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts index cc3de78bb..27c153c07 100644 --- a/packages/pipeline/src/entities/index.ts +++ b/packages/pipeline/src/entities/index.ts @@ -16,4 +16,10 @@ export { TokenOrderbookSnapshot } from './token_order'; export { Transaction } from './transaction'; export { ERC20ApprovalEvent } from './erc20_approval_event'; +export { CopperLead } from './copper_lead'; +export { CopperActivity } from './copper_activity'; +export { CopperOpportunity } from './copper_opportunity'; +export { CopperActivityType } from './copper_activity_type'; +export { CopperCustomField } from './copper_custom_field'; + export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent; diff --git a/packages/pipeline/src/entities/token_order.ts b/packages/pipeline/src/entities/token_order.ts index 4b8f0abc3..2709747cb 100644 --- a/packages/pipeline/src/entities/token_order.ts +++ b/packages/pipeline/src/entities/token_order.ts @@ -1,7 +1,6 @@ import { BigNumber } from '@0x/utils'; import { Column, Entity, PrimaryColumn } from 'typeorm'; -import { OrderType } from '../types'; import { bigNumberTransformer, numberToBigIntTransformer } from '../utils'; @Entity({ name: 'token_orderbook_snapshots', schema: 'raw' }) @@ -11,7 +10,7 @@ export class TokenOrderbookSnapshot { @PrimaryColumn({ name: 'source' }) public source!: string; @PrimaryColumn({ name: 'order_type' }) - public orderType!: OrderType; + public orderType!: string; @PrimaryColumn({ name: 'price', type: 'numeric', transformer: bigNumberTransformer }) public price!: BigNumber; @PrimaryColumn({ name: 'base_asset_symbol' }) diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts index fe11d81d5..2700714cd 100644 --- a/packages/pipeline/src/ormconfig.ts +++ b/packages/pipeline/src/ormconfig.ts @@ -2,6 +2,11 @@ import { ConnectionOptions } from 'typeorm'; import { Block, + CopperActivity, + CopperActivityType, + CopperCustomField, + CopperLead, + CopperOpportunity, DexTrade, ERC20ApprovalEvent, ExchangeCancelEvent, @@ -18,6 +23,11 @@ import { const entities = [ Block, + CopperOpportunity, + CopperActivity, + CopperActivityType, + CopperCustomField, + CopperLead, DexTrade, ExchangeCancelEvent, ExchangeCancelUpToEvent, diff --git a/packages/pipeline/src/parsers/copper/index.ts b/packages/pipeline/src/parsers/copper/index.ts new file mode 100644 index 000000000..6c0c5abd5 --- /dev/null +++ b/packages/pipeline/src/parsers/copper/index.ts @@ -0,0 +1,259 @@ +import * as R from 'ramda'; + +import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../../entities'; + +const ONE_SECOND = 1000; +export type CopperSearchResponse = CopperLeadResponse | CopperActivityResponse | CopperOpportunityResponse; +export interface CopperLeadResponse { + id: number; + name?: string; + first_name?: string; + last_name?: string; + middle_name?: string; + assignee_id?: number; + company_name?: string; + customer_source_id?: number; + monetary_value?: number; + status: string; + status_id: number; + title?: string; + date_created: number; // in seconds + date_modified: number; // in seconds +} + +export interface CopperActivityResponse { + id: number; + parent: CopperActivityParentResponse; + type: CopperActivityTypeResponse; + user_id: number; + activity_date: number; + old_value: CopperActivityValueResponse; + new_value: CopperActivityValueResponse; + date_created: number; // in seconds + date_modified: number; // in seconds +} + +export interface CopperActivityValueResponse { + id: number; + name: string; +} +export interface CopperActivityParentResponse { + id: number; + type: string; +} + +// custom activity types +export enum CopperActivityTypeCategory { + user = 'user', + system = 'system', +} +export interface CopperActivityTypeResponse { + id: number; + category: CopperActivityTypeCategory; + name: string; + is_disabled?: boolean; + count_as_interaction?: boolean; +} + +export interface CopperOpportunityResponse { + id: number; + name: string; + assignee_id?: number; + close_date?: string; + company_id?: number; + company_name?: string; + customer_source_id?: number; + loss_reason_id?: number; + pipeline_id: number; + pipeline_stage_id: number; + primary_contact_id?: number; + priority?: string; + status: string; + tags: string[]; + interaction_count: number; + monetary_value?: number; + win_probability?: number; + date_created: number; // in seconds + date_modified: number; // in seconds + custom_fields: CopperNestedCustomFieldResponse[]; +} +interface CopperNestedCustomFieldResponse { + custom_field_definition_id: number; + value: number | number[] | null; +} +// custom fields +export enum CopperCustomFieldType { + String = 'String', + Text = 'Text', + Dropdown = 'Dropdown', + MultiSelect = 'MultiSelect', // not in API documentation but shows up in results + Date = 'Date', + Checkbox = 'Checkbox', + Float = 'Float', + URL = 'URL', + Percentage = 'Percentage', + Currency = 'Currency', + Connect = 'Connect', +} +export interface CopperCustomFieldOptionResponse { + id: number; + name: string; +} +export interface CopperCustomFieldResponse { + id: number; + name: string; + data_type: CopperCustomFieldType; + options?: CopperCustomFieldOptionResponse[]; +} +/** + * Parse response from Copper API /search/leads/ + * + * @param leads - The array of leads returned from the API + * @returns Returns an array of Copper Lead entities + */ +export function parseLeads(leads: CopperLeadResponse[]): CopperLead[] { + return leads.map(lead => { + const entity = new CopperLead(); + entity.id = lead.id; + entity.name = lead.name || undefined; + entity.firstName = lead.first_name || undefined; + entity.lastName = lead.last_name || undefined; + entity.middleName = lead.middle_name || undefined; + entity.assigneeId = lead.assignee_id || undefined; + entity.companyName = lead.company_name || undefined; + entity.customerSourceId = lead.customer_source_id || undefined; + entity.monetaryValue = lead.monetary_value || undefined; + entity.status = lead.status; + entity.statusId = lead.status_id; + entity.title = lead.title || undefined; + entity.dateCreated = lead.date_created * ONE_SECOND; + entity.dateModified = lead.date_modified * ONE_SECOND; + return entity; + }); +} + +/** + * Parse response from Copper API /search/activities/ + * + * @param activities - The array of activities returned from the API + * @returns Returns an array of Copper Activity entities + */ +export function parseActivities(activities: CopperActivityResponse[]): CopperActivity[] { + return activities.map(activity => { + const entity = new CopperActivity(); + entity.id = activity.id; + + entity.parentId = activity.parent.id; + entity.parentType = activity.parent.type; + + entity.typeId = activity.type.id; + entity.typeCategory = activity.type.category.toString(); + entity.typeName = activity.type.name; + + entity.userId = activity.user_id; + entity.dateCreated = activity.date_created * ONE_SECOND; + entity.dateModified = activity.date_modified * ONE_SECOND; + + // nested nullable fields + entity.oldValueId = R.path(['old_value', 'id'], activity); + entity.oldValueName = R.path(['old_value', 'name'], activity); + entity.newValueId = R.path(['new_value', 'id'], activity); + entity.newValueName = R.path(['new_value', 'name'], activity); + + return entity; + }); +} + +/** + * Parse response from Copper API /search/opportunities/ + * + * @param opportunities - The array of opportunities returned from the API + * @returns Returns an array of Copper Opportunity entities + */ +export function parseOpportunities(opportunities: CopperOpportunityResponse[]): CopperOpportunity[] { + return opportunities.map(opp => { + const customFields: { [key: number]: number } = opp.custom_fields + .filter(f => f.value !== null) + .map(f => ({ + ...f, + value: ([] as number[]).concat(f.value || []), // normalise all values to number[] + })) + .map(f => f.value.map(val => [f.custom_field_definition_id, val] as [number, number])) // pair each value with the custom_field_definition_id + .reduce((acc, pair) => acc.concat(pair)) // flatten + .reduce<{ [key: number]: number }>((obj, [key, value]) => { + // transform into object literal + obj[key] = value; + return obj; + }, {}); + + const entity = new CopperOpportunity(); + entity.id = opp.id; + entity.name = opp.name; + entity.assigneeId = opp.assignee_id || undefined; + entity.closeDate = opp.close_date || undefined; + entity.companyId = opp.company_id || undefined; + entity.companyName = opp.company_name || undefined; + entity.customerSourceId = opp.customer_source_id || undefined; + entity.lossReasonId = opp.loss_reason_id || undefined; + entity.pipelineId = opp.pipeline_id; + entity.pipelineStageId = opp.pipeline_stage_id; + entity.primaryContactId = opp.primary_contact_id || undefined; + entity.priority = opp.priority || undefined; + entity.status = opp.status; + entity.interactionCount = opp.interaction_count; + entity.monetaryValue = opp.monetary_value || undefined; + entity.winProbability = opp.win_probability === null ? undefined : opp.win_probability; + entity.dateCreated = opp.date_created * ONE_SECOND; + entity.dateModified = opp.date_modified * ONE_SECOND; + entity.customFields = customFields; + return entity; + }); +} + +/** + * Parse response from Copper API /activity_types/ + * + * @param activityTypeResponse - Activity Types response from the API, keyed by "user" or "system" + * @returns Returns an array of Copper Activity Type entities + */ +export function parseActivityTypes( + activityTypeResponse: Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>, +): CopperActivityType[] { + const values: CopperActivityTypeResponse[] = R.flatten(Object.values(activityTypeResponse)); + return values.map(activityType => ({ + id: activityType.id, + name: activityType.name, + category: activityType.category.toString(), + isDisabled: activityType.is_disabled, + countAsInteraction: activityType.count_as_interaction, + })); +} + +/** + * Parse response from Copper API /custom_field_definitions/ + * + * @param customFieldResponse - array of custom field definitions returned from the API, consisting of top-level fields and nested fields + * @returns Returns an array of Copper Custom Field entities + */ +export function parseCustomFields(customFieldResponse: CopperCustomFieldResponse[]): CopperCustomField[] { + function parseTopLevelField(field: CopperCustomFieldResponse): CopperCustomField[] { + const topLevelField: CopperCustomField = { + id: field.id, + name: field.name, + dataType: field.data_type.toString(), + }; + + if (field.options !== undefined) { + const nestedFields: CopperCustomField[] = field.options.map(option => ({ + id: option.id, + name: option.name, + dataType: field.name, + fieldType: 'option', + })); + return nestedFields.concat(topLevelField); + } else { + return [topLevelField]; + } + } + return R.chain(parseTopLevelField, customFieldResponse); +} diff --git a/packages/pipeline/src/parsers/ddex_orders/index.ts b/packages/pipeline/src/parsers/ddex_orders/index.ts index d7b97efbe..eeb9c9d5b 100644 --- a/packages/pipeline/src/parsers/ddex_orders/index.ts +++ b/packages/pipeline/src/parsers/ddex_orders/index.ts @@ -23,8 +23,12 @@ export function parseDdexOrders( ): TokenOrder[] { const aggregatedBids = aggregateOrders(ddexOrderbook.bids); const aggregatedAsks = aggregateOrders(ddexOrderbook.asks); - const parsedBids = aggregatedBids.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'bid', source, order)); - const parsedAsks = aggregatedAsks.map(order => parseDdexOrder(ddexMarket, observedTimestamp, 'ask', source, order)); + const parsedBids = aggregatedBids.map(order => + parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Bid, source, order), + ); + const parsedAsks = aggregatedAsks.map(order => + parseDdexOrder(ddexMarket, observedTimestamp, OrderType.Ask, source, order), + ); return parsedBids.concat(parsedAsks); } diff --git a/packages/pipeline/src/parsers/events/exchange_events.ts b/packages/pipeline/src/parsers/events/exchange_events.ts index e18106c75..9c4a5f89a 100644 --- a/packages/pipeline/src/parsers/events/exchange_events.ts +++ b/packages/pipeline/src/parsers/events/exchange_events.ts @@ -5,7 +5,7 @@ import { LogWithDecodedArgs } from 'ethereum-types'; import * as R from 'ramda'; import { ExchangeCancelEvent, ExchangeCancelUpToEvent, ExchangeFillEvent } from '../../entities'; -import { bigNumbertoStringOrNull } from '../../utils'; +import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils'; /** * Parses raw event logs for a fill event and returns an array of @@ -40,9 +40,7 @@ export const parseExchangeCancelUpToEvents: ( */ export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<ExchangeFillEventArgs>): ExchangeFillEvent { const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; const exchangeFillEvent = new ExchangeFillEvent(); exchangeFillEvent.contractAddress = eventLog.address as string; exchangeFillEvent.blockNumber = eventLog.blockNumber as number; @@ -59,16 +57,24 @@ export function _convertToExchangeFillEvent(eventLog: LogWithDecodedArgs<Exchang exchangeFillEvent.takerFeePaid = eventLog.args.takerFeePaid; exchangeFillEvent.orderHash = eventLog.args.orderHash; exchangeFillEvent.rawMakerAssetData = eventLog.args.makerAssetData; - exchangeFillEvent.makerAssetType = makerAssetType; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId); exchangeFillEvent.makerAssetProxyId = makerAssetData.assetProxyId; - exchangeFillEvent.makerTokenAddress = makerAssetData.tokenAddress; + // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData + exchangeFillEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData) + ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress + : makerAssetData.tokenAddress; // tslint has a false positive here. Type assertion is required. // tslint:disable-next-line:no-unnecessary-type-assertion exchangeFillEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); exchangeFillEvent.rawTakerAssetData = eventLog.args.takerAssetData; - exchangeFillEvent.takerAssetType = takerAssetType; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeFillEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId); exchangeFillEvent.takerAssetProxyId = takerAssetData.assetProxyId; - exchangeFillEvent.takerTokenAddress = takerAssetData.tokenAddress; + // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData + exchangeFillEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData) + ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress + : takerAssetData.tokenAddress; // tslint:disable-next-line:no-unnecessary-type-assertion exchangeFillEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); return exchangeFillEvent; @@ -83,9 +89,7 @@ export function _convertToExchangeCancelEvent( eventLog: LogWithDecodedArgs<ExchangeCancelEventArgs>, ): ExchangeCancelEvent { const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(eventLog.args.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; const exchangeCancelEvent = new ExchangeCancelEvent(); exchangeCancelEvent.contractAddress = eventLog.address as string; exchangeCancelEvent.blockNumber = eventLog.blockNumber as number; @@ -98,15 +102,23 @@ export function _convertToExchangeCancelEvent( exchangeCancelEvent.senderAddress = eventLog.args.senderAddress; exchangeCancelEvent.orderHash = eventLog.args.orderHash; exchangeCancelEvent.rawMakerAssetData = eventLog.args.makerAssetData; - exchangeCancelEvent.makerAssetType = makerAssetType; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId); exchangeCancelEvent.makerAssetProxyId = makerAssetData.assetProxyId; - exchangeCancelEvent.makerTokenAddress = makerAssetData.tokenAddress; + // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData + exchangeCancelEvent.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData) + ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.makerAssetData).nestedAssetData[0].tokenAddress + : makerAssetData.tokenAddress; // tslint:disable-next-line:no-unnecessary-type-assertion exchangeCancelEvent.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); exchangeCancelEvent.rawTakerAssetData = eventLog.args.takerAssetData; - exchangeCancelEvent.takerAssetType = takerAssetType; + // tslint:disable-next-line:no-unnecessary-type-assertion + exchangeCancelEvent.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId); exchangeCancelEvent.takerAssetProxyId = takerAssetData.assetProxyId; - exchangeCancelEvent.takerTokenAddress = takerAssetData.tokenAddress; + // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData + exchangeCancelEvent.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData) + ? assetDataUtils.decodeMultiAssetDataRecursively(eventLog.args.takerAssetData).nestedAssetData[0].tokenAddress + : takerAssetData.tokenAddress; // tslint:disable-next-line:no-unnecessary-type-assertion exchangeCancelEvent.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); return exchangeCancelEvent; diff --git a/packages/pipeline/src/parsers/idex_orders/index.ts b/packages/pipeline/src/parsers/idex_orders/index.ts index dfe27455c..14b871195 100644 --- a/packages/pipeline/src/parsers/idex_orders/index.ts +++ b/packages/pipeline/src/parsers/idex_orders/index.ts @@ -2,7 +2,7 @@ import { BigNumber } from '@0x/utils'; import { aggregateOrders } from '../utils'; -import { IdexOrder, IdexOrderbook, IdexOrderParam } from '../../data_sources/idex'; +import { IdexOrderbook, IdexOrderParam } from '../../data_sources/idex'; import { TokenOrderbookSnapshot as TokenOrder } from '../../entities'; import { OrderType } from '../../types'; @@ -21,7 +21,9 @@ export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp: const idexBidOrder = idexOrderbook.bids[0]; const parsedBids = aggregatedBids.length > 0 - ? aggregatedBids.map(order => parseIdexOrder(idexBidOrder.params, observedTimestamp, 'bid', source, order)) + ? aggregatedBids.map(order => + parseIdexOrder(idexBidOrder.params, observedTimestamp, OrderType.Bid, source, order), + ) : []; const aggregatedAsks = aggregateOrders(idexOrderbook.asks); @@ -29,7 +31,9 @@ export function parseIdexOrders(idexOrderbook: IdexOrderbook, observedTimestamp: const idexAskOrder = idexOrderbook.asks[0]; const parsedAsks = aggregatedAsks.length > 0 - ? aggregatedAsks.map(order => parseIdexOrder(idexAskOrder.params, observedTimestamp, 'ask', source, order)) + ? aggregatedAsks.map(order => + parseIdexOrder(idexAskOrder.params, observedTimestamp, OrderType.Ask, source, order), + ) : []; return parsedBids.concat(parsedAsks); } @@ -62,7 +66,7 @@ export function parseIdexOrder( tokenOrder.baseVolume = amount; tokenOrder.quoteVolume = price.times(amount); - if (orderType === 'bid') { + if (orderType === OrderType.Bid) { tokenOrder.baseAssetSymbol = idexOrderParam.buySymbol; tokenOrder.baseAssetAddress = idexOrderParam.tokenBuy; tokenOrder.quoteAssetSymbol = idexOrderParam.sellSymbol; diff --git a/packages/pipeline/src/parsers/oasis_orders/index.ts b/packages/pipeline/src/parsers/oasis_orders/index.ts index 13997f31b..b71fb65b9 100644 --- a/packages/pipeline/src/parsers/oasis_orders/index.ts +++ b/packages/pipeline/src/parsers/oasis_orders/index.ts @@ -23,13 +23,13 @@ export function parseOasisOrders( observedTimestamp: number, source: string, ): TokenOrder[] { - const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', 'bid'), oasisOrderbook)); - const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', 'ask'), oasisOrderbook)); + const aggregatedBids = aggregateOrders(R.filter(R.propEq('act', OrderType.Bid), oasisOrderbook)); + const aggregatedAsks = aggregateOrders(R.filter(R.propEq('act', OrderType.Ask), oasisOrderbook)); const parsedBids = aggregatedBids.map(order => - parseOasisOrder(oasisMarket, observedTimestamp, 'bid', source, order), + parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Bid, source, order), ); const parsedAsks = aggregatedAsks.map(order => - parseOasisOrder(oasisMarket, observedTimestamp, 'ask', source, order), + parseOasisOrder(oasisMarket, observedTimestamp, OrderType.Ask, source, order), ); return parsedBids.concat(parsedAsks); } diff --git a/packages/pipeline/src/parsers/paradex_orders/index.ts b/packages/pipeline/src/parsers/paradex_orders/index.ts index 5ceeb64a4..85990dae4 100644 --- a/packages/pipeline/src/parsers/paradex_orders/index.ts +++ b/packages/pipeline/src/parsers/paradex_orders/index.ts @@ -21,10 +21,10 @@ export function parseParadexOrders( source: string, ): TokenOrder[] { const parsedBids = paradexOrderbookResponse.bids.map(order => - parseParadexOrder(paradexMarket, observedTimestamp, 'bid', source, order), + parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Bid, source, order), ); const parsedAsks = paradexOrderbookResponse.asks.map(order => - parseParadexOrder(paradexMarket, observedTimestamp, 'ask', source, order), + parseParadexOrder(paradexMarket, observedTimestamp, OrderType.Ask, source, order), ); return parsedBids.concat(parsedAsks); } diff --git a/packages/pipeline/src/parsers/sra_orders/index.ts b/packages/pipeline/src/parsers/sra_orders/index.ts index ef8901e40..13fe632a4 100644 --- a/packages/pipeline/src/parsers/sra_orders/index.ts +++ b/packages/pipeline/src/parsers/sra_orders/index.ts @@ -4,7 +4,7 @@ import { AssetProxyId, ERC721AssetData } from '@0x/types'; import * as R from 'ramda'; import { SraOrder } from '../../entities'; -import { bigNumbertoStringOrNull } from '../../utils'; +import { bigNumbertoStringOrNull, convertAssetProxyIdToType } from '../../utils'; /** * Parses a raw order response from an SRA endpoint and returns an array of @@ -22,9 +22,7 @@ export function parseSraOrders(rawOrdersResponse: OrdersResponse): SraOrder[] { export function _convertToEntity(apiOrder: APIOrder): SraOrder { // TODO(albrow): refactor out common asset data decoding code. const makerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.makerAssetData); - const makerAssetType = makerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; const takerAssetData = assetDataUtils.decodeAssetDataOrThrow(apiOrder.order.takerAssetData); - const takerAssetType = takerAssetData.assetProxyId === AssetProxyId.ERC20 ? 'erc20' : 'erc721'; const sraOrder = new SraOrder(); sraOrder.exchangeAddress = apiOrder.order.exchangeAddress; @@ -43,16 +41,24 @@ export function _convertToEntity(apiOrder: APIOrder): SraOrder { sraOrder.signature = apiOrder.order.signature; sraOrder.rawMakerAssetData = apiOrder.order.makerAssetData; - sraOrder.makerAssetType = makerAssetType; + // tslint:disable-next-line:no-unnecessary-type-assertion + sraOrder.makerAssetType = convertAssetProxyIdToType(makerAssetData.assetProxyId as AssetProxyId); sraOrder.makerAssetProxyId = makerAssetData.assetProxyId; - sraOrder.makerTokenAddress = makerAssetData.tokenAddress; + // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData + sraOrder.makerTokenAddress = assetDataUtils.isMultiAssetData(makerAssetData) + ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.makerAssetData).nestedAssetData[0].tokenAddress + : makerAssetData.tokenAddress; // tslint has a false positive here. Type assertion is required. // tslint:disable-next-line:no-unnecessary-type-assertion sraOrder.makerTokenId = bigNumbertoStringOrNull((makerAssetData as ERC721AssetData).tokenId); sraOrder.rawTakerAssetData = apiOrder.order.takerAssetData; - sraOrder.takerAssetType = takerAssetType; + // tslint:disable-next-line:no-unnecessary-type-assertion + sraOrder.takerAssetType = convertAssetProxyIdToType(takerAssetData.assetProxyId as AssetProxyId); sraOrder.takerAssetProxyId = takerAssetData.assetProxyId; - sraOrder.takerTokenAddress = takerAssetData.tokenAddress; + // HACK(abandeali1): this event schema currently does not support multiple maker/taker assets, so we store the first token address from the MultiAssetProxy assetData + sraOrder.takerTokenAddress = assetDataUtils.isMultiAssetData(takerAssetData) + ? assetDataUtils.decodeMultiAssetDataRecursively(apiOrder.order.takerAssetData).nestedAssetData[0].tokenAddress + : takerAssetData.tokenAddress; // tslint:disable-next-line:no-unnecessary-type-assertion sraOrder.takerTokenId = bigNumbertoStringOrNull((takerAssetData as ERC721AssetData).tokenId); diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts new file mode 100644 index 000000000..69814f209 --- /dev/null +++ b/packages/pipeline/src/scripts/pull_copper.ts @@ -0,0 +1,129 @@ +// tslint:disable:no-console +import * as R from 'ramda'; +import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm'; + +import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper'; +import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities'; +import * as ormConfig from '../ormconfig'; +import { + CopperSearchResponse, + parseActivities, + parseActivityTypes, + parseCustomFields, + parseLeads, + parseOpportunities, +} from '../parsers/copper'; +import { handleError } from '../utils'; +const ONE_SECOND = 1000; +const COPPER_RATE_LIMIT = 10; +let connection: Connection; + +(async () => { + connection = await createConnection(ormConfig as ConnectionOptions); + + const accessToken = process.env.COPPER_ACCESS_TOKEN; + const userEmail = process.env.COPPER_USER_EMAIL; + if (accessToken === undefined || userEmail === undefined) { + throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL'); + } + const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail); + + const fetchPromises = [ + fetchAndSaveLeadsAsync(source), + fetchAndSaveOpportunitiesAsync(source), + fetchAndSaveActivitiesAsync(source), + fetchAndSaveCustomFieldsAsync(source), + fetchAndSaveActivityTypesAsync(source), + ]; + fetchPromises.forEach(async fn => { + await fn; + }); +})().catch(handleError); + +async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> { + const repository = connection.getRepository(CopperLead); + const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads'); + console.log(`Fetching Copper leads starting from ${startTime}...`); + await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository); +} + +async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> { + const repository = connection.getRepository(CopperOpportunity); + const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities'); + console.log(`Fetching Copper opportunities starting from ${startTime}...`); + await fetchAndSaveAsync( + CopperEndpoint.Opportunities, + source, + startTime, + { sort_by: 'name' }, + parseOpportunities, + repository, + ); +} + +async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> { + const repository = connection.getRepository(CopperActivity); + const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities'); + const searchParams = { + minimum_activity_date: Math.floor(startTime / ONE_SECOND), + }; + console.log(`Fetching Copper activities starting from ${startTime}...`); + await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository); +} + +async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> { + const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`); + if (R.isEmpty(queryResult)) { + return 0; + } else { + return queryResult[0]._max; + } +} + +// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological +// order and not missing any records, we are scraping all available pages. If Copper data gets larger, +// it would make sense to search for and start filling from the first page that contains a new record. +// This search would increase our network calls and is not efficient to implement with our current small volume +// of Copper records. +async function fetchAndSaveAsync<T extends CopperSearchResponse, E>( + endpoint: CopperEndpoint, + source: CopperSource, + startTime: number, + searchParams: CopperSearchParams, + parseFn: (recs: T[]) => E[], + repository: Repository<E>, +): Promise<void> { + let saved = 0; + const numPages = await source.fetchNumberOfPagesAsync(endpoint); + try { + for (let i = numPages; i > 0; i--) { + console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`); + const raw = await source.fetchSearchResultsAsync<T>(endpoint, { + ...searchParams, + page_number: i, + }); + const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime); + const parsed = parseFn(newRecords); + await repository.save<any>(parsed); + saved += newRecords.length; + } + } catch (err) { + console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`); + } finally { + console.log(`Saved ${saved} items from ${endpoint}, done.`); + } +} + +async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> { + console.log(`Fetching Copper activity types...`); + const activityTypes = await source.fetchActivityTypesAsync(); + const repository = connection.getRepository(CopperActivityType); + await repository.save(parseActivityTypes(activityTypes)); +} + +async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> { + console.log(`Fetching Copper custom fields...`); + const customFields = await source.fetchCustomFieldsAsync(); + const repository = connection.getRepository(CopperCustomField); + await repository.save(parseCustomFields(customFields)); +} diff --git a/packages/pipeline/src/scripts/pull_missing_blocks.ts b/packages/pipeline/src/scripts/pull_missing_blocks.ts index bb5385126..ced9d99eb 100644 --- a/packages/pipeline/src/scripts/pull_missing_blocks.ts +++ b/packages/pipeline/src/scripts/pull_missing_blocks.ts @@ -14,20 +14,29 @@ import { handleError, INFURA_ROOT_URL } from '../utils'; // Number of blocks to save at once. const BATCH_SAVE_SIZE = 1000; // Maximum number of requests to send at once. -const MAX_CONCURRENCY = 10; +const MAX_CONCURRENCY = 20; // Maximum number of blocks to query for at once. This is also the maximum // number of blocks we will hold in memory prior to being saved to the database. const MAX_BLOCKS_PER_QUERY = 1000; let connection: Connection; +const tablesWithMissingBlocks = [ + 'raw.exchange_fill_events', + 'raw.exchange_cancel_events', + 'raw.exchange_cancel_up_to_events', + 'raw.erc20_approval_events', +]; + (async () => { connection = await createConnection(ormConfig as ConnectionOptions); const provider = web3Factory.getRpcProvider({ rpcUrl: INFURA_ROOT_URL, }); const web3Source = new Web3Source(provider); - await getAllMissingBlocksAsync(web3Source); + for (const tableName of tablesWithMissingBlocks) { + await getAllMissingBlocksAsync(web3Source, tableName); + } process.exit(0); })().catch(handleError); @@ -35,10 +44,11 @@ interface MissingBlocksResponse { block_number: string; } -async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> { +async function getAllMissingBlocksAsync(web3Source: Web3Source, tableName: string): Promise<void> { const blocksRepository = connection.getRepository(Block); while (true) { - const blockNumbers = await getMissingBlockNumbersAsync(); + console.log(`Checking for missing blocks in ${tableName}...`); + const blockNumbers = await getMissingBlockNumbersAsync(tableName); if (blockNumbers.length === 0) { // There are no more missing blocks. We're done. break; @@ -46,24 +56,14 @@ async function getAllMissingBlocksAsync(web3Source: Web3Source): Promise<void> { await getAndSaveBlocksAsync(web3Source, blocksRepository, blockNumbers); } const totalBlocks = await blocksRepository.count(); - console.log(`Done saving blocks. There are now ${totalBlocks} total blocks.`); + console.log(`Done saving blocks for ${tableName}. There are now ${totalBlocks} total blocks.`); } -async function getMissingBlockNumbersAsync(): Promise<number[]> { - // Note(albrow): The easiest way to get all the blocks we need is to - // consider all the events tables together in a single query. If this query - // gets too slow, we should consider re-architecting so that we can work on - // getting the blocks for one type of event at a time. +async function getMissingBlockNumbersAsync(tableName: string): Promise<number[]> { + // This query returns up to `MAX_BLOCKS_PER_QUERY` distinct block numbers + // which are present in `tableName` but not in `raw.blocks`. const response = (await connection.query( - `WITH all_events AS ( - SELECT block_number FROM raw.exchange_fill_events - UNION SELECT block_number FROM raw.exchange_cancel_events - UNION SELECT block_number FROM raw.exchange_cancel_up_to_events - UNION SELECT block_number FROM raw.erc20_approval_events - ) - SELECT DISTINCT(block_number) FROM all_events - WHERE block_number NOT IN (SELECT number FROM raw.blocks) - ORDER BY block_number ASC LIMIT $1`, + `SELECT DISTINCT(block_number) FROM ${tableName} LEFT JOIN raw.blocks ON ${tableName}.block_number = raw.blocks.number WHERE number IS NULL LIMIT $1;`, [MAX_BLOCKS_PER_QUERY], )) as MissingBlocksResponse[]; const blockNumberStrings = R.pluck('block_number', response); @@ -86,4 +86,5 @@ async function getAndSaveBlocksAsync( const blocks = R.map(parseBlock, rawBlocks); console.log(`Saving ${blocks.length} blocks...`); await blocksRepository.save(blocks, { chunk: Math.ceil(blocks.length / BATCH_SAVE_SIZE) }); + console.log('Done saving this batch of blocks'); } diff --git a/packages/pipeline/src/types.ts b/packages/pipeline/src/types.ts index e02b42a40..5f2121807 100644 --- a/packages/pipeline/src/types.ts +++ b/packages/pipeline/src/types.ts @@ -1,2 +1,9 @@ -export type AssetType = 'erc20' | 'erc721'; -export type OrderType = 'bid' | 'ask'; +export enum AssetType { + ERC20 = 'erc20', + ERC721 = 'erc721', + MultiAsset = 'multiAsset', +} +export enum OrderType { + Bid = 'bid', + Ask = 'ask', +} diff --git a/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts new file mode 100644 index 000000000..2cd05a616 --- /dev/null +++ b/packages/pipeline/src/utils/transformers/asset_proxy_id_types.ts @@ -0,0 +1,20 @@ +import { AssetProxyId } from '@0x/types'; + +import { AssetType } from '../../types'; + +/** + * Converts an assetProxyId to its string equivalent + * @param assetProxyId Id of AssetProxy + */ +export function convertAssetProxyIdToType(assetProxyId: AssetProxyId): AssetType { + switch (assetProxyId) { + case AssetProxyId.ERC20: + return AssetType.ERC20; + case AssetProxyId.ERC721: + return AssetType.ERC721; + case AssetProxyId.MultiAsset: + return AssetType.MultiAsset; + default: + throw new Error(`${assetProxyId} not a supported assetProxyId`); + } +} diff --git a/packages/pipeline/src/utils/transformers/index.ts b/packages/pipeline/src/utils/transformers/index.ts index 232c1c5de..31a4c9223 100644 --- a/packages/pipeline/src/utils/transformers/index.ts +++ b/packages/pipeline/src/utils/transformers/index.ts @@ -1,2 +1,3 @@ export * from './big_number'; export * from './number_to_bigint'; +export * from './asset_proxy_id_types'; diff --git a/packages/pipeline/src/utils/transformers/number_to_bigint.ts b/packages/pipeline/src/utils/transformers/number_to_bigint.ts index 85560c1f0..9736d7c18 100644 --- a/packages/pipeline/src/utils/transformers/number_to_bigint.ts +++ b/packages/pipeline/src/utils/transformers/number_to_bigint.ts @@ -9,8 +9,12 @@ const decimalRadix = 10; // https://github.com/typeorm/typeorm/issues/2400 for more information. export class NumberToBigIntTransformer implements ValueTransformer { // tslint:disable-next-line:prefer-function-over-method - public to(value: number): string { - return value.toString(); + public to(value: number): string | null { + if (value === null || value === undefined) { + return null; + } else { + return value.toString(); + } } // tslint:disable-next-line:prefer-function-over-method |