Spark Streaming from a file source

Stack Overflow user
Asked 2022-05-05 22:25:23

I have two notebooks, streaming.ipynb and File.ipynb (tools: Spark 3.2.1, Colab notebook). streaming.ipynb:

import sys
from textblob import TextBlob
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
import nltk
from nltk.corpus import stopwords
import string
import re

#Emoji patterns
emoji_pattern = re.compile("["
         u"\U0001F600-\U0001F64F"  # emoticons
         u"\U0001F300-\U0001F5FF"  # symbols & pictographs
         u"\U0001F680-\U0001F6FF"  # transport & map symbols
         u"\U0001F1E0-\U0001F1FF"  # flags (iOS)
         u"\U00002702-\U000027B0"
         u"\U000024C2-\U0001F251"
         "]+", flags=re.UNICODE)
# define stopwords
nltk.download('stopwords')
stopword = stopwords.words('english')
# import stemmer from nltk
ps = nltk.PorterStemmer()
# import lemmatizer from nltk
#wn = nltk.WordNetLemmatizer()
# import lemmatizer from nltk
import nltk
from nltk.stem import WordNetLemmatizer
wn = nltk.download('wordnet') #WordNetLemmatizer()
def abb(tweet):
    tweet = re.sub(r"im|i'm|iam", "i am", tweet)
    tweet = re.sub(r"he's", "he is", tweet)
    tweet = re.sub(r"she's", "she is", tweet)
    tweet = re.sub(r"that's", "that is", tweet) 
    tweet = re.sub(r"there's", "there is", tweet) 
    tweet = re.sub(r"what's", "what is", tweet)
    tweet = re.sub(r"where's", "where is", tweet) 
    tweet = re.sub(r"\'ll", " will", tweet)  
    tweet = re.sub(r"\'ve", " have", tweet)  
    tweet = re.sub(r"\'re", " are", tweet)
    tweet = re.sub(r"\'d", " would", tweet)
    tweet = re.sub(r"\'ve", " have", tweet)
    tweet = re.sub(r"won't", "will not", tweet)
    tweet = re.sub(r"don't", "do not", tweet)
    tweet = re.sub(r"did't", "did not", tweet)
    tweet = re.sub(r"can't", "can not", tweet)
    tweet = re.sub(r"it's", "it is", tweet)
    tweet = re.sub(r"couldn't", "could not", tweet)
    tweet = re.sub(r"wouldn't", "would not", tweet)
    tweet = re.sub(r"have't", "have not", tweet)
    return tweet


def getSubjectivity(text):
   return TextBlob(text).sentiment.subjectivity
  
 #Create a function to get the polarity
def getPolarity(text):
   return TextBlob(text).sentiment.polarity
  
def getAnalysis(score):
  if score < 0:
    return "Negative"
  elif score == 0:
    return "Neutral"
  else:
    return "Positive"

def main():
    sc = SparkContext(appName="PysparkStreaming")
    ssc = StreamingContext(sc, 5)   #Streaming will execute in each 3 seconds
    lines = ssc.textFileStream('/content/drive/MyDrive/Colab Notebooks/log/')  #'log/ mean directory name
    counts = lines.flatMap(lambda line: line.split("")) \
        .map(lambda x: x.lower())\
        .map(lambda x: re.sub(r"@[A-Za-z0-9]+", "", x))\
        .map(lambda x: re.sub(r"\\x[A-Za-z][0-9]+", "", x))\
        .map(lambda x: re.sub(r"\\u[0-9][A-Za-z]|\\U[0-9][A-Za-z]+", "", x))\
        .map(lambda x: re.sub(r'&amp;amp;', '&', x)) \
        .map(lambda x: re.sub(r"b[\s]+", "", x)) \
        .map(lambda x: re.sub(r"https?:\/\/t.co\/[A-Za-z0-9]+", "", x)) \
        .map(lambda x: abb(x)) \
        .map(lambda x: re.sub("[^a-zA-Z0-9\s]+", "",x)) \
        .map(lambda x: re.sub('[0-9]+', ' ', x))\
        .map(lambda x: emoji_pattern.sub(r'', x) )\
        .map(lambda x: "".join([char for char in x if char not in string.punctuation])) \
        .map(lambda x: re.split('\W+', x)) \
        .map(lambda x: " ".join(x))\
        .map(lambda x: [word for word in x if word not in stopword])\
        .map(lambda x: getPolarity(x))  
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()

And File.ipynb:

from random import randint
import time

"""
This is use for create 30 file one by one in each 5 seconds interval. 
These files will store content dynamically from 'lorem.txt' using below code
"""


def main():
    a = 1
    with open('/content/drive/MyDrive/Colab Notebooks/lorem.txt', 'r',encoding='latin-1') as file:  # reading content from 'lorem.txt' file
        lines = file.readlines()
        while a <= 30:
            totalline = len(lines)
            linenumber = randint(0, totalline - 10)
            with open('/content/drive/MyDrive/Colab Notebooks/log/log.txt'.format(a), 'w') as writefile:
                writefile.write(' '.join(line for line in lines[linenumber:totalline]))
            print('creating file log{}.txt'.format(a))
            a += 1
            time.sleep(5)


if __name__ == '__main__':
    main()

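Note: File.ipynb reads lorem.txt, which contains the tweets, and writes the data to log.txt (which starts out empty). When you run that code, it creates log{1} containing part of the tweets, then log{2}, and so on, while streaming.ipynb streams from these log files and prints the result every 5 seconds. lorem.txt contains: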

'A woman faces 100 lashes and up to seven years in prison because she was a VICTIM of RAPE in Qatar \n\nShe was told she could possibly avoid criminal charges if she married her attacker\n\n#FIFA22 #WorldCup2022 #Qatar2022 #Qatar' 
'#WorldCup2022 '
'i love my mother'
'it is so bad!'
'Black Stars technical team among 32 for 2022 Aspire Academy Global Summit\n\n-  \n\n.#Ghana #WorldCup2022 '
'New Episode 163 - The Business End out now!!! #WorldCup2022 #TorontoFC #RealMadridChelsea #LiverpoolVillarreal  #ManCityRealMadrid  #ARSMUN #ACMILAN #ChampionsLeague #uefachampionsleague \nOut now on all popular audio podcast platforms '
'Check out new work on my @Behance profile: "Edited" \n#edit #photoshop #photography #road #street #art #design #architect #amazon #quote #fun #amazing #EidMubarak #ukraine #putin #gold #oil #WorldCup2022'
'Can we talk about how bad the Al Rihla looks #WorldCup2022 '
'They should hire Mourinho as a consultant for the #WorldCup2022'
'@yuzi_chahal @imkuldeep18   The spin duo back with bang bang performance for representing franchise   Eagerly Waiting to see in India team behind the stump by Rishaph pant The selectors are very big problem to pick a players for #WorldCup2022\n#IPL20222'
'@DraganflyInc Draganflyer Commander can sanitize the entire 50,000 seater stadium in 4-6 hrs. The #WorldCup2022 in #Qatar2022 is slowly approaching. This drone could really help keep stadiums safe. Food for thought.\n\n#CovidIsntOver \n#GOPTaxScam \n#IMPEACHBIDENNOW \n#BTC \n#NFTs '
'NFT News \n\nTooneyChain: when NFTs invite themselves to the 2022 football world cup\n@TooneyChainNFT \n\n#WorldCup2022 #football #NFTs #NFTcollectibles '
'Mark my words. 3-1 USA over England this fall in the #WorldCup2022 . If we win, England has to call it soccer from now on.'
'you can as well use our services in projects that  are not permanent like @Containerhouse or #structures for #WorldCup2022 ,also for #camps ,#swimmingpool #parks #offices #dormitory'
'@bhogleharsha Than we need to forgot about @BhuviOfficial ?\n#ipl2022 #WorldCup2022'
'Ghana and Tunisia have been invited to take part in the Kirin Super Cup 2022, which will be hosted by Japan from Friday, June 10 to Tuesday, June 14, 2022, in the lead up to the #WorldCup2022 in Qatar.\n\nThe other 2 participants are hosts Japan and Chile #AfricanFootball'
'join us on TG for our game of the day!⚽️⚽️ The World Cup token team is rooting for a @Cristiano goal today! Good luck to both sides today! @ManUtd  @ChelseaFC  $WCT #WCT #WctArmy #WorldCup2022 #Qatar2022 #100xgem #BSC #BNB\xa0\xa0\xa0#BSCGem #BNBchain  #1000xgem #moonshot #ETH #CRO #BTC '
'@julietbawuah @tv3_ghana Wow, super talented player. Who knows, with him on #Qatar’s team they might just win the #WorldCup2022. #Qatar2022 #Fooball #games'
'@ForthHelena @thevinetway @jodyvance Jody are you a QA asset?\nCan Jody tweet about LGBTQ issues? Notable events from around the world ignored &amp; Jody is regurgitating CNn narratives?\nToo many ads not enough of what you want? SiriusXM commercial free entertainment.\n#Canucks \n#WorldCup2022\n#NFLDraft\n#TikTok '
'Congratulations, Danyel! You’re really having an impact in this field at just the right time!  This, alongside your new co-authored book on #Qatar &amp; the #WorldCup2022 , AND our @CIRSGUQ research project on the same subject! !  '
'Its the crypto world cup final and you are the manager.\nWhich front 3 are you going with?\n\n#xrp #vet #qnt #ada #zil #xlm #doge #btc #cro #eth #sand #luna #WorldCup2022\n#cryptocurrency'
'@yuzi_chahal I Would like to see KulCha Combination in  #WorldCup2022'
'It’s going to be scary, but not for us \n\n#Portugal #WorldCup2022 '

The error looks like this:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-5-bb31ef108289> in <module>()
     88 
     89 if __name__ == "__main__":
---> 90     main()

3 frames
/content/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
  File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/streaming/util.py", line 68, in call
    r = self.func(t, *rdds)
  File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/streaming/dstream.py", line 170, in takeAndPrint
    taken = rdd.take(num + 1)
  File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/rdd.py", line 1568, in take
    res = self.context.runJob(self, takeUpToNumLeft, p)
  File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/context.py", line 1227, in runJob
    sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
  File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (9557898acd7f executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
    process()
  File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process

How can I fix this?


1 Answer

Stack Overflow user

Answered 2022-05-06 00:08:25

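I see two problems with your code:

  • As far as I understand, you need to create a directory /content/drive/MyDrive/Colab Notebooks/log/ containing files that follow the pattern log<number>.txt. This line appears to have a bug: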

with open('/content/drive/MyDrive/Colab Notebooks/log/log.txt'.format(a), 'w') as writefile:

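You are not actually creating multiple files. Because the file name has no curly-brace placeholder, .format(a) has nothing to substitute, so every iteration overwrites the same log.txt file.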
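
The point about the missing placeholder can be sketched as follows. This is a minimal illustration, assuming the intended file names are log1.txt, log2.txt, and so on. The helpers `log_path` and `write_logs` are hypothetical names introduced here, and a temporary directory stands in for the Drive `log/` folder so the snippet can run anywhere.

```python
import os
import tempfile


def log_path(directory, index):
    """Build the path of the index-th log file (hypothetical helper)."""
    # The '{}' placeholder is what lets .format(index) take effect;
    # without it, every call resolves to the same 'log.txt'.
    return os.path.join(directory, 'log{}.txt'.format(index))


def write_logs(directory, chunks):
    """Write each chunk of text to its own numbered file and return the paths."""
    paths = []
    for index, chunk in enumerate(chunks, start=1):
        path = log_path(directory, index)
        with open(path, 'w') as writefile:
            writefile.write(chunk)
        paths.append(path)
    return paths


# Assumption: a temp directory stands in for
# '/content/drive/MyDrive/Colab Notebooks/log/'.
demo_dir = tempfile.mkdtemp()
created = write_logs(demo_dir, ['first batch', 'second batch', 'third batch'])
print(sorted(os.path.basename(p) for p in created))
```

In File.ipynb the equivalent change would be to put `{}` into the file name passed to `open`, leaving the rest of the loop as it is.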

Original page content provided by Stack Overflow.
Original link:

https://stackoverflow.com/questions/72134232
