我有一个从文件中读取的ArrayList大文件,我想用多线程读取它的内容,并处理每个字符串,反复调用一个方法并将它打印到一个文件中,.I给出了我的代码的工作结构。我怎么也无法为我想要的东西编写代码,而不会陷入与线程同步相关的异常中.我对线程的概念很陌生。想要找到一种有效的方法来实现这个..I,已经研究了其他与线程和数组列表相关的解决方案,但是它对我来说还没有成功。如有任何关于如何处理此事的建议,我们将不胜感激。
import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
public class threadingWithMathod {
public static void main(String[] args) throws FileNotFoundException, UnsupportedEncodingException {
ArrayList<String> samples=readurls("path/to/sample.csv");
PrintStream filewriter = new PrintStream(new File("path/to/result.csv"), "UTF-8");
for (int i = 0; i < samples.size(); i++) {
String string1 = samples.get(i);
String string2 = samples.get(i+1);
///Need Info As to how process with Threading without clashing
/// sampleProcessString need to be called repeatedly
//sampleProcessString(filewriter,string) by 2-3 threads
}
}
public static void sampleProcessString(PrintStream filewriter,String string) {
filewriter.println(processedString(string));
}
private static Object processedString(String string) {
//Intended to generate a new line by using a Sql query
//This method will be using a connection to a mysql data base based on sample
return string+"++> done something";
}
public static ArrayList<String> readurls(String filename) {
ArrayList<String> aslink=new ArrayList<String>();
BufferedReader reader;
try {
reader = new BufferedReader(new FileReader( filename));
String line = reader.readLine();
while (line != null) {
aslink.add(line);
line = reader.readLine();
}
reader.close();
} catch (IOException e) {
e.printStackTrace();
}
return aslink;
}
}发布于 2020-11-03 13:24:49
创建了一些代码段,您可以尝试将实际处理代码放到其中。
我的测试数据如下所示:
try (PrintWriter pw = new PrintWriter("testdata.txt")) {
for (int i = 0; i < 1000000; i++)
pw.println(i);
}所以一个文本文件的行中有一百万个数字。
我的“任务”是创建一个包含同一行值的两倍的文件,而忽略它们的顺序:
pw.println(Integer.parseInt(line) * 2);其中line是输入文件的一行,而pw是输出的PrintWriter。
在实际代码中:
try (PrintWriter pw = new PrintWriter("testresult.txt");
BufferedReader br = new BufferedReader(new FileReader("testdata.txt"))) {
String line;
while ((line = br.readLine()) != null)
pw.println(Integer.parseInt(line) * 2);
}这是一种可以写得更短的东西,也许可以用流来读一些:
try (PrintWriter pw = new PrintWriter("testresult.txt")) {
Files.lines(Paths.get("testdata.txt")).forEach(
line -> pw.println(Integer.parseInt(line) * 2));
}这两个代码段产生非常相似的执行时间,在我的机器上大约1.6-1.7秒(用“预算”方法度量,long start = System.currentTimeMillis();之前和System.out.println(System.currentTimeMillis() - start);之后)。
然后,流可以被并行化,其中只有一个.parallel():
try (PrintWriter pw = new PrintWriter("testresult.txt")) {
Files.lines(Paths.get("testdata.txt")).parallel().forEach(
line -> pw.println(Integer.parseInt(line) * 2));
}这将产生混合顺序的结果。
关于println(int)的一个附带说明:它没有文档化,但是它的实际实现是螺纹安全,但是如果您想要绝对“安全”并且只在文档特性的基础上构建,您应该自己同步它:
try (PrintWriter pw = new PrintWriter("testresult.txt")) {
Files.lines(Paths.get("testdata.txt")).parallel().forEach(line -> {
synchronized (pw) {
pw.println(Integer.parseInt(line) * 2);
}
});
}这两个步骤实际上都比顺序慢(2秒和2.2秒,额外的手动同步确实很重要),但当然,这个处理步骤非常简单非常重要。因此,重要的是要记住,如果在您的情况下,文件操作占用了时间,那么并行性并不能真正帮助您做到这一点。
作为比较,使用线程池的完整代码段:
ExecutorService es = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
ExecutorCompletionService<String> ecs = new ExecutorCompletionService<String>(es);
int counter=0;
try (BufferedReader br = new BufferedReader(new FileReader("testdata.txt"))) {
String line;
while ((line = br.readLine()) != null) {
final String current = line;
ecs.submit(new Callable<String>() {
@Override
public String call() throws Exception {
return Integer.toString(Integer.parseInt(current)*2);
}
});
counter++;
}
}
try (PrintWriter pw = new PrintWriter("testresult.txt")) {
while(counter>0) {
pw.println(ecs.take().get());
counter--;
}
}
es.shutdown();这是它们中运行时间最长的一个,另一方面它运行了2秒,与synchronized-less流示例相比,没有它是“安全”的,因为文件操作都发生在主线程中(工作人员只计算和执行字符串)。使用全手动线程可能会使事情变得更加冗长,但目前我没有动力编写这样的代码。
发布于 2020-11-03 09:50:38
读取一个大文件是最快的顺序,因为物理磁盘访问。可以使用内存映射字节缓冲区。在您的情况下(每行处理) Files.lines(Path) (默认UTF-8)可能就足够了。
这是细粒度并发。可以通过使用线程池ExecutorService、ThreadPoolExecutor并行地完成处理,因为会有许多线程。
结果会乱七八糟的。如果这是一个问题,在Files.lines‘lambda中传递一个行号。
为了收集结果,在内存中查询它们并异步地将它们写入文件,可以查看是否有高性能的记录器。可能需要重新实现它的功能(取消日志格式设置)。因此,一个队列线程和一个写入文件的线程(一个大字节缓冲区)。
人们可以考虑压缩输出(.csv.gz),这将是进一步网络传输的空间/时间增益。
实现这一点的方法有很多种,因此研究javadoc和变体(例如FutureTask)并查看示例。
ThreadPoolExecutor executor = (ThreadPoolExecutor)
Executors.newFixedThreadPool(10);
for (;;) {
Task task = new Task(...);
executor.execute(task);
}
while (!executor.isTerminated()) { ... }
executor.shutdown();https://stackoverflow.com/questions/64660001
复制相似问题