aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src
diff options
context:
space:
mode:
Diffstat (limited to 'packages/pipeline/src')
-rw-r--r--packages/pipeline/src/data_sources/copper/index.ts126
-rw-r--r--packages/pipeline/src/entities/copper_activity.ts41
-rw-r--r--packages/pipeline/src/entities/copper_activity_type.ts17
-rw-r--r--packages/pipeline/src/entities/copper_custom_field.ts15
-rw-r--r--packages/pipeline/src/entities/copper_lead.ts38
-rw-r--r--packages/pipeline/src/entities/copper_opportunity.ts45
-rw-r--r--packages/pipeline/src/entities/index.ts6
-rw-r--r--packages/pipeline/src/ormconfig.ts10
-rw-r--r--packages/pipeline/src/parsers/copper/index.ts259
-rw-r--r--packages/pipeline/src/scripts/pull_copper.ts129
-rw-r--r--packages/pipeline/src/utils/transformers/number_to_bigint.ts8
11 files changed, 692 insertions, 2 deletions
diff --git a/packages/pipeline/src/data_sources/copper/index.ts b/packages/pipeline/src/data_sources/copper/index.ts
new file mode 100644
index 000000000..15df2fd7d
--- /dev/null
+++ b/packages/pipeline/src/data_sources/copper/index.ts
@@ -0,0 +1,126 @@
+import { fetchAsync } from '@0x/utils';
+import Bottleneck from 'bottleneck';
+
+import {
+ CopperActivityTypeCategory,
+ CopperActivityTypeResponse,
+ CopperCustomFieldResponse,
+ CopperSearchResponse,
+} from '../../parsers/copper';
+
+const HTTP_OK_STATUS = 200;
+const COPPER_URI = 'https://api.prosperworks.com/developer_api/v1';
+
+const DEFAULT_PAGINATION_PARAMS = {
+ page_size: 200,
+ sort_by: 'date_modified',
+ sort_direction: 'desc',
+};
+
+export type CopperSearchParams = CopperLeadSearchParams | CopperActivitySearchParams | CopperOpportunitySearchParams;
+export interface CopperLeadSearchParams {
+ page_number?: number;
+}
+
+export interface CopperActivitySearchParams {
+ minimum_activity_date: number;
+ page_number?: number;
+}
+
+export interface CopperOpportunitySearchParams {
+ sort_by: string; // must override the default 'date_modified' for this endpoint
+ page_number?: number;
+}
+export enum CopperEndpoint {
+ Leads = '/leads/search',
+ Opportunities = '/opportunities/search',
+ Activities = '/activities/search',
+}
+const ONE_SECOND = 1000;
+
+function httpErrorCheck(response: Response): void {
+ if (response.status !== HTTP_OK_STATUS) {
+ throw new Error(`HTTP error while scraping Copper: [${JSON.stringify(response)}]`);
+ }
+}
+export class CopperSource {
+ private readonly _accessToken: string;
+ private readonly _userEmail: string;
+ private readonly _defaultHeaders: any;
+ private readonly _limiter: Bottleneck;
+
+ constructor(maxConcurrentRequests: number, accessToken: string, userEmail: string) {
+ this._accessToken = accessToken;
+ this._userEmail = userEmail;
+ this._defaultHeaders = {
+ 'Content-Type': 'application/json',
+ 'X-PW-AccessToken': this._accessToken,
+ 'X-PW-Application': 'developer_api',
+ 'X-PW-UserEmail': this._userEmail,
+ };
+ this._limiter = new Bottleneck({
+ minTime: ONE_SECOND / maxConcurrentRequests,
+ reservoir: 30,
+ reservoirRefreshAmount: 30,
+ reservoirRefreshInterval: maxConcurrentRequests,
+ });
+ }
+
+ public async fetchNumberOfPagesAsync(endpoint: CopperEndpoint, searchParams?: CopperSearchParams): Promise<number> {
+ const resp = await this._limiter.schedule(() =>
+ fetchAsync(COPPER_URI + endpoint, {
+ method: 'POST',
+ body: JSON.stringify({ ...DEFAULT_PAGINATION_PARAMS, ...searchParams }),
+ headers: this._defaultHeaders,
+ }),
+ );
+
+ httpErrorCheck(resp);
+
+ // total number of records that match the request parameters
+ if (resp.headers.has('X-Pw-Total')) {
+ const totalRecords: number = parseInt(resp.headers.get('X-Pw-Total') as string, 10); // tslint:disable-line:custom-no-magic-numbers
+ return Math.ceil(totalRecords / DEFAULT_PAGINATION_PARAMS.page_size);
+ } else {
+ return 1;
+ }
+ }
+ public async fetchSearchResultsAsync<T extends CopperSearchResponse>(
+ endpoint: CopperEndpoint,
+ searchParams?: CopperSearchParams,
+ ): Promise<T[]> {
+ const request = { ...DEFAULT_PAGINATION_PARAMS, ...searchParams };
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(COPPER_URI + endpoint, {
+ method: 'POST',
+ body: JSON.stringify(request),
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ const json: T[] = await response.json();
+ return json;
+ }
+
+ public async fetchActivityTypesAsync(): Promise<Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>> {
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(`${COPPER_URI}/activity_types`, {
+ method: 'GET',
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ return response.json();
+ }
+
+ public async fetchCustomFieldsAsync(): Promise<CopperCustomFieldResponse[]> {
+ const response = await this._limiter.schedule(() =>
+ fetchAsync(`${COPPER_URI}/custom_field_definitions`, {
+ method: 'GET',
+ headers: this._defaultHeaders,
+ }),
+ );
+ httpErrorCheck(response);
+ return response.json();
+ }
+}
diff --git a/packages/pipeline/src/entities/copper_activity.ts b/packages/pipeline/src/entities/copper_activity.ts
new file mode 100644
index 000000000..cbc034285
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_activity.ts
@@ -0,0 +1,41 @@
+import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_activities', schema: 'raw' })
+export class CopperActivity {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+
+ @Index()
+ @Column({ name: 'parent_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public parentId!: number;
+ @Column({ name: 'parent_type', type: 'varchar' })
+ public parentType!: string;
+
+ // join with CopperActivityType
+ @Index()
+ @Column({ name: 'type_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public typeId!: number;
+ @Column({ name: 'type_category', type: 'varchar' })
+ public typeCategory!: string;
+ @Column({ name: 'type_name', type: 'varchar', nullable: true })
+ public typeName?: string;
+
+ @Column({ name: 'user_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public userId!: number;
+ @Column({ name: 'old_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer })
+ public oldValueId?: number;
+ @Column({ name: 'old_value_name', type: 'varchar', nullable: true })
+ public oldValueName?: string;
+ @Column({ name: 'new_value_id', type: 'bigint', nullable: true, transformer: numberToBigIntTransformer })
+ public newValueId?: number;
+ @Column({ name: 'new_value_name', type: 'varchar', nullable: true })
+ public newValueName?: string;
+
+ @Index()
+ @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateCreated!: number;
+ @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateModified!: number;
+}
diff --git a/packages/pipeline/src/entities/copper_activity_type.ts b/packages/pipeline/src/entities/copper_activity_type.ts
new file mode 100644
index 000000000..8fb2dcf70
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_activity_type.ts
@@ -0,0 +1,17 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_activity_types', schema: 'raw' })
+export class CopperActivityType {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+ @Column({ name: 'category', type: 'varchar' })
+ public category!: string;
+ @Column({ name: 'name', type: 'varchar' })
+ public name!: string;
+ @Column({ name: 'is_disabled', type: 'boolean', nullable: true })
+ public isDisabled?: boolean;
+ @Column({ name: 'count_as_interaction', type: 'boolean', nullable: true })
+ public countAsInteraction?: boolean;
+}
diff --git a/packages/pipeline/src/entities/copper_custom_field.ts b/packages/pipeline/src/entities/copper_custom_field.ts
new file mode 100644
index 000000000..f23f6ab22
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_custom_field.ts
@@ -0,0 +1,15 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_custom_fields', schema: 'raw' })
+export class CopperCustomField {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+ @Column({ name: 'data_type', type: 'varchar' })
+ public dataType!: string;
+ @Column({ name: 'field_type', type: 'varchar', nullable: true })
+ public fieldType?: string;
+ @Column({ name: 'name', type: 'varchar' })
+ public name!: string;
+}
diff --git a/packages/pipeline/src/entities/copper_lead.ts b/packages/pipeline/src/entities/copper_lead.ts
new file mode 100644
index 000000000..c51ccd761
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_lead.ts
@@ -0,0 +1,38 @@
+import { Column, Entity, Index, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_leads', schema: 'raw' })
+export class CopperLead {
+ @PrimaryColumn({ type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+
+ @Column({ name: 'name', type: 'varchar', nullable: true })
+ public name?: string;
+ @Column({ name: 'first_name', type: 'varchar', nullable: true })
+ public firstName?: string;
+ @Column({ name: 'last_name', type: 'varchar', nullable: true })
+ public lastName?: string;
+ @Column({ name: 'middle_name', type: 'varchar', nullable: true })
+ public middleName?: string;
+ @Column({ name: 'assignee_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true })
+ public assigneeId?: number;
+ @Column({ name: 'company_name', type: 'varchar', nullable: true })
+ public companyName?: string;
+ @Column({ name: 'customer_source_id', type: 'bigint', transformer: numberToBigIntTransformer, nullable: true })
+ public customerSourceId?: number;
+ @Column({ name: 'monetary_value', type: 'integer', nullable: true })
+ public monetaryValue?: number;
+ @Column({ name: 'status', type: 'varchar' })
+ public status!: string;
+ @Column({ name: 'status_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public statusId!: number;
+ @Column({ name: 'title', type: 'varchar', nullable: true })
+ public title?: string;
+
+ @Index()
+ @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateCreated!: number;
+ @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateModified!: number;
+}
diff --git a/packages/pipeline/src/entities/copper_opportunity.ts b/packages/pipeline/src/entities/copper_opportunity.ts
new file mode 100644
index 000000000..e12bd69ce
--- /dev/null
+++ b/packages/pipeline/src/entities/copper_opportunity.ts
@@ -0,0 +1,45 @@
+import { Column, Entity, PrimaryColumn } from 'typeorm';
+
+import { numberToBigIntTransformer } from '../utils';
+
+@Entity({ name: 'copper_opportunities', schema: 'raw' })
+export class CopperOpportunity {
+ @PrimaryColumn({ name: 'id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public id!: number;
+ @Column({ name: 'name', type: 'varchar' })
+ public name!: string;
+ @Column({ name: 'assignee_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public assigneeId?: number;
+ @Column({ name: 'close_date', nullable: true, type: 'varchar' })
+ public closeDate?: string;
+ @Column({ name: 'company_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public companyId?: number;
+ @Column({ name: 'company_name', nullable: true, type: 'varchar' })
+ public companyName?: string;
+ @Column({ name: 'customer_source_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public customerSourceId?: number;
+ @Column({ name: 'loss_reason_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public lossReasonId?: number;
+ @Column({ name: 'pipeline_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public pipelineId!: number;
+ @Column({ name: 'pipeline_stage_id', type: 'bigint', transformer: numberToBigIntTransformer })
+ public pipelineStageId!: number;
+ @Column({ name: 'primary_contact_id', nullable: true, type: 'bigint', transformer: numberToBigIntTransformer })
+ public primaryContactId?: number;
+ @Column({ name: 'priority', nullable: true, type: 'varchar' })
+ public priority?: string;
+ @Column({ name: 'status', type: 'varchar' })
+ public status!: string;
+ @Column({ name: 'interaction_count', type: 'bigint', transformer: numberToBigIntTransformer })
+ public interactionCount!: number;
+ @Column({ name: 'monetary_value', nullable: true, type: 'integer' })
+ public monetaryValue?: number;
+ @Column({ name: 'win_probability', nullable: true, type: 'integer' })
+ public winProbability?: number;
+ @Column({ name: 'date_created', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateCreated!: number;
+ @PrimaryColumn({ name: 'date_modified', type: 'bigint', transformer: numberToBigIntTransformer })
+ public dateModified!: number;
+ @Column({ name: 'custom_fields', type: 'jsonb' })
+ public customFields!: { [key: number]: number };
+}
diff --git a/packages/pipeline/src/entities/index.ts b/packages/pipeline/src/entities/index.ts
index cc3de78bb..27c153c07 100644
--- a/packages/pipeline/src/entities/index.ts
+++ b/packages/pipeline/src/entities/index.ts
@@ -16,4 +16,10 @@ export { TokenOrderbookSnapshot } from './token_order';
export { Transaction } from './transaction';
export { ERC20ApprovalEvent } from './erc20_approval_event';
+export { CopperLead } from './copper_lead';
+export { CopperActivity } from './copper_activity';
+export { CopperOpportunity } from './copper_opportunity';
+export { CopperActivityType } from './copper_activity_type';
+export { CopperCustomField } from './copper_custom_field';
+
export type ExchangeEvent = ExchangeFillEvent | ExchangeCancelEvent | ExchangeCancelUpToEvent;
diff --git a/packages/pipeline/src/ormconfig.ts b/packages/pipeline/src/ormconfig.ts
index fe11d81d5..2700714cd 100644
--- a/packages/pipeline/src/ormconfig.ts
+++ b/packages/pipeline/src/ormconfig.ts
@@ -2,6 +2,11 @@ import { ConnectionOptions } from 'typeorm';
import {
Block,
+ CopperActivity,
+ CopperActivityType,
+ CopperCustomField,
+ CopperLead,
+ CopperOpportunity,
DexTrade,
ERC20ApprovalEvent,
ExchangeCancelEvent,
@@ -18,6 +23,11 @@ import {
const entities = [
Block,
+ CopperOpportunity,
+ CopperActivity,
+ CopperActivityType,
+ CopperCustomField,
+ CopperLead,
DexTrade,
ExchangeCancelEvent,
ExchangeCancelUpToEvent,
diff --git a/packages/pipeline/src/parsers/copper/index.ts b/packages/pipeline/src/parsers/copper/index.ts
new file mode 100644
index 000000000..6c0c5abd5
--- /dev/null
+++ b/packages/pipeline/src/parsers/copper/index.ts
@@ -0,0 +1,259 @@
+import * as R from 'ramda';
+
+import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../../entities';
+
+const ONE_SECOND = 1000;
+export type CopperSearchResponse = CopperLeadResponse | CopperActivityResponse | CopperOpportunityResponse;
+export interface CopperLeadResponse {
+ id: number;
+ name?: string;
+ first_name?: string;
+ last_name?: string;
+ middle_name?: string;
+ assignee_id?: number;
+ company_name?: string;
+ customer_source_id?: number;
+ monetary_value?: number;
+ status: string;
+ status_id: number;
+ title?: string;
+ date_created: number; // in seconds
+ date_modified: number; // in seconds
+}
+
+export interface CopperActivityResponse {
+ id: number;
+ parent: CopperActivityParentResponse;
+ type: CopperActivityTypeResponse;
+ user_id: number;
+ activity_date: number;
+ old_value: CopperActivityValueResponse;
+ new_value: CopperActivityValueResponse;
+ date_created: number; // in seconds
+ date_modified: number; // in seconds
+}
+
+export interface CopperActivityValueResponse {
+ id: number;
+ name: string;
+}
+export interface CopperActivityParentResponse {
+ id: number;
+ type: string;
+}
+
+// custom activity types
+export enum CopperActivityTypeCategory {
+ user = 'user',
+ system = 'system',
+}
+export interface CopperActivityTypeResponse {
+ id: number;
+ category: CopperActivityTypeCategory;
+ name: string;
+ is_disabled?: boolean;
+ count_as_interaction?: boolean;
+}
+
+export interface CopperOpportunityResponse {
+ id: number;
+ name: string;
+ assignee_id?: number;
+ close_date?: string;
+ company_id?: number;
+ company_name?: string;
+ customer_source_id?: number;
+ loss_reason_id?: number;
+ pipeline_id: number;
+ pipeline_stage_id: number;
+ primary_contact_id?: number;
+ priority?: string;
+ status: string;
+ tags: string[];
+ interaction_count: number;
+ monetary_value?: number;
+ win_probability?: number;
+ date_created: number; // in seconds
+ date_modified: number; // in seconds
+ custom_fields: CopperNestedCustomFieldResponse[];
+}
+interface CopperNestedCustomFieldResponse {
+ custom_field_definition_id: number;
+ value: number | number[] | null;
+}
+// custom fields
+export enum CopperCustomFieldType {
+ String = 'String',
+ Text = 'Text',
+ Dropdown = 'Dropdown',
+ MultiSelect = 'MultiSelect', // not in API documentation but shows up in results
+ Date = 'Date',
+ Checkbox = 'Checkbox',
+ Float = 'Float',
+ URL = 'URL',
+ Percentage = 'Percentage',
+ Currency = 'Currency',
+ Connect = 'Connect',
+}
+export interface CopperCustomFieldOptionResponse {
+ id: number;
+ name: string;
+}
+export interface CopperCustomFieldResponse {
+ id: number;
+ name: string;
+ data_type: CopperCustomFieldType;
+ options?: CopperCustomFieldOptionResponse[];
+}
+/**
+ * Parse response from Copper API /search/leads/
+ *
+ * @param leads - The array of leads returned from the API
+ * @returns Returns an array of Copper Lead entities
+ */
+export function parseLeads(leads: CopperLeadResponse[]): CopperLead[] {
+ return leads.map(lead => {
+ const entity = new CopperLead();
+ entity.id = lead.id;
+ entity.name = lead.name || undefined;
+ entity.firstName = lead.first_name || undefined;
+ entity.lastName = lead.last_name || undefined;
+ entity.middleName = lead.middle_name || undefined;
+ entity.assigneeId = lead.assignee_id || undefined;
+ entity.companyName = lead.company_name || undefined;
+ entity.customerSourceId = lead.customer_source_id || undefined;
+ entity.monetaryValue = lead.monetary_value || undefined;
+ entity.status = lead.status;
+ entity.statusId = lead.status_id;
+ entity.title = lead.title || undefined;
+ entity.dateCreated = lead.date_created * ONE_SECOND;
+ entity.dateModified = lead.date_modified * ONE_SECOND;
+ return entity;
+ });
+}
+
+/**
+ * Parse response from Copper API /search/activities/
+ *
+ * @param activities - The array of activities returned from the API
+ * @returns Returns an array of Copper Activity entities
+ */
+export function parseActivities(activities: CopperActivityResponse[]): CopperActivity[] {
+ return activities.map(activity => {
+ const entity = new CopperActivity();
+ entity.id = activity.id;
+
+ entity.parentId = activity.parent.id;
+ entity.parentType = activity.parent.type;
+
+ entity.typeId = activity.type.id;
+ entity.typeCategory = activity.type.category.toString();
+ entity.typeName = activity.type.name;
+
+ entity.userId = activity.user_id;
+ entity.dateCreated = activity.date_created * ONE_SECOND;
+ entity.dateModified = activity.date_modified * ONE_SECOND;
+
+ // nested nullable fields
+ entity.oldValueId = R.path(['old_value', 'id'], activity);
+ entity.oldValueName = R.path(['old_value', 'name'], activity);
+ entity.newValueId = R.path(['new_value', 'id'], activity);
+ entity.newValueName = R.path(['new_value', 'name'], activity);
+
+ return entity;
+ });
+}
+
+/**
+ * Parse response from Copper API /search/opportunities/
+ *
+ * @param opportunities - The array of opportunities returned from the API
+ * @returns Returns an array of Copper Opportunity entities
+ */
+export function parseOpportunities(opportunities: CopperOpportunityResponse[]): CopperOpportunity[] {
+ return opportunities.map(opp => {
+ const customFields: { [key: number]: number } = opp.custom_fields
+ .filter(f => f.value !== null)
+ .map(f => ({
+ ...f,
+ value: ([] as number[]).concat(f.value || []), // normalise all values to number[]
+ }))
+ .map(f => f.value.map(val => [f.custom_field_definition_id, val] as [number, number])) // pair each value with the custom_field_definition_id
+ .reduce((acc, pair) => acc.concat(pair)) // flatten
+ .reduce<{ [key: number]: number }>((obj, [key, value]) => {
+ // transform into object literal
+ obj[key] = value;
+ return obj;
+ }, {});
+
+ const entity = new CopperOpportunity();
+ entity.id = opp.id;
+ entity.name = opp.name;
+ entity.assigneeId = opp.assignee_id || undefined;
+ entity.closeDate = opp.close_date || undefined;
+ entity.companyId = opp.company_id || undefined;
+ entity.companyName = opp.company_name || undefined;
+ entity.customerSourceId = opp.customer_source_id || undefined;
+ entity.lossReasonId = opp.loss_reason_id || undefined;
+ entity.pipelineId = opp.pipeline_id;
+ entity.pipelineStageId = opp.pipeline_stage_id;
+ entity.primaryContactId = opp.primary_contact_id || undefined;
+ entity.priority = opp.priority || undefined;
+ entity.status = opp.status;
+ entity.interactionCount = opp.interaction_count;
+ entity.monetaryValue = opp.monetary_value || undefined;
+ entity.winProbability = opp.win_probability === null ? undefined : opp.win_probability;
+ entity.dateCreated = opp.date_created * ONE_SECOND;
+ entity.dateModified = opp.date_modified * ONE_SECOND;
+ entity.customFields = customFields;
+ return entity;
+ });
+}
+
+/**
+ * Parse response from Copper API /activity_types/
+ *
+ * @param activityTypeResponse - Activity Types response from the API, keyed by "user" or "system"
+ * @returns Returns an array of Copper Activity Type entities
+ */
+export function parseActivityTypes(
+ activityTypeResponse: Map<CopperActivityTypeCategory, CopperActivityTypeResponse[]>,
+): CopperActivityType[] {
+ const values: CopperActivityTypeResponse[] = R.flatten(Object.values(activityTypeResponse));
+ return values.map(activityType => ({
+ id: activityType.id,
+ name: activityType.name,
+ category: activityType.category.toString(),
+ isDisabled: activityType.is_disabled,
+ countAsInteraction: activityType.count_as_interaction,
+ }));
+}
+
+/**
+ * Parse response from Copper API /custom_field_definitions/
+ *
+ * @param customFieldResponse - array of custom field definitions returned from the API, consisting of top-level fields and nested fields
+ * @returns Returns an array of Copper Custom Field entities
+ */
+export function parseCustomFields(customFieldResponse: CopperCustomFieldResponse[]): CopperCustomField[] {
+ function parseTopLevelField(field: CopperCustomFieldResponse): CopperCustomField[] {
+ const topLevelField: CopperCustomField = {
+ id: field.id,
+ name: field.name,
+ dataType: field.data_type.toString(),
+ };
+
+ if (field.options !== undefined) {
+ const nestedFields: CopperCustomField[] = field.options.map(option => ({
+ id: option.id,
+ name: option.name,
+ dataType: field.name,
+ fieldType: 'option',
+ }));
+ return nestedFields.concat(topLevelField);
+ } else {
+ return [topLevelField];
+ }
+ }
+ return R.chain(parseTopLevelField, customFieldResponse);
+}
diff --git a/packages/pipeline/src/scripts/pull_copper.ts b/packages/pipeline/src/scripts/pull_copper.ts
new file mode 100644
index 000000000..69814f209
--- /dev/null
+++ b/packages/pipeline/src/scripts/pull_copper.ts
@@ -0,0 +1,129 @@
+// tslint:disable:no-console
+import * as R from 'ramda';
+import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';
+
+import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper';
+import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities';
+import * as ormConfig from '../ormconfig';
+import {
+ CopperSearchResponse,
+ parseActivities,
+ parseActivityTypes,
+ parseCustomFields,
+ parseLeads,
+ parseOpportunities,
+} from '../parsers/copper';
+import { handleError } from '../utils';
+const ONE_SECOND = 1000;
+const COPPER_RATE_LIMIT = 10;
+let connection: Connection;
+
+(async () => {
+ connection = await createConnection(ormConfig as ConnectionOptions);
+
+ const accessToken = process.env.COPPER_ACCESS_TOKEN;
+ const userEmail = process.env.COPPER_USER_EMAIL;
+ if (accessToken === undefined || userEmail === undefined) {
+ throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL');
+ }
+ const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail);
+
+ const fetchPromises = [
+ fetchAndSaveLeadsAsync(source),
+ fetchAndSaveOpportunitiesAsync(source),
+ fetchAndSaveActivitiesAsync(source),
+ fetchAndSaveCustomFieldsAsync(source),
+ fetchAndSaveActivityTypesAsync(source),
+ ];
+ fetchPromises.forEach(async fn => {
+ await fn;
+ });
+})().catch(handleError);
+
+async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
+ const repository = connection.getRepository(CopperLead);
+ const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads');
+ console.log(`Fetching Copper leads starting from ${startTime}...`);
+ await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository);
+}
+
+async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
+ const repository = connection.getRepository(CopperOpportunity);
+ const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities');
+ console.log(`Fetching Copper opportunities starting from ${startTime}...`);
+ await fetchAndSaveAsync(
+ CopperEndpoint.Opportunities,
+ source,
+ startTime,
+ { sort_by: 'name' },
+ parseOpportunities,
+ repository,
+ );
+}
+
+async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> {
+ const repository = connection.getRepository(CopperActivity);
+ const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities');
+ const searchParams = {
+ minimum_activity_date: Math.floor(startTime / ONE_SECOND),
+ };
+ console.log(`Fetching Copper activities starting from ${startTime}...`);
+ await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository);
+}
+
+async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> {
+ const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
+ if (R.isEmpty(queryResult)) {
+ return 0;
+ } else {
+ return queryResult[0]._max;
+ }
+}
+
+// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological
+// order and not missing any records, we are scraping all available pages. If Copper data gets larger,
+// it would make sense to search for and start filling from the first page that contains a new record.
+// This search would increase our network calls and is not efficient to implement with our current small volume
+// of Copper records.
+async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
+ endpoint: CopperEndpoint,
+ source: CopperSource,
+ startTime: number,
+ searchParams: CopperSearchParams,
+ parseFn: (recs: T[]) => E[],
+ repository: Repository<E>,
+): Promise<void> {
+ let saved = 0;
+ const numPages = await source.fetchNumberOfPagesAsync(endpoint);
+ try {
+ for (let i = numPages; i > 0; i--) {
+ console.log(`Fetching page ${i}/${numPages} of ${endpoint}...`);
+ const raw = await source.fetchSearchResultsAsync<T>(endpoint, {
+ ...searchParams,
+ page_number: i,
+ });
+ const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime);
+ const parsed = parseFn(newRecords);
+ await repository.save<any>(parsed);
+ saved += newRecords.length;
+ }
+ } catch (err) {
+ console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
+ } finally {
+ console.log(`Saved ${saved} items from ${endpoint}, done.`);
+ }
+}
+
+async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> {
+ console.log(`Fetching Copper activity types...`);
+ const activityTypes = await source.fetchActivityTypesAsync();
+ const repository = connection.getRepository(CopperActivityType);
+ await repository.save(parseActivityTypes(activityTypes));
+}
+
+async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> {
+ console.log(`Fetching Copper custom fields...`);
+ const customFields = await source.fetchCustomFieldsAsync();
+ const repository = connection.getRepository(CopperCustomField);
+ await repository.save(parseCustomFields(customFields));
+}
diff --git a/packages/pipeline/src/utils/transformers/number_to_bigint.ts b/packages/pipeline/src/utils/transformers/number_to_bigint.ts
index 85560c1f0..9736d7c18 100644
--- a/packages/pipeline/src/utils/transformers/number_to_bigint.ts
+++ b/packages/pipeline/src/utils/transformers/number_to_bigint.ts
@@ -9,8 +9,12 @@ const decimalRadix = 10;
// https://github.com/typeorm/typeorm/issues/2400 for more information.
export class NumberToBigIntTransformer implements ValueTransformer {
// tslint:disable-next-line:prefer-function-over-method
- public to(value: number): string {
- return value.toString();
+ public to(value: number): string | null {
+ if (value === null || value === undefined) {
+ return null;
+ } else {
+ return value.toString();
+ }
}
// tslint:disable-next-line:prefer-function-over-method