aboutsummaryrefslogtreecommitdiffstats
path: root/packages/pipeline/src/scripts/pull_copper.ts
blob: 69814f2095774767595af87eb186d32e755813c7 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// tslint:disable:no-console
import * as R from 'ramda';
import { Connection, ConnectionOptions, createConnection, Repository } from 'typeorm';

import { CopperEndpoint, CopperSearchParams, CopperSource } from '../data_sources/copper';
import { CopperActivity, CopperActivityType, CopperCustomField, CopperLead, CopperOpportunity } from '../entities';
import * as ormConfig from '../ormconfig';
import {
    CopperSearchResponse,
    parseActivities,
    parseActivityTypes,
    parseCustomFields,
    parseLeads,
    parseOpportunities,
} from '../parsers/copper';
import { handleError } from '../utils';
// Factor of 1000 used below to convert between the values in Copper's `date_modified` /
// `minimum_activity_date` fields and the stored `startTime`.
// NOTE(review): presumably Copper timestamps are in seconds and ours in milliseconds — confirm.
const ONE_SECOND = 1000;
// Passed as the first constructor argument to CopperSource.
// NOTE(review): presumably the max number of requests per second — confirm against CopperSource.
const COPPER_RATE_LIMIT = 10;
// Shared database connection. Assigned once by the startup routine below, before any of the
// fetchAndSave* functions are invoked.
let connection: Connection;

// Script entry point: opens the database connection, validates the required Copper credentials,
// then pulls every Copper resource concurrently. Any failure is routed to `handleError`.
(async () => {
    connection = await createConnection(ormConfig as ConnectionOptions);

    const accessToken = process.env.COPPER_ACCESS_TOKEN;
    const userEmail = process.env.COPPER_USER_EMAIL;
    if (accessToken === undefined || userEmail === undefined) {
        throw new Error('Missing required env var: COPPER_ACCESS_TOKEN and/or COPPER_USER_EMAIL');
    }
    const source = new CopperSource(COPPER_RATE_LIMIT, accessToken, userEmail);

    // Each call starts its work immediately, so the fetches still run concurrently.
    // Awaiting them with Promise.all (instead of `forEach(async ...)`, which discards the
    // promises returned by its callback) keeps this function pending until all work is done
    // and makes any rejection propagate to the `.catch(handleError)` below rather than
    // surfacing as an unhandled promise rejection.
    await Promise.all([
        fetchAndSaveLeadsAsync(source),
        fetchAndSaveOpportunitiesAsync(source),
        fetchAndSaveActivitiesAsync(source),
        fetchAndSaveCustomFieldsAsync(source),
        fetchAndSaveActivityTypesAsync(source),
    ]);
})().catch(handleError);

/**
 * Pulls Copper leads and persists the ones modified after the newest
 * `date_modified` already stored in `raw.copper_leads`.
 * @param source Copper API client used to fetch the lead pages.
 */
async function fetchAndSaveLeadsAsync(source: CopperSource): Promise<void> {
    const leadRepository = connection.getRepository(CopperLead);
    const leadsTable = 'raw.copper_leads';
    const latestModified = await getMaxAsync(connection, 'date_modified', leadsTable);
    console.log(`Fetching Copper leads starting from ${latestModified}...`);
    const noExtraParams = {};
    await fetchAndSaveAsync(
        CopperEndpoint.Leads,
        source,
        latestModified,
        noExtraParams,
        parseLeads,
        leadRepository,
    );
}

/**
 * Pulls Copper opportunities (requested sorted by name) and persists the ones modified
 * after the newest `date_modified` already stored in `raw.copper_opportunities`.
 * @param source Copper API client used to fetch the opportunity pages.
 */
async function fetchAndSaveOpportunitiesAsync(source: CopperSource): Promise<void> {
    const opportunityRepository = connection.getRepository(CopperOpportunity);
    const opportunitiesTable = 'raw.copper_opportunities';
    const latestModified = await getMaxAsync(connection, 'date_modified', opportunitiesTable);
    console.log(`Fetching Copper opportunities starting from ${latestModified}...`);
    const sortByName = { sort_by: 'name' };
    await fetchAndSaveAsync(
        CopperEndpoint.Opportunities,
        source,
        latestModified,
        sortByName,
        parseOpportunities,
        opportunityRepository,
    );
}

/**
 * Pulls Copper activities and persists the ones modified after the newest
 * `date_modified` already stored in `raw.copper_activities`. The stored value,
 * divided by ONE_SECOND and floored, is also sent as `minimum_activity_date`.
 * @param source Copper API client used to fetch the activity pages.
 */
async function fetchAndSaveActivitiesAsync(source: CopperSource): Promise<void> {
    const activityRepository = connection.getRepository(CopperActivity);
    const latestModified = await getMaxAsync(connection, 'date_modified', 'raw.copper_activities');
    const minimumActivityDate = Math.floor(latestModified / ONE_SECOND);
    const activityParams = { minimum_activity_date: minimumActivityDate };
    console.log(`Fetching Copper activities starting from ${latestModified}...`);
    await fetchAndSaveAsync(
        CopperEndpoint.Activities,
        source,
        latestModified,
        activityParams,
        parseActivities,
        activityRepository,
    );
}

/**
 * Returns the largest value of `sortColumn` in `tableName` as a number, or 0 when the
 * table has no rows (or only NULLs in that column).
 *
 * `sortColumn` and `tableName` are interpolated directly into the SQL text, so they must be
 * trusted, hard-coded identifiers — never pass user-supplied input.
 *
 * @param conn Open database connection used to run the query.
 * @param sortColumn Name of the numeric column to take the maximum of.
 * @param tableName Fully qualified table name.
 * @returns The maximum value, or 0 if there is none.
 * @throws Error if the database returns a maximum that cannot be interpreted as a number.
 */
async function getMaxAsync(conn: Connection, sortColumn: string, tableName: string): Promise<number> {
    const queryResult = await conn.query(`SELECT MAX(${sortColumn}) as _max from ${tableName};`);
    if (R.isEmpty(queryResult)) {
        return 0;
    }
    // An aggregate without GROUP BY always yields exactly one row, so an empty table shows up
    // as a NULL `_max` rather than as an empty result set.
    const rawMax = queryResult[0]._max;
    if (rawMax === null || rawMax === undefined) {
        return 0;
    }
    // Database drivers may hand back bigint/numeric columns as strings; normalise so callers
    // really get the `number` promised by the signature.
    const max = Number(rawMax);
    if (isNaN(max)) {
        throw new Error(`Non-numeric MAX(${sortColumn}) returned from ${tableName}: ${rawMax}`);
    }
    return max;
}

// (Xianny): Copper API doesn't allow queries to filter by date. To ensure that we are filling in ascending chronological
// order and not missing any records, we are scraping all available pages. If Copper data gets larger,
// it would make sense to search for and start filling from the first page that contains a new record.
// This search would increase our network calls and is not efficient to implement with our current small volume
// of Copper records.
/**
 * Walks every page of `endpoint` from the last page down to page 1, keeps the records whose
 * `date_modified * ONE_SECOND` is strictly greater than `startTime`, parses them and saves
 * them through `repository`.
 *
 * An error while fetching, parsing or saving a page is logged and ends the walk early; it is
 * not rethrown. The number of records saved so far is always logged at the end.
 *
 * @param endpoint Copper endpoint to scrape.
 * @param source Copper API client.
 * @param startTime Threshold compared against `date_modified * ONE_SECOND`.
 * @param searchParams Extra search parameters sent with every page request.
 * @param parseFn Converts raw Copper records into entities.
 * @param repository Repository the parsed entities are saved to.
 */
async function fetchAndSaveAsync<T extends CopperSearchResponse, E>(
    endpoint: CopperEndpoint,
    source: CopperSource,
    startTime: number,
    searchParams: CopperSearchParams,
    parseFn: (recs: T[]) => E[],
    repository: Repository<E>,
): Promise<void> {
    let savedCount = 0;
    const pageCount = await source.fetchNumberOfPagesAsync(endpoint);
    try {
        let page = pageCount;
        while (page > 0) {
            console.log(`Fetching page ${page}/${pageCount} of ${endpoint}...`);
            const pageParams: CopperSearchParams = { ...searchParams, page_number: page };
            const pageRecords = await source.fetchSearchResultsAsync<T>(endpoint, pageParams);
            const freshRecords = pageRecords.filter(record => startTime < record.date_modified * ONE_SECOND);
            const entities = parseFn(freshRecords);
            await repository.save<any>(entities);
            savedCount += freshRecords.length;
            page--;
        }
    } catch (err) {
        console.log(`Error fetching ${endpoint}, stopping: ${err.stack}`);
    } finally {
        console.log(`Saved ${savedCount} items from ${endpoint}, done.`);
    }
}

/**
 * Fetches all Copper activity types, parses them and saves them through the
 * CopperActivityType repository.
 * @param source Copper API client.
 */
async function fetchAndSaveActivityTypesAsync(source: CopperSource): Promise<void> {
    console.log('Fetching Copper activity types...');
    const rawActivityTypes = await source.fetchActivityTypesAsync();
    const activityTypeRepository = connection.getRepository(CopperActivityType);
    const activityTypeEntities = parseActivityTypes(rawActivityTypes);
    await activityTypeRepository.save(activityTypeEntities);
}

/**
 * Fetches all Copper custom fields, parses them and saves them through the
 * CopperCustomField repository.
 * @param source Copper API client.
 */
async function fetchAndSaveCustomFieldsAsync(source: CopperSource): Promise<void> {
    console.log('Fetching Copper custom fields...');
    const rawCustomFields = await source.fetchCustomFieldsAsync();
    const customFieldRepository = connection.getRepository(CopperCustomField);
    const customFieldEntities = parseCustomFields(rawCustomFields);
    await customFieldRepository.save(customFieldEntities);
}