首页
学习
活动
专区
圈层
工具
发布
社区首页 >问答首页 >Apache /flink runner未在EMR中执行(从GCS访问文件)

Apache /flink runner未在EMR中执行(从GCS访问文件)
EN

Stack Overflow用户
提问于 2020-08-05 11:20:32
回答 1查看 454关注 0票数 0

我有一个apache beam管道来索引一些数据到elasticsearch。我尝试使用spark或Flink runner在AWS EMR中运行作业。当我试图在本地设置的独立spark上运行作业时,管道可以处理本地磁盘中的源文件,但是,当我从GCS读取文件时,它不能工作。当我在EMR集群中运行时,这是相同的。

我在Hadoop core-site.xml上设置为EMR配置的配置

代码语言:javascript
复制
{
    "Classification": "core-site",
    "Properties": {
      "fs.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
      "fs.AbstractFileSystem.gs.impl": "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS",
      "fs.gs.project.id": "data-warehouse",
      "google.cloud.auth.service.account.enable": "true",
      "fs.gs.auth.service.account.json.keyfile": "/home/hadoop/utils/key.json"
    }
  }

此外,GCS-connector jar位于spark jar路径和hadoop jar路径中

管道的maven的pom文件。

代码语言:javascript
复制
<project xmlns="http://maven.apache.org/POM/4.0.0"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.company.beam</groupId>
  <artifactId>IndexToEs</artifactId>
  <version>1.0-SNAPSHOT</version>

  <properties>
    <beam.version>2.22.0</beam.version>
    <maven-compiler-plugin.version>3.7.0</maven-compiler-plugin.version>
    <maven-exec-plugin.version>1.6.0</maven-exec-plugin.version>
    <slf4j.version>1.7.25</slf4j.version>
  </properties>

  <repositories>
    <repository>
      <id>apache.snapshots</id>
      <name>Apache Development Snapshot Repository</name>
      <url>https://repository.apache.org/content/repositories/snapshots/</url>
      <releases>
        <enabled>false</enabled>
      </releases>
      <snapshots>
        <enabled>true</enabled>
      </snapshots>
    </repository>
  </repositories>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>${maven-compiler-plugin.version}</version>
        <configuration>
          <source>8</source>
          <target>8</target>
        </configuration>
      </plugin>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-shade-plugin</artifactId>
        <version>3.2.3</version>
        <executions>
          <execution>
            <goals>
              <goal>shade</goal>
            </goals>
            <configuration>
              <createDependencyReducedPom>false</createDependencyReducedPom>
              <transformers>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
                <transformer
                  implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                  <mainClass>com.company.beam.IndexToEs</mainClass>
                </transformer>
              </transformers>
              <filters>
                <filter>
                  <artifact>*:*</artifact>
                  <excludes>
                    <exclude>META-INF/*.SF</exclude>
                    <exclude>META-INF/*.DSA</exclude>
                    <exclude>META-INF/*.RSA</exclude>
                  </excludes>
                </filter>
              </filters>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>

    <pluginManagement>
      <plugins>
        <plugin>
          <groupId>org.codehaus.mojo</groupId>
          <artifactId>exec-maven-plugin</artifactId>
          <version>${maven-exec-plugin.version}</version>
          <configuration>
            <cleanupDaemonThreads>false</cleanupDaemonThreads>
          </configuration>
        </plugin>

      </plugins>

    </pluginManagement>
  </build>

  <dependencies>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-elasticsearch</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
      <version>${beam.version}</version>
    </dependency>
<!--     https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-extensions-google-cloud-platform-core -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-sdks-java-extensions-google-cloud-platform-core</artifactId>
      <version>${beam.version}</version>
    </dependency>
<!--     https://mvnrepository.com/artifact/org.apache.beam/beam-runners-google-cloud-dataflow-java -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-direct-java</artifactId>
      <version>${beam.version}</version>
<!--      <scope>runtime</scope>-->
    </dependency>
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-spark</artifactId>
      <version>${beam.version}</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink -->
    <dependency>
      <groupId>org.apache.beam</groupId>
      <artifactId>beam-runners-flink_2.11</artifactId>
      <version>2.16.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigdataoss</groupId>
      <artifactId>gcs-connector</artifactId>
      <version>hadoop2-1.9.17</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.1.3</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.11</artifactId>
      <version>2.1.3</version>
    </dependency>




    <!-- slf4j API frontend binding with JUL backend -->
<!--    <dependency>-->
<!--      <groupId>org.slf4j</groupId>-->
<!--      <artifactId>slf4j-api</artifactId>-->
<!--      <version>${slf4j.version}</version>-->
<!--    </dependency>-->
<!--    <dependency>-->
<!--      <groupId>org.slf4j</groupId>-->
<!--      <artifactId>slf4j-jdk14</artifactId>-->
<!--      <version>${slf4j.version}</version>-->
<!--    </dependency>-->
  </dependencies>
</project>

没有错误,但EMR显示任务已完成,但流水线尚未运行。

我不知道这是apache光束问题还是集群配置问题。

EN

回答 1

Stack Overflow用户

发布于 2020-08-10 10:05:16

我发现了问题所在。Apache beam sdk使用gsutil访问GCS文件。根据flink文档,hadoop连接器负责任何其他文件系统访问,但在apache beam使用flink runner的情况下,数据使用gsutil读取并馈送到下游。所以我安装了google and并激活了服务帐户。

票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/63257934

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档