NodeJS Runner for Connectors
A Node.js utility for running OWOX Data Marts connectors in isolated environments.
The NodeJS connector runner allows you to execute data connectors on your machine in isolated Node.js environments. This is particularly useful for development, testing, debugging, and production runs of data integration connectors.
Key Features
- Isolated Environment Creation: Creates separate Node.js environments for each connector run
- Automatic Dependency Management: Installs required dependencies for each connector
- Configuration Validation: Validates connector and storage configurations
- Run Configuration Support: Supports different run types (INCREMENTAL, MANUAL_BACKFILL, FULL_REFRESH, CUSTOM)
- Storage Support: Supports multiple storage backends (Google BigQuery, AWS Athena)
- Resource Cleanup: Automatically cleans up temporary files and dependencies after execution
- Environment Variable Management: Passes configuration and run context via environment variables
Installation
```bash
# From the root directory of the repository
npm install
```
Basic Usage
To run a connector, pass the path to its configuration JSON file (the `--` tells npm to forward the argument to the runner script):
```bash
npm run connector-runner-node -- path/to/connector-config.json
```
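Conceptually, the CLI only has to turn that argument into a configuration object. The sketch below is a hypothetical illustration of that step, not the code in `src/cli/connector-runner-cli.js`; the `loadConfigFromArgv` name and its error messages are assumptions, and it assumes the npm script launches the CLI with `node`.

```javascript
const fs = require('node:fs');

// Hypothetical: read the JSON config whose path follows `--` on the command line.
// Assuming the npm script is `node <cli-file>`, running
// `npm run connector-runner-node -- file.json` yields
// process.argv = [nodePath, cliPath, 'file.json'].
function loadConfigFromArgv(argv = process.argv) {
  const configPath = argv[2];
  if (!configPath) {
    throw new Error('Usage: npm run connector-runner-node -- path/to/connector-config.json');
  }
  const raw = fs.readFileSync(configPath, 'utf8');
  try {
    return JSON.parse(raw);
  } catch (error) {
    throw new Error(`${configPath} is not valid JSON: ${error.message}`);
  }
}
```

Failing early with a usage message keeps a missing or malformed path from surfacing later as an obscure error inside the connector run.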
Programmatic Usage
```javascript
const { ConnectorRunner, Config, RunConfig } = require('@owox/connector-runner');

// `await` is only valid inside an async function in CommonJS modules.
async function main() {
  const runner = new ConnectorRunner();
  const datamartId = 'my-datamart';
  const runId = 'run-' + Date.now();

  // Create configuration
  const config = new Config({
    name: 'TikTokAdsConnector',
    source: {
      name: 'TikTokAds',
      config: {
        // source configuration, e.g. AccessToken: { value: 'YOUR_ACCESS_TOKEN' }
      },
    },
    storage: {
      name: 'GoogleBigQuery',
      config: {
        // storage configuration
      },
    },
  });

  // Create run configuration
  const runConfig = new RunConfig({
    type: 'INCREMENTAL',
    data: [{ configField: 'date', value: '2024-01-01' }],
    state: { lastRun: '2024-01-01T00:00:00Z' },
  });

  await runner.run(datamartId, runId, config, runConfig);
}

main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
```
Using ConnectorExecutionService Directly
```javascript
const ConnectorExecutionService = require('@owox/connector-runner/src/application/services/connector-execution-service');
const { Config, RunConfig } = require('@owox/connector-runner/src/application/dto');

async function main() {
  const executionService = new ConnectorExecutionService();
  const datamartId = 'my-datamart';
  const runId = 'run-' + Date.now();

  const config = new Config({
    // configuration object (same shape as in the previous example)
  });

  const runConfig = new RunConfig({
    type: 'INCREMENTAL',
    data: [],
    state: {},
  });

  await executionService.execute(datamartId, runId, config, runConfig);
}

main().catch((error) => {
  console.error(error);
  process.exitCode = 1;
});
```
Configuration Structure
Configuration Schema
The connector configuration JSON file has the following structure. The `//` comments are annotations only; JSON does not allow comments, so remove them in a real file.
```jsonc
{
  "name": "ConnectorName", // The class name of the connector
  "description": "Connector Description", // The description of the connector
  "source": {
    "name": "ConnectorName", // The name of the connector (connector directory name)
    "config": {
      // The connector configuration parameters, as defined in the connector constructor
      "ParameterName": { "value": "ParameterValue" }
    }
  },
  "storage": {
    "name": "StorageName", // The name of the storage (storage directory name)
    "config": {
      // The storage configuration parameters, as defined in the storage constructor
      "ParameterName": { "value": "ParameterValue" }
    }
  }
}
```
Note: The configuration structure has been updated: the legacy `integration` field has been replaced by `source` for better clarity.
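Because every parameter is wrapped in a `{ "value": ... }` object, a structural check of this file is straightforward. The following is a minimal sketch that checks only the fields documented in this README; `validateConnectorConfig` is a hypothetical name, and the runner's real validation may check more, or check differently.

```javascript
// Hypothetical sketch: checks the structure described in the schema above.
// Returns a list of error messages (empty when the structure looks valid).
function validateConnectorConfig(config) {
  const errors = [];
  if (typeof config.name !== 'string' || config.name === '') {
    errors.push('name must be a non-empty string');
  }
  for (const section of ['source', 'storage']) {
    const part = config[section];
    if (!part || typeof part !== 'object') {
      errors.push(`${section} must be an object`);
      continue;
    }
    if (typeof part.name !== 'string' || part.name === '') {
      errors.push(`${section}.name must be a non-empty string`);
    }
    if (!part.config || typeof part.config !== 'object') {
      errors.push(`${section}.config must be an object`);
      continue;
    }
    // Every parameter is expected to be wrapped as { "value": ... }.
    for (const [param, entry] of Object.entries(part.config)) {
      if (!entry || typeof entry !== 'object' || !('value' in entry)) {
        errors.push(`${section}.config.${param} must have a "value" field`);
      }
    }
  }
  return errors;
}

const exampleConfig = {
  name: 'TikTokAdsConnector',
  source: { name: 'TikTokAds', config: { AccessToken: { value: 'YOUR_ACCESS_TOKEN' } } },
  storage: { name: 'GoogleBigQuery', config: { ProjectID: { value: 'YOUR_PROJECT_ID' } } },
};
console.log(validateConnectorConfig(exampleConfig)); // []
```

Collecting all errors instead of throwing on the first one lets a user fix a config file in a single pass.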
Run Configuration
The run configuration controls how the connector executes:
```javascript
const runConfig = new RunConfig({
  type: 'INCREMENTAL', // Run type: 'INCREMENTAL', 'MANUAL_BACKFILL', 'FULL_REFRESH', 'CUSTOM'
  data: [
    // Additional run parameters
    { configField: 'date', value: '2024-01-01' },
    { configField: 'limit', value: 100 },
  ],
  state: {
    // Previous run state for incremental runs
    lastRun: '2024-01-01T00:00:00Z',
  },
});
```
Run Types
- INCREMENTAL: Only fetch new data since the last run
- MANUAL_BACKFILL: Fetch data for a specific date range
- FULL_REFRESH: Fetch all available data
- CUSTOM: Custom run type with specific parameters
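To illustrate how the run type, `data`, and `state` fit together, here is a hypothetical sketch of a connector deciding where to start fetching. The helper names, and the use of a `date` entry in `data` for backfills, are assumptions for illustration only; each real connector defines its own behaviour.

```javascript
// Run type names come from this README; everything else is illustrative.
const RUN_TYPES = ['INCREMENTAL', 'MANUAL_BACKFILL', 'FULL_REFRESH', 'CUSTOM'];

// Turn the `data` array ([{ configField, value }]) into a plain lookup object.
function runDataToObject(data) {
  const result = {};
  for (const { configField, value } of data || []) {
    result[configField] = value;
  }
  return result;
}

// Hypothetical: pick the date to start fetching from, based on the run type.
function resolveStartDate(runConfig, configuredStartDate) {
  if (!RUN_TYPES.includes(runConfig.type)) {
    throw new Error(`Unknown run type: ${runConfig.type}`);
  }
  const data = runDataToObject(runConfig.data);
  switch (runConfig.type) {
    case 'INCREMENTAL':
      // Continue from the previous run's state when it is available.
      return runConfig.state?.lastRun?.slice(0, 10) ?? configuredStartDate;
    case 'MANUAL_BACKFILL':
      // Use the explicitly requested date (assumed to be passed in `data`).
      return data.date ?? configuredStartDate;
    case 'FULL_REFRESH':
      // Re-fetch everything from the configured start date.
      return configuredStartDate;
    default:
      // CUSTOM: behaviour is entirely connector-specific.
      return data.date ?? configuredStartDate;
  }
}

console.log(resolveStartDate(
  { type: 'INCREMENTAL', data: [], state: { lastRun: '2024-01-01T00:00:00Z' } },
  '2023-01-01'
)); // 2024-01-01
```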
Configuration Parameters
- name (string): The class name of the connector
- description (string): Description of the connector
- source (object): Source configuration
  - name (string): The name of the source connector (corresponds to a directory name in `packages/connectors/src/Sources`)
  - config (object): Source-specific configuration parameters
- storage (object): Storage configuration
  - name (string): The name of the storage backend (corresponds to a directory name in `packages/connectors/src/Storages`)
  - config (object): Storage-specific configuration parameters
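The two conventions above (parameters wrapped in `{ value }` objects, and `name` matching a directory) can be sketched as two small helpers. Both `unwrapParameters` and `resolveComponentDirs` are hypothetical, and `repoRoot` is an assumed argument pointing at the repository root.

```javascript
const path = require('node:path');

// Hypothetical helper: turns { ParameterName: { value: ... } } into
// { ParameterName: ... } so connector code can read plain values.
function unwrapParameters(parameters) {
  return Object.fromEntries(
    Object.entries(parameters).map(([name, entry]) => [name, entry.value])
  );
}

// Hypothetical helper: source.name and storage.name correspond to directory
// names under packages/connectors/src/Sources and packages/connectors/src/Storages.
function resolveComponentDirs(config, repoRoot) {
  const base = path.join(repoRoot, 'packages', 'connectors', 'src');
  return {
    sourceDir: path.join(base, 'Sources', config.source.name),
    storageDir: path.join(base, 'Storages', config.storage.name),
  };
}

const sourceParameters = {
  StartDate: { value: '2023-01-01' },
  ReimportLookbackWindow: { value: 5 },
};
console.log(unwrapParameters(sourceParameters));
// { StartDate: '2023-01-01', ReimportLookbackWindow: 5 }
```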
Example Configurations
TikTok Ads to Google BigQuery
```json
{
  "name": "TikTokAdsConnector",
  "description": "TikTok Ads Connector to Google BigQuery",
  "source": {
    "name": "TikTokAds",
    "config": {
      "AccessToken": { "value": "YOUR_ACCESS_TOKEN" },
      "AppId": { "value": "YOUR_APP_ID" },
      "AppSecret": { "value": "YOUR_APP_SECRET" },
      "AdvertiserIDs": { "value": "YOUR_ADVERTISER_ID" },
      "Objects": { "value": "campaigns" },
      "DataLevel": { "value": "AUCTION_AD" },
      "StartDate": { "value": "2023-01-01" },
      "ReimportLookbackWindow": { "value": 5 },
      "Fields": { "value": "campaigns campaign_id, campaigns campaign_name" }
    }
  },
  "storage": {
    "name": "GoogleBigQuery",
    "config": {
      "DestinationLocation": { "value": "US" },
      "DestinationDatasetID": { "value": "YOUR_DATASET_ID" },
      "DestinationProjectID": { "value": "YOUR_PROJECT_ID" },
      "DestinationDatasetName": { "value": "YOUR_DATASET_NAME" },
      "ProjectID": { "value": "YOUR_PROJECT_ID" }
    }
  }
}
```
TikTok Ads to AWS Athena
```json
{
  "name": "TikTokAdsConnector",
  "description": "TikTok Ads Connector to AWS Athena",
  "source": {
    "name": "TikTokAds",
    "config": {
      "AccessToken": { "value": "YOUR_ACCESS_TOKEN" },
      "AppId": { "value": "YOUR_APP_ID" },
      "AppSecret": { "value": "YOUR_APP_SECRET" },
      "AdvertiserIDs": { "value": "YOUR_ADVERTISER_ID" },
      "Objects": { "value": "campaigns" },
      "DataLevel": { "value": "AUCTION_AD" },
      "StartDate": { "value": "2023-01-01" },
      "ReimportLookbackWindow": { "value": 1 },
      "Fields": { "value": "campaigns campaign_id, campaigns campaign_name" }
    }
  },
  "storage": {
    "name": "AwsAthena",
    "config": {
      "AWSRegion": { "value": "us-east-1" },
      "AWSAccessKeyId": { "value": "YOUR_ACCESS_KEY_ID" },
      "AWSSecretAccessKey": { "value": "YOUR_SECRET_ACCESS_KEY" },
      "S3BucketName": { "value": "YOUR_BUCKET_NAME" },
      "S3Prefix": { "value": "tiktok_ads_" },
      "AthenaDatabaseName": { "value": "YOUR_DATABASE_NAME" },
      "DestinationTableName": { "value": "tiktok_ads_" },
      "AthenaOutputLocation": { "value": "s3://YOUR_BUCKET_NAME/athena_dir" },
      "MaxBufferSize": { "value": 250 }
    }
  }
}
```
Architecture
```text
src/
├── core/
│   ├── interfaces/                      # Abstract interfaces for extensibility
│   │   ├── execution-environment.js
│   │   ├── dependency-manager.js
│   │   └── template-renderer.js
│   └── domain/                          # Domain objects and business logic
│       └── run-context.js               # Execution context with environment variables
├── infrastructure/                      # Implementation details
│   ├── environments/
│   │   └── nodejs-environment.js        # Node.js execution environment
│   ├── dependencies/
│   │   └── npm-dependency-manager.js
│   └── templates/
│       └── nodejs-template-renderer.js
├── application/                         # Application services and DTOs
│   ├── services/
│   │   └── connector-execution-service.js
│   └── dto/
│       ├── config.js                    # Configuration DTOs
│       └── run-config.js                # Run configuration DTO
└── cli/
    └── connector-runner-cli.js          # Command line interface
```
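The split between `core/interfaces/` and `infrastructure/` resembles a ports-and-adapters layout: services depend on an abstract interface, and concrete classes supply the behaviour. The sketch below shows that pattern in miniature; the `prepare`/`execute`/`cleanup` method names and the `InMemoryEnvironment` class are assumptions, not the actual contents of `execution-environment.js`.

```javascript
// Hypothetical abstract interface: subclasses must override every method.
class ExecutionEnvironment {
  async prepare(_runContext) { throw new Error('prepare() not implemented'); }
  async execute(_runContext) { throw new Error('execute() not implemented'); }
  async cleanup(_runContext) { throw new Error('cleanup() not implemented'); }
}

// A trivial implementation that only records calls (useful as a test fake).
class InMemoryEnvironment extends ExecutionEnvironment {
  constructor() {
    super();
    this.log = [];
  }
  async prepare(ctx) { this.log.push(`prepare ${ctx.runId}`); }
  async execute(ctx) { this.log.push(`execute ${ctx.runId}`); }
  async cleanup(ctx) { this.log.push(`cleanup ${ctx.runId}`); }
}

// The orchestrating code depends only on the interface, so environments can
// be swapped without changing it. Cleanup runs even if a step fails.
async function runWith(environment, ctx) {
  try {
    await environment.prepare(ctx);
    await environment.execute(ctx);
  } finally {
    await environment.cleanup(ctx);
  }
}

const demoEnv = new InMemoryEnvironment();
runWith(demoEnv, { runId: 'run-1' }).then(() => console.log(demoEnv.log));
// [ 'prepare run-1', 'execute run-1', 'cleanup run-1' ]
```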
Core Components
The connector runner consists of several key components:
ConnectorRunner (src/index.js)
Main orchestrator class that:
- Validates configuration parameters
- Creates isolated environments for each run
- Manages connector execution lifecycle
- Handles cleanup after execution
ConnectorExecutionService (src/application/services/connector-execution-service.js)
Service responsible for orchestrating connector execution:
- Validates parameters and storage environments
- Creates run contexts with environment variables
- Manages execution lifecycle and error handling
- Coordinates with execution environments
RunContext (src/core/domain/run-context.js)
Domain object representing the execution context:
- Manages datamart ID, run ID, and configurations
- Generates environment variables for connector execution
- Provides unique identifiers for runs
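A run context essentially bundles the identifiers and configurations and renders them as the `OW_*` variables listed under Environment Variables. A minimal sketch, assuming a `toEnvironment()` method and a UUID-based unique ID (both hypothetical; only the `OW_*` names come from this README):

```javascript
const crypto = require('node:crypto');

// Hypothetical run context: holds identifiers and configurations and turns
// them into the documented OW_* environment variables.
class RunContext {
  constructor(datamartId, runId, config, runConfig) {
    this.datamartId = datamartId;
    this.runId = runId;
    this.config = config;
    this.runConfig = runConfig;
    // Unique identifier for this execution (e.g. to name a work directory).
    this.id = `${datamartId}-${runId}-${crypto.randomUUID()}`;
  }

  // Merge the OW_* variables into an optional base environment.
  toEnvironment(baseEnv = {}) {
    return {
      ...baseEnv,
      OW_DATAMART_ID: this.datamartId,
      OW_RUN_ID: this.runId,
      OW_CONFIG: JSON.stringify(this.config),
      OW_RUN_CONFIG: JSON.stringify(this.runConfig),
    };
  }
}

const demoContext = new RunContext('my-datamart', 'run-1', { name: 'TikTokAdsConnector' }, { type: 'INCREMENTAL', data: [], state: {} });
console.log(demoContext.toEnvironment().OW_RUN_ID); // run-1
```

Serializing both configurations to JSON strings is necessary because environment variable values can only be strings.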
Config DTOs (src/application/dto/)
Data Transfer Objects for configuration validation:
- Config: Main configuration wrapper
- SourceConfig: Source-specific configuration
- StorageConfig: Storage-specific configuration
- RunConfig: Run configuration with type, data, and state
NodeJsEnvironment (src/infrastructure/environments/nodejs-environment.js)
Manages isolated Node.js environments:
- Creates temporary directories for each run
- Installs required dependencies
- Generates runner templates
- Cleans up resources after execution
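The directory lifecycle can be sketched with Node's built-in `fs` module. This is an assumption-laden illustration: it uses the OS temp directory instead of the runner's real base directory, the function names are hypothetical, and it skips the actual `npm install` so that it runs offline.

```javascript
const fs = require('node:fs');
const os = require('node:os');
const path = require('node:path');

// Hypothetical: create a fresh, uniquely named directory with its own
// package.json listing the dependencies this run needs.
function createRunDirectory(runId, dependencies, baseDir = os.tmpdir()) {
  const dir = fs.mkdtempSync(path.join(baseDir, `run-${runId}-`));
  const packageJson = { name: `connector-run-${runId}`, private: true, dependencies };
  fs.writeFileSync(path.join(dir, 'package.json'), JSON.stringify(packageJson, null, 2));
  return dir;
}

// Remove the directory and everything in it (including node_modules).
function removeRunDirectory(dir) {
  fs.rmSync(dir, { recursive: true, force: true });
}

const runDir = createRunDirectory('run-1', { 'example-package': '^1.0.0' }); // hypothetical dependency
try {
  // `npm install` and the connector run would happen inside runDir here.
  console.log(fs.existsSync(path.join(runDir, 'package.json'))); // true
} finally {
  removeRunDirectory(runDir);
}
```

Wrapping the run in `try`/`finally` ensures the directory is removed even when the connector fails.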
NodeJsTemplateRenderer (src/infrastructure/templates/nodejs-template-renderer.js)
Generates execution templates that:
- Import required dependencies as globals
- Set up OWOX connector libraries
- Handle environment variables (OW_CONFIG, OW_RUN_CONFIG, etc.)
- Execute the specified connector with proper error handling
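Rendering a runner script is essentially string generation. The sketch below is hypothetical: the `globals` mapping, the `connectorModulePath` argument, and the `new Connector(config, runConfig).run()` call are placeholders standing in for the real OWOX connector API, which this README does not document.

```javascript
// Hypothetical template renderer: returns the source of a runner script.
// `globals` maps global names (assumed to be valid identifiers) to module names.
function renderRunnerScript({ connectorModulePath, globals = {} }) {
  const globalLines = Object.entries(globals)
    .map(([name, moduleName]) => `global.${name} = require(${JSON.stringify(moduleName)});`)
    .join('\n');

  return [
    "'use strict';",
    '// Dependencies exposed as globals for the connector code.',
    globalLines,
    `const Connector = require(${JSON.stringify(connectorModulePath)});`,
    '',
    'async function main() {',
    '  // Configuration arrives through environment variables.',
    '  const config = JSON.parse(process.env.OW_CONFIG);',
    '  const runConfig = JSON.parse(process.env.OW_RUN_CONFIG);',
    '  // Placeholder API: the real connector interface may differ.',
    '  await new Connector(config, runConfig).run();',
    '}',
    '',
    'main().catch((error) => {',
    "  console.error('Run ' + process.env.OW_RUN_ID + ' failed:', error);",
    '  process.exitCode = 1;',
    '});',
    '',
  ].join('\n');
}

console.log(renderRunnerScript({ connectorModulePath: './connector.js' }));
```

Using `JSON.stringify` for module paths produces correctly escaped JavaScript string literals, so unusual characters in a path cannot break the generated script.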
Execution Flow
- Configuration Validation: Validates the provided JSON configuration and run configuration
- Run Context Creation: Creates a run context with environment variables
- Environment Setup: Creates an isolated Node.js environment with required dependencies
- Template Generation: Generates a runner script with proper imports and configuration
- Connector Execution: Spawns a Node.js process to run the connector
- Cleanup: Removes temporary files and dependencies
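The "Connector Execution" step can be illustrated with `child_process.spawnSync`. In this hypothetical sketch an inline `-e` script stands in for the generated runner file, and only `PATH` is inherited from the parent process; the real runner's process options are not documented here.

```javascript
const { spawnSync } = require('node:child_process');

// Hypothetical: run a script in a separate Node.js process, passing the run
// context only through environment variables.
function runIsolated(script, { datamartId, runId, config, runConfig }, cwd = process.cwd()) {
  const result = spawnSync(process.execPath, ['-e', script], {
    cwd,
    encoding: 'utf8',
    env: {
      PATH: process.env.PATH, // the only variable inherited from the parent
      OW_DATAMART_ID: datamartId,
      OW_RUN_ID: runId,
      OW_CONFIG: JSON.stringify(config),
      OW_RUN_CONFIG: JSON.stringify(runConfig),
    },
  });
  if (result.error) throw result.error;
  return { exitCode: result.status, stdout: result.stdout, stderr: result.stderr };
}

// A stand-in "connector" that just echoes what it received.
const echoScript = `
const config = JSON.parse(process.env.OW_CONFIG);
const runConfig = JSON.parse(process.env.OW_RUN_CONFIG);
console.log(process.env.OW_RUN_ID, config.name, runConfig.type);
`;
const outcome = runIsolated(echoScript, {
  datamartId: 'my-datamart',
  runId: 'run-1',
  config: { name: 'TikTokAdsConnector' },
  runConfig: { type: 'INCREMENTAL', data: [], state: {} },
});
console.log(outcome.stdout.trim()); // run-1 TikTokAdsConnector INCREMENTAL
```

Passing an explicit `env` object, rather than spreading `process.env`, keeps the parent's unrelated variables out of the connector process.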
Environment Variables
The connector runner passes configuration to the connector via environment variables:
- `OW_DATAMART_ID`: The ID of the datamart being executed
- `OW_RUN_ID`: The unique ID for this run
- `OW_CONFIG`: JSON string containing the main configuration
- `OW_RUN_CONFIG`: JSON string containing the run configuration
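On the receiving side, code inside the spawned process can read these variables back. A sketch, assuming all four variables are required; `readRunEnvironment` is a hypothetical helper, not part of the connector libraries.

```javascript
// Hypothetical helper: read and parse the four documented variables.
// Taking `env` as a parameter (default: process.env) makes it easy to test.
function readRunEnvironment(env = process.env) {
  const required = ['OW_DATAMART_ID', 'OW_RUN_ID', 'OW_CONFIG', 'OW_RUN_CONFIG'];
  const missing = required.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(', ')}`);
  }
  // Parse a JSON variable, naming the variable in the error message.
  const parseJson = (name) => {
    try {
      return JSON.parse(env[name]);
    } catch (error) {
      throw new Error(`${name} is not valid JSON: ${error.message}`);
    }
  };
  return {
    datamartId: env.OW_DATAMART_ID,
    runId: env.OW_RUN_ID,
    config: parseJson('OW_CONFIG'),
    runConfig: parseJson('OW_RUN_CONFIG'),
  };
}

// Inside the spawned process:
// const { datamartId, runId, config, runConfig } = readRunEnvironment();
```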
Isolation Strategy
Each connector run operates in its own isolated environment:
- A separate working directory under `../../dist/data-marts/conectivity/runs`
- Its own `package.json` and `node_modules`
- Configuration passed via environment variables
- Automatic cleanup after execution
Testing
The connector runner includes unit, integration, DTO, and template tests (see Test Structure):
```bash
# Run all tests
npm test

# Run tests with coverage
npm run test:coverage
```
Test Structure
- Unit Tests: Test individual components in isolation
- Integration Tests: Test component interactions
- DTO Tests: Test configuration validation and serialization
- Template Tests: Test template generation and environment variable handling
Development
Linting and Formatting
```bash
# Run ESLint
npm run lint

# Fix linting issues
npm run lint:fix

# Format code with Prettier
npm run format

# Check formatting
npm run format:check
```