我们有标准报告上传为PDF每天的基础上。在PDF中有一些表,我们希望将这些表拉到数据集中。我在代码存储库中导入了表格,但我似乎无法让代码存储库引入PDF。
我收到这个错误:
“错误加载输入”
{“消息”:“无法解析别名”,“别名”:“/US Office/COO/pdf_tests/test.pdf",”fallbackBranches“:”主“}”
这是我使用的最基本的代码:
from transforms.api import transform_df, Input, Output
import tabula
@transform_df(
Output("/US Office/COO/pdf_tests/datasets/pdf_read"),
source_df=Input("/US Office/COO/pdf_tests/test.pdf"),
)
def compute(source_df):
df = source_df
df = tabula.read_pdf(df, pages='all')
return df我可以轻松地用本地python安装使tabula输出一个csv,而不是在Foundry。您可以提供的任何帮助将是伟大的,因为我是非常新的Palantir铸造和代码存储库。
谢谢!
发布于 2022-01-24 18:11:21
首先,谢谢你让我朝着正确的方向前进。
接下来,答案是:
一旦摆脱了结果变量中的.read(),我就能够让它工作起来。Tabula希望看到一个路径(在这种情况下很难),或者像.open一样的对象,这是返回的内容。下面是有效的代码,希望能帮助其他人开始工作。
from transforms.api import transform, Input, Output
import tabula
from pyspark.sql.types import StructType, StructField, StringType
import pandas as pd
@transform( #note that this is @transform and not @transform_df
out=Output("/US Office/COO/pdf_tests/outdata"), # The output datasheet path
raw=Input("/US Office/COO/pdf_tests/datasets/dataset_pdf"), # path to the input raw datasheet that contains the PDF
)
def compute(ctx, raw, out):
rows = []
result = [] # define results list
fs = raw.filesystem() # variable to make accsessing the datasheet filesystem easy
latest_file = 'test.pdf' # the name of the pdf inside the dataset
df_schema = StructType([ # defining the spark schema
StructField("system", StringType(), True),
StructField("status", StringType(), True),
StructField('date', StringType(), True),
StructField('user', StringType(), True),
StructField('id', StringType(), True)
])
with fs.open(latest_file, mode='rb') as f: # with statement that opens the pdf as f. rb essential as it has to be opened as a raw binary file
result = tabula.read_pdf(f, pages='all', multiple_tables=True) # basic tabula read function that generates a list of tables within another list.
rows.append(result[0]) # adds the list of tables into the rows list
df = pd.DataFrame(rows[0]) # Creates a pandas dataframe using the first table in the pdf
df = ctx.spark_session.createDataFrame(df, schema=df_schema) # casts the pandas df to a pyspark df using the defined schema
out.write_dataframe(df) # writes the pyspark dataframe to the ouput dataset发布于 2022-01-13 20:26:40
您缺少的部分是获取和解析PDF的方法。让我们一步一步地走:
您的Input只能使用数据集,它们需要在fallbackBranch上至少有一个有效的事务(通常是master)。
您可以通过使用数据连接来摄取数据集来实现这一点,或者您也可以手动创建数据集,并将您的pdf放入其内容中。
然后文件就会出现,就像在这个从公共数据中摄取的例子中一样:

因为现在这是一个数据集,所以您可以在转换中从Foundry文件系统读取内容。我想它会像这样,注意@transform而不是@transform_df
from transforms.api import transform, Input, Output
from pyspark.sql import Row
@transform(
out=Output("/US Office/COO/pdf_tests/datasets/pdf_read"),
source_df=Input("/US Office/COO/pdf_tests/pdf_staset"),
)
def compute(ctx, source_df, out):
rows = []
with source_df.filesystem().open(latest_file.path, 'rb') as f:
result = tabula.read_pdf(f.read(), pages='all')
# add your logic to populate rows here:
rows.append(Row({
"col_a": "a",
"col_b": "row",
"col_c": "some values"
}))
schema = StructType([
StructField('col_a', StringType()),
StructField('col_b', StringType()),
StructField('col_c', StringType()),
])
return out.write_dataframe(ctx.spark_session.createDataFrame(rows, schema))我没有测试上面的代码。
https://stackoverflow.com/questions/70689175
复制相似问题