amazon-kinesis-connectors

HTTP proxy support

Open gamefundas opened this issue 8 years ago • 4 comments

I am wondering whether the library supports an HTTP proxy. We are trying to use the ES connector with the AWS CloudWatch consumer, and for some reason the API is unable to store the lease information in DynamoDB. I get an HTTP "connect timed out" error.

I tried passing the proxy settings as system properties through JAVA_OPTIONS in the call to "java -jar", but that doesn't seem to make any difference.
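One way to check whether the -D flags reached the JVM at all is to ask the JVM's default ProxySelector what it would use for an HTTPS endpoint. The sketch below is a stand-alone diagnostic, not part of the connector library; the class name ProxyCheck and the DynamoDB endpoint URL are assumptions chosen for illustration. It only reports what the JVM's built-in networking stack resolves, and an HTTP client that is configured separately may not consult it.

```java
import java.net.InetSocketAddress;
import java.net.Proxy;
import java.net.ProxySelector;
import java.net.URI;
import java.util.List;

// Hypothetical diagnostic class; not part of amazon-kinesis-connectors.
final class ProxyCheck {

    /** Describes the first proxy the JVM's default ProxySelector picks for the URL. */
    static String describeProxyFor(String url) {
        // select() always returns at least one entry; DIRECT means "no proxy".
        List<Proxy> proxies = ProxySelector.getDefault().select(URI.create(url));
        Proxy first = proxies.get(0);
        if (first.type() == Proxy.Type.DIRECT) {
            return "DIRECT";
        }
        InetSocketAddress address = (InetSocketAddress) first.address();
        return first.type() + " " + address.getHostString() + ":" + address.getPort();
    }

    public static void main(String[] args) {
        // Assumed example endpoint; substitute the endpoint for your region.
        System.out.println(describeProxyFor("https://dynamodb.us-east-1.amazonaws.com"));
    }
}
```

Running this with the same -Dhttps.proxyHost and -Dhttps.proxyPort flags as the connector should print the proxy address. If it prints DIRECT, the flags never reached the JVM; if it prints the proxy and the connector still times out, the client is probably not reading those properties.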

Any known fixes for this?

Here's the stack trace:

2016-02-20 02:57:10,741 INFO AmazonHttpClient - Unable to execute HTTP request: connect timed out
java.net.SocketTimeoutException: connect timed out
    at java.net.PlainSocketImpl.socketConnect(Native Method)
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345)
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
    at java.net.Socket.connect(Socket.java:589)
    at sun.security.ssl.SSLSocketImpl.connect(SSLSocketImpl.java:656)
    at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:524)
    at org.apache.http.conn.ssl.SSLSocketFactory.connectSocket(SSLSocketFactory.java:403)
    at com.amazonaws.http.conn.ssl.SdkTLSSocketFactory.connectSocket(SdkTLSSocketFactory.java:118)
    at org.apache.http.impl.conn.DefaultClientConnectionOperator.openConnection(DefaultClientConnectionOperator.java:177)
    at org.apache.http.impl.conn.ManagedClientConnectionImpl.open(ManagedClientConnectionImpl.java:304)
    at org.apache.http.impl.client.DefaultRequestDirector.tryConnect(DefaultRequestDirector.java:611)
    at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:446)
    at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
    at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
    at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:706)
    at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:467)
    at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:302)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.invoke(AmazonDynamoDBClient.java:3240)
    at com.amazonaws.services.dynamodbv2.AmazonDynamoDBClient.createTable(AmazonDynamoDBClient.java:1047)
    at com.amazonaws.services.kinesis.leases.impl.LeaseManager.createLeaseTableIfNotExists(LeaseManager.java:118)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibLeaseCoordinator.initialize(KinesisClientLibLeaseCoordinator.java:173)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.initialize(Worker.java:373)
    at com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.run(Worker.java:317)
    at com.amazonaws.services.kinesis.connectors.KinesisConnectorExecutorBase.run(KinesisConnectorExecutorBase.java:95)
    at com.amazonaws.services.logs.connectors.samples.elasticsearch.ElasticsearchConnector.main(ElasticsearchConnector.java:38)

Thanks

gamefundas avatar Mar 09 '16 16:03 gamefundas

I am facing the same issue.

devsaik avatar Apr 06 '16 23:04 devsaik

Here is how I fixed this issue (I am sure there is a better way to do this): I replaced KinesisConnectorExecutorBase with my own class called CustomeKinesisConnectorexecutorBase. I copied the code from KinesisConnectorExecutorBase.java and changed it to pass a ClientConfiguration that carries the proxy settings:

/*
 * Copyright 2013-2014 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Amazon Software License (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 * http://aws.amazon.com/asl/
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */
package com.amazonaws.services.kinesis.connectors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker;
import com.amazonaws.services.kinesis.connectors.interfaces.IKinesisConnectorPipeline;
import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory;
import com.amazonaws.ClientConfiguration;

public abstract class CustomeKinesisConnectorexecutorBase<T, U> implements Runnable {
    // Log under this class's own name rather than the class it was copied from
    private static final Log LOG = LogFactory.getLog(CustomeKinesisConnectorexecutorBase.class);

    // Amazon Kinesis Client Library worker to process records
    protected Worker worker;

    /**
     * Initialize the Amazon Kinesis Client Library configuration and worker
     * 
     * @param kinesisConnectorConfiguration Amazon Kinesis connector configuration
     */
    protected void initialize(KinesisConnectorConfiguration kinesisConnectorConfiguration) {
        initialize(kinesisConnectorConfiguration, null);
    }

    /**
     * Initialize the Amazon Kinesis Client Library configuration and worker with metrics factory
     * 
     * @param kinesisConnectorConfiguration Amazon Kinesis connector configuration
     * @param metricFactory would be used to emit metrics in Amazon Kinesis Client Library
     */
    protected void
            initialize(KinesisConnectorConfiguration kinesisConnectorConfiguration, IMetricsFactory metricFactory) {
        // Proxy settings for the AWS clients; replace the placeholder values with your own.
        ClientConfiguration clientConfig = new ClientConfiguration();
        clientConfig.setProxyHost("add-proxy-host");
        clientConfig.setProxyPort(8080); // placeholder value: add-proxy-port
        clientConfig.setProxyUsername("add-proxy-username");
        clientConfig.setProxyPassword("add-proxy-password");
        clientConfig.setPreemptiveBasicProxyAuth(false);
        KinesisClientLibConfiguration kinesisClientLibConfiguration =
                new KinesisClientLibConfiguration(kinesisConnectorConfiguration.APP_NAME,
                        kinesisConnectorConfiguration.KINESIS_INPUT_STREAM,
                        kinesisConnectorConfiguration.AWS_CREDENTIALS_PROVIDER,
                        kinesisConnectorConfiguration.WORKER_ID).withKinesisEndpoint(kinesisConnectorConfiguration.KINESIS_ENDPOINT)
                        .withFailoverTimeMillis(kinesisConnectorConfiguration.FAILOVER_TIME)
                        .withMaxRecords(kinesisConnectorConfiguration.MAX_RECORDS)
                        .withInitialPositionInStream(kinesisConnectorConfiguration.INITIAL_POSITION_IN_STREAM)
                        .withIdleTimeBetweenReadsInMillis(kinesisConnectorConfiguration.IDLE_TIME_BETWEEN_READS)
                        .withCallProcessRecordsEvenForEmptyRecordList(KinesisConnectorConfiguration.DEFAULT_CALL_PROCESS_RECORDS_EVEN_FOR_EMPTY_LIST)
                        .withCleanupLeasesUponShardCompletion(kinesisConnectorConfiguration.CLEANUP_TERMINATED_SHARDS_BEFORE_EXPIRY)
                        .withParentShardPollIntervalMillis(kinesisConnectorConfiguration.PARENT_SHARD_POLL_INTERVAL)
                        .withShardSyncIntervalMillis(kinesisConnectorConfiguration.SHARD_SYNC_INTERVAL)
                        .withTaskBackoffTimeMillis(kinesisConnectorConfiguration.BACKOFF_INTERVAL)
                        .withMetricsBufferTimeMillis(kinesisConnectorConfiguration.CLOUDWATCH_BUFFER_TIME)
                        .withMetricsMaxQueueSize(kinesisConnectorConfiguration.CLOUDWATCH_MAX_QUEUE_SIZE)
                        .withUserAgent(kinesisConnectorConfiguration.APP_NAME + ","
                                + kinesisConnectorConfiguration.CONNECTOR_DESTINATION + ","
                                + KinesisConnectorConfiguration.KINESIS_CONNECTOR_USER_AGENT)
                        .withRegionName(kinesisConnectorConfiguration.REGION_NAME)
                        .withCommonClientConfig(clientConfig);

        if (!kinesisConnectorConfiguration.CALL_PROCESS_RECORDS_EVEN_FOR_EMPTY_LIST) {
            LOG.warn("The false value of callProcessRecordsEvenForEmptyList will be ignored. It must be set to true for the bufferTimeMillisecondsLimit to work correctly.");
        }

        if (kinesisConnectorConfiguration.IDLE_TIME_BETWEEN_READS > kinesisConnectorConfiguration.BUFFER_MILLISECONDS_LIMIT) {
            LOG.warn("idleTimeBetweenReads is greater than bufferTimeMillisecondsLimit. For best results, ensure that bufferTimeMillisecondsLimit is more than or equal to idleTimeBetweenReads ");
        }

        // If a metrics factory was specified, use it.
        if (metricFactory != null) {
            worker =
                    new Worker(getKinesisConnectorRecordProcessorFactory(),
                            kinesisClientLibConfiguration,
                            metricFactory);
        } else {
            worker = new Worker(getKinesisConnectorRecordProcessorFactory(), kinesisClientLibConfiguration);
        }
        LOG.info(getClass().getSimpleName() + " worker created");
    }

    @Override
    public void run() {
        if (worker != null) {
            // Start Amazon Kinesis Client Library worker to process records
            LOG.info("Starting worker in " + getClass().getSimpleName());
            try {
                worker.run();
            } catch (Throwable t) {
                LOG.error(t);
                throw t;
            } finally {
                LOG.error("Worker " + getClass().getSimpleName() + " is not running.");
            }
        } else {
            throw new RuntimeException("Initialize must be called before run.");
        }
    }

    /**
     * This method returns a {@link KinesisConnectorRecordProcessorFactory} that contains the
     * appropriate {@link IKinesisConnectorPipeline} for the Amazon Kinesis Enabled Application
     * 
     * @return a {@link KinesisConnectorRecordProcessorFactory} that contains the appropriate
     *         {@link IKinesisConnectorPipeline} for the Amazon Kinesis Enabled Application
     */
    public abstract KinesisConnectorRecordProcessorFactory<T, U> getKinesisConnectorRecordProcessorFactory();
}
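
Rather than hard-coding the "add-proxy-host" and port placeholders, the values could be read from the standard JVM properties https.proxyHost and https.proxyPort, so the same -D flags work for the connector. The following is a minimal sketch under that assumption; ProxySettings is a hypothetical helper, not part of the connector library or the AWS SDK, and it deliberately uses only the JDK.

```java
import java.util.Optional;

// Hypothetical helper; not part of amazon-kinesis-connectors or the AWS SDK.
final class ProxySettings {
    final String host;
    final int port;

    ProxySettings(String host, int port) {
        this.host = host;
        this.port = port;
    }

    /**
     * Reads https.proxyHost and https.proxyPort from the JVM system properties.
     * Returns an empty Optional when no proxy host is set.
     * Throws NumberFormatException if the port is not a valid integer.
     */
    static Optional<ProxySettings> fromSystemProperties() {
        String host = System.getProperty("https.proxyHost");
        if (host == null || host.trim().isEmpty()) {
            return Optional.empty();
        }
        // 443 is the JVM's documented default for https.proxyPort.
        int port = Integer.parseInt(System.getProperty("https.proxyPort", "443").trim());
        return Optional.of(new ProxySettings(host.trim(), port));
    }

    public static void main(String[] args) {
        Optional<ProxySettings> settings = fromSystemProperties();
        if (settings.isPresent()) {
            System.out.println("Proxy: " + settings.get().host + ":" + settings.get().port);
        } else {
            System.out.println("No https.proxyHost set; connecting directly.");
        }
    }
}
```

In the initialize method above, the result would then feed clientConfig.setProxyHost(settings.host) and clientConfig.setProxyPort(settings.port) when a value is present, leaving the ClientConfiguration untouched otherwise.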

devsaik avatar Apr 07 '16 20:04 devsaik

Still waiting for proxy configuration?

martindariocernadas avatar Sep 06 '18 17:09 martindariocernadas

AWS SDKs understand proxy properties set via _JAVA_OPTIONS by default, so how come the KCL does not? Was there any reason?

This is critical to have for local testing and small-scale debugging before pushing up code.

erfangc avatar Feb 04 '19 02:02 erfangc