path: root/packages/pipeline/src/scripts/pull_copper.ts
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';

import { logUtils } from '@0x/utils';

import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper';
import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities';
import * as ormConfig from '../ormconfig';
import {
    CopperSearchResponse,
    parseActivities,
    parseActivityTypes,
    parseCustomFields,
    parseLeads,
    parseOpportunities,
} from '../parsers/copper';
import { handleError } from '../utils';
const ONE_SECOND = 1000;
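// Rate limit passed to CopperSource; presumably the maximum number of Copper API requests per second.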
const COPPER_RATE_LIMIT = 10;
let connection: Connection;

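// This script pulls Copper CRM data (leads, opportunities, activities, activity
// types, and custom fields) and saves it to the raw.* database tables. It
// requires the COPPER_ACCESS_TOKEN and COPPER_USER_EMAIL environment variables
// to be set. A typical invocation (assuming the package has been compiled to
// lib/) might look like:
//
//     COPPER_ACCESS_TOKEN=<token> COPPER_USER_EMAIL=<email> node lib/scripts/pull_copper.js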
(async () => {
    connection = await createConnection(ormConfig as ConnectionOptions);

    const accessToken = process.env.COPPER_ACCESS_TOKEN;
    const userEmail = process.env.COPPER_USER_EMAIL;
    if (accessToken === undefined || userEmail === undefined) {
        throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL');
    }
    const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail);

    const fetchPromises = [
        fetchAndSaveLeadsAsync(source),
        fetchAndSaveOpportunitiesAsync(source),
        fetchAndSaveActivitiesAsync(source),
        fetchAndSaveCustomFieldsAsync(source),
        fetchAndSaveActivityTypesAsync(source),
    ];
    // Wait for every fetch to finish so that any rejection propagates to handleError
    // (forEach with an async callback would neither await nor surface errors).
    await Promise.all(fetchPromises);
})().catch(handleError);

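// Fetches all pages of Copper leads and saves those modified more recently than the newest record already in the database.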
async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
    const repository = connection.getRepository(CopperLead);
    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_leads');
    logUtils.log(`Fetching Copper leads starting from ${startTime}...`);
    await fetchAndSaveAsync(CopperEndpoint.Leads, source, startTime, {}, parseLeads, repository);
}

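// Fetches all pages of Copper opportunities (sorted by name) and saves those modified since the newest saved record.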
async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
    const repository = connection.getRepository(CopperOpportunity);
    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_opportunities');
    logUtils.log(`Fetching Copper opportunities starting from ${startTime}...`);
    await fetchAndSaveAsync(
        CopperEndpoint.Opportunities,
        source,
        startTime,
        { sort_by: 'name' },
        parseOpportunities,
        repository,
    );
}

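// Fetches Copper activities and saves those modified since the newest saved record.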
async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> {
    const repository = connection.getRepository(CopperActivity);
    const startTime = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities');
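    // The Copper API expects minimum_activity_date in seconds; startTime is in milliseconds.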
    const searchParams = {
        minimum_activity_date: Math.floor(startTime / ONE_SECOND),
    };
    logUtils.log(`Fetching Copper activities starting from ${startTime}...`);
    await fetchAndSaveAsync(CopperEndpoint.Activities, source, startTime, searchParams, parseActivities, repository);
}

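// Returns the maximum value of sortColumn in tableName, or 0 if the table is empty.
// sortColumn and tableName are trusted constants supplied by callers in this file,
// so interpolating them into the query string is safe here.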
async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> {
    const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
    if (R.isEmpty(queryResult)) {
        return 0;
    } else {
        // MAX() returns a single row with a NULL value when the table is empty,
        // so fall back to 0 in that case.
        return queryResult[0]._max || 0;
    }
}

// (Xianny): The Copper API doesn't allow queries to filter by date. To fill records in ascending chronological
// order without missing any, we scrape all available pages. If the Copper data set grows, it would make sense
// to search for the first page that contains a new record and start filling from there; that search would add
// network calls, though, and isn't worth implementing at our current small volume of Copper records.
async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
    endpoint: CopperEndpoint,
    source: CopperSource,
    startTime: number,
    searchParams: CopperSearchParams,
    parseFn: (recs: T[]) => E[],
    repository: Repository<E>,
): Promise<void> {
    let saved = 0;
    const numPages = await source.fetchNumberOfPagesAsync(endpoint);
    try {
        for (let i = numPages; i > 0; i--) {
            logUtils.log(`Fetching page ${i}/${numPages} of ${endpoint}...`);
            const raw = await source.fetchSearchResultsAsync<T>(endpoint, {
                ...searchParams,
                page_number: i,
            });
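            // Copper returns date_modified in seconds; startTime is in milliseconds.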
            const newRecords = raw.filter(rec => rec.date_modified * ONE_SECOND > startTime);
            const parsed = parseFn(newRecords);
            await repository.save<any>(parsed);
            saved += newRecords.length;
        }
    } catch (err) {
        logUtils.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
    } finally {
        logUtils.log(`Saved ${saved} items from ${endpoint}, done.`);
    }
}

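// Fetches all Copper activity types and saves them to the database.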
async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> {
    logUtils.log(`Fetching Copper activity types...`);
    const activityTypes = await source.fetchActivityTypesAsync();
    const repository = connection.getRepository(CopperActivityType);
    await repository.save(parseActivityTypes(activityTypes));
}

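// Fetches all Copper custom fields and saves them to the database.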
async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> {
    logUtils.log(`Fetching Copper custom fields...`);
    const customFields = await source.fetchCustomFieldsAsync();
    const repository = connection.getRepository(CopperCustomField);
    await repository.save(parseCustomFields(customFields));
}