我有两本笔记本: streaming.ipynb和File.ipynb (工具: spark3.2.1,colab笔记本)-- streaming.ipynb:
import sys
from textblob import TextBlob
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import CountVectorizer
import nltk
from nltk.corpus import stopwords
import string
import re
#Emoji patterns
emoji_pattern = re.compile("["
u"\U0001F600-\U0001F64F" # emoticons
u"\U0001F300-\U0001F5FF" # symbols & pictographs
u"\U0001F680-\U0001F6FF" # transport & map symbols
u"\U0001F1E0-\U0001F1FF" # flags (iOS)
u"\U00002702-\U000027B0"
u"\U000024C2-\U0001F251"
"]+", flags=re.UNICODE)
# define stopwords
nltk.download('stopwords')
stopword = stopwords.words('english')
# import stemmer from nltk
ps = nltk.PorterStemmer()
# import lemmatizer from nltk
#wn = nltk.WordNetLemmatizer()
# import lemmatizer from nltk
import nltk
from nltk.stem import WordNetLemmatizer
wn = nltk.download('wordnet') #WordNetLemmatizer()
def abb(tweet):
tweet = re.sub(r"im|i'm|iam", "i am", tweet)
tweet = re.sub(r"he's", "he is", tweet)
tweet = re.sub(r"she's", "she is", tweet)
tweet = re.sub(r"that's", "that is", tweet)
tweet = re.sub(r"there's", "there is", tweet)
tweet = re.sub(r"what's", "what is", tweet)
tweet = re.sub(r"where's", "where is", tweet)
tweet = re.sub(r"\'ll", " will", tweet)
tweet = re.sub(r"\'ve", " have", tweet)
tweet = re.sub(r"\'re", " are", tweet)
tweet = re.sub(r"\'d", " would", tweet)
tweet = re.sub(r"\'ve", " have", tweet)
tweet = re.sub(r"won't", "will not", tweet)
tweet = re.sub(r"don't", "do not", tweet)
tweet = re.sub(r"did't", "did not", tweet)
tweet = re.sub(r"can't", "can not", tweet)
tweet = re.sub(r"it's", "it is", tweet)
tweet = re.sub(r"couldn't", "could not", tweet)
tweet = re.sub(r"wouldn't", "would not", tweet)
tweet = re.sub(r"have't", "have not", tweet)
return tweet
def getSubjectivity(text):
return TextBlob(text).sentiment.subjectivity
#Create a function to get the polarity
def getPolarity(text):
return TextBlob(text).sentiment.polarity
def getAnalysis(score):
if score < 0:
return "Negative"
elif score == 0:
return "Neutral"
else:
return "Positive"
def main():
sc = SparkContext(appName="PysparkStreaming")
ssc = StreamingContext(sc, 5) #Streaming will execute in each 3 seconds
lines = ssc.textFileStream('/content/drive/MyDrive/Colab Notebooks/log/') #'log/ mean directory name
counts = lines.flatMap(lambda line: line.split("")) \
.map(lambda x: x.lower())\
.map(lambda x: re.sub(r"@[A-Za-z0-9]+", "", x))\
.map(lambda x: re.sub(r"\\x[A-Za-z][0-9]+", "", x))\
.map(lambda x: re.sub(r"\\u[0-9][A-Za-z]|\\U[0-9][A-Za-z]+", "", x))\
.map(lambda x: re.sub(r'&amp;', '&', x)) \
.map(lambda x: re.sub(r"b[\s]+", "", x)) \
.map(lambda x: re.sub(r"https?:\/\/t.co\/[A-Za-z0-9]+", "", x)) \
.map(lambda x: abb(x)) \
.map(lambda x: re.sub("[^a-zA-Z0-9\s]+", "",x)) \
.map(lambda x: re.sub('[0-9]+', ' ', x))\
.map(lambda x: emoji_pattern.sub(r'', x) )\
.map(lambda x: "".join([char for char in x if char not in string.punctuation])) \
.map(lambda x: re.split('\W+', x)) \
.map(lambda x: " ".join(x))\
.map(lambda x: [word for word in x if word not in stopword])\
.map(lambda x: getPolarity(x))
counts.pprint()
ssc.start()
ssc.awaitTermination()
if __name__ == "__main__":
main()和File.ipynb:
from random import randint
import time
"""
This is use for create 30 file one by one in each 5 seconds interval.
These files will store content dynamically from 'lorem.txt' using below code
"""
def main():
a = 1
with open('/content/drive/MyDrive/Colab Notebooks/lorem.txt', 'r',encoding='latin-1') as file: # reading content from 'lorem.txt' file
lines = file.readlines()
while a <= 30:
totalline = len(lines)
linenumber = randint(0, totalline - 10)
with open('/content/drive/MyDrive/Colab Notebooks/log/log.txt'.format(a), 'w') as writefile:
writefile.write(' '.join(line for line in lines[linenumber:totalline]))
print('creating file log{}.txt'.format(a))
a += 1
time.sleep(5)
if __name__ == '__main__':
main()注意:在File.ipynb中,我读到了lorem.txt,它在log.txt中包含tweet和单独的数据(为NULL ),但是当您运行该代码时,它将创建日志{1}包含部分tweet,log{2}等等……而来自这些日志的streaming.ipynb流每5秒打印结果,lorem.txt包含:
'A woman faces 100 lashes and up to seven years in prison because she was a VICTIM of RAPE in Qatar \n\nShe was told she could possibly avoid criminal charges if she married her attacker\n\n#FIFA22 #WorldCup2022 #Qatar2022 #Qatar'
'#WorldCup2022 '
'i love my mother'
'it is so bad!'
'Black Stars technical team among 32 for 2022 Aspire Academy Global Summit\n\n- \n\n.#Ghana #WorldCup2022 '
'New Episode 163 - The Business End out now!!! #WorldCup2022 #TorontoFC #RealMadridChelsea #LiverpoolVillarreal #ManCityRealMadrid #ARSMUN #ACMILAN #ChampionsLeague #uefachampionsleague \nOut now on all popular audio podcast platforms '
'Check out new work on my @Behance profile: "Edited" \n#edit #photoshop #photography #road #street #art #design #architect #amazon #quote #fun #amazing #EidMubarak #ukraine #putin #gold #oil #WorldCup2022'
'Can we talk about how bad the Al Rihla looks #WorldCup2022 '
'They should hire Mourinho as a consultant for the #WorldCup2022'
'@yuzi_chahal @imkuldeep18 The spin duo back with bang bang performance for representing franchise Eagerly Waiting to see in India team behind the stump by Rishaph pant The selectors are very big problem to pick a players for #WorldCup2022\n#IPL20222'
'@DraganflyInc Draganflyer Commander can sanitize the entire 50,000 seater stadium in 4-6 hrs. The #WorldCup2022 in #Qatar2022 is slowly approaching. This drone could really help keep stadiums safe. Food for thought.\n\n#CovidIsntOver \n#GOPTaxScam \n#IMPEACHBIDENNOW \n#BTC \n#NFTs '
'NFT News \n\nTooneyChain: when NFTs invite themselves to the 2022 football world cup\n@TooneyChainNFT \n\n#WorldCup2022 #football #NFTs #NFTcollectibles '
'Mark my words. 3-1 USA over England this fall in the #WorldCup2022 . If we win, England has to call it soccer from now on.'
'you can as well use our services in projects that are not permanent like @Containerhouse or #structures for #WorldCup2022 ,also for #camps ,#swimmingpool #parks #offices #dormitory'
'@bhogleharsha Than we need to forgot about @BhuviOfficial ?\n#ipl2022 #WorldCup2022'
'Ghana and Tunisia have been invited to take part in the Kirin Super Cup 2022, which will be hosted by Japan from Friday, June 10 to Tuesday, June 14, 2022, in the lead up to the #WorldCup2022 in Qatar.\n\nThe other 2 participants are hosts Japan and Chile #AfricanFootball'
'join us on TG for our game of the day!⚽️⚽️ The World Cup token team is rooting for a @Cristiano goal today! Good luck to both sides today! @ManUtd @ChelseaFC $WCT #WCT #WctArmy #WorldCup2022 #Qatar2022 #100xgem #BSC #BNB\xa0\xa0\xa0#BSCGem #BNBchain #1000xgem #moonshot #ETH #CRO #BTC '
'@julietbawuah @tv3_ghana Wow, super talented player. Who knows, with him on #Qatar’s team they might just win the #WorldCup2022. #Qatar2022 #Fooball #games'
'@ForthHelena @thevinetway @jodyvance Jody are you a QA asset?\nCan Jody tweet about LGBTQ issues? Notable events from around the world ignored & Jody is regurgitating CNn narratives?\nToo many ads not enough of what you want? SiriusXM commercial free entertainment.\n#Canucks \n#WorldCup2022\n#NFLDraft\n#TikTok '
'Congratulations, Danyel! You’re really having an impact in this field at just the right time! This, alongside your new co-authored book on #Qatar & the #WorldCup2022 , AND our @CIRSGUQ research project on the same subject! ! '
'Its the crypto world cup final and you are the manager.\nWhich front 3 are you going with?\n\n#xrp #vet #qnt #ada #zil #xlm #doge #btc #cro #eth #sand #luna #WorldCup2022\n#cryptocurrency'
'@yuzi_chahal I Would like to see KulCha Combination in #WorldCup2022'
'It’s going to be scary, but not for us \n\n#Portugal #WorldCup2022 '错误如下所示:
Py4JJavaError Traceback (most recent call last)
<ipython-input-5-bb31ef108289> in <module>()
88
89 if __name__ == "__main__":
---> 90 main()
3 frames
/content/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o23.awaitTermination.
: org.apache.spark.SparkException: An exception was raised by Python:
Traceback (most recent call last):
File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/streaming/util.py", line 68, in call
r = self.func(t, *rdds)
File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/streaming/dstream.py", line 170, in takeAndPrint
taken = rdd.take(num + 1)
File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/rdd.py", line 1568, in take
res = self.context.runJob(self, takeUpToNumLeft, p)
File "/content/spark-3.2.1-bin-hadoop3.2/python/pyspark/context.py", line 1227, in runJob
sock_info = self._jvm.PythonRDD.runJob(self._jsc.sc(), mappedRDD._jrdd, partitions)
File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/java_gateway.py", line 1322, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/py4j-0.10.9.3-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.runJob.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0) (9557898acd7f executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 619, in main
process()
File "/content/spark-3.2.1-bin-hadoop3.2/python/lib/pyspark.zip/pyspark/worker.py", line 611, in process我该怎么解决呢?
发布于 2022-05-06 00:08:25
我发现您的代码有两个问题:
据我所知,
/content/drive/MyDrive/Colab Notebooks/log/,其中包含遵循log<number>.txt模式的文件。这一行似乎有一个bug:with open('/content/drive/MyDrive/Colab Notebooks/log/log.txt'.format(a), 'w') as writefile:实际上,您并不创建多个文件,而是在每次迭代中覆盖单个log.txt文件,原因是文件中缺少花括号。
StreamingContext.textFileStream接受HDFS作为参数。请阅读文档中的以下章节:https://spark.apache.org/docs/latest/streaming-programming-guide.htmlhttps://stackoverflow.com/questions/72134232
复制相似问题