Cleaned up a bit
This commit is contained in:
parent
da5e838b52
commit
9b35054423
7 changed files with 58 additions and 39 deletions
|
|
@ -0,0 +1,13 @@
|
||||||
|
package se.saasta.rabbitmq.model;
|
||||||
|
|
||||||
|
public class AkkaData {
|
||||||
|
private String orgnr;
|
||||||
|
|
||||||
|
public String getOrgnr() {
|
||||||
|
return orgnr;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOrgnr(String orgnr) {
|
||||||
|
this.orgnr = orgnr;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,18 +1,20 @@
|
||||||
package se.saasta.rabbitmq.counter;
|
package se.saasta.rabbitmq.producer;
|
||||||
|
|
||||||
import io.quarkus.scheduler.Scheduled;
|
import io.quarkus.scheduler.Scheduled;
|
||||||
import io.quarkus.scheduler.ScheduledExecution;
|
import io.quarkus.scheduler.ScheduledExecution;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import org.eclipse.microprofile.reactive.messaging.Channel;
|
import org.eclipse.microprofile.reactive.messaging.Channel;
|
||||||
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
import org.eclipse.microprofile.reactive.messaging.Emitter;
|
||||||
|
import se.saasta.rabbitmq.model.AkkaData;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class UpdateAktorCronJob {
|
public class UpdateAktorCronJob {
|
||||||
|
|
||||||
@Channel("akka")
|
@Channel("akka")
|
||||||
Emitter<String> quoteRequestEmitter;
|
Emitter<AkkaData> emitter;
|
||||||
|
|
||||||
private AtomicInteger counter = new AtomicInteger();
|
private AtomicInteger counter = new AtomicInteger();
|
||||||
|
|
||||||
|
|
@ -33,8 +35,10 @@ public class UpdateAktorCronJob {
|
||||||
|
|
||||||
@Scheduled(cron = "{cron.expr}")
|
@Scheduled(cron = "{cron.expr}")
|
||||||
void cronJobWithExpressionInConfig() {
|
void cronJobWithExpressionInConfig() {
|
||||||
counter.incrementAndGet();
|
AkkaData akkaData = new AkkaData();
|
||||||
System.out.println("Cron expression configured in application.properties");
|
UUID uuid = UUID.randomUUID();
|
||||||
quoteRequestEmitter.send("AKKA actor was updated");
|
akkaData.setOrgnr(uuid.toString());
|
||||||
|
emitter.send(akkaData);
|
||||||
|
System.out.println("Emitted that actor was updated");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -0,0 +1,20 @@
|
||||||
|
package se.saasta.rabbitmq.model;
|
||||||
|
|
||||||
|
public class AkkaData {
|
||||||
|
private String orgnr;
|
||||||
|
|
||||||
|
public String getOrgnr() {
|
||||||
|
return orgnr;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setOrgnr(String orgnr) {
|
||||||
|
this.orgnr = orgnr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "AkkaData{" +
|
||||||
|
"orgnr='" + orgnr + '\'' +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -1,32 +1,27 @@
|
||||||
package se.saasta.rabbitmq.processor;
|
package se.saasta.rabbitmq.processor;
|
||||||
|
|
||||||
|
import io.vertx.core.json.JsonObject;
|
||||||
import jakarta.enterprise.context.ApplicationScoped;
|
import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
import org.eclipse.microprofile.reactive.messaging.Incoming;
|
||||||
import se.saasta.rabbitmq.entity.AkkaAktorEntity;
|
import se.saasta.rabbitmq.model.AkkaData;
|
||||||
import se.saasta.rabbitmq.service.AkkaService;
|
import se.saasta.rabbitmq.service.AkkaService;
|
||||||
|
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
public class AkkaProcessor {
|
public class AkkaProcessor {
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
AkkaService service;
|
private AkkaService service;
|
||||||
|
|
||||||
private Random random = new Random();
|
private Random random = new Random();
|
||||||
|
|
||||||
|
|
||||||
@Incoming("akka")
|
@Incoming("akka")
|
||||||
public void processCron(String message) throws InterruptedException {
|
public void process(JsonObject quoteRequest) throws InterruptedException {
|
||||||
System.out.println("ALOO");
|
AkkaData data = quoteRequest.mapTo(AkkaData.class);
|
||||||
System.out.println(message);
|
System.out.println("Received data from Akka: " + data);
|
||||||
|
service.saveAktor(data);
|
||||||
AkkaAktorEntity entity = new AkkaAktorEntity();
|
|
||||||
UUID uuid = UUID.randomUUID();
|
|
||||||
entity.setOrgnr(uuid.toString());
|
|
||||||
|
|
||||||
service.saveAktor(entity);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -13,7 +13,7 @@ import java.util.Random;
|
||||||
public class StaraProcessor {
|
public class StaraProcessor {
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
StaraService service;
|
private StaraService service;
|
||||||
|
|
||||||
private Random random = new Random();
|
private Random random = new Random();
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,6 +4,7 @@ import jakarta.enterprise.context.ApplicationScoped;
|
||||||
import jakarta.inject.Inject;
|
import jakarta.inject.Inject;
|
||||||
import jakarta.transaction.Transactional;
|
import jakarta.transaction.Transactional;
|
||||||
import se.saasta.rabbitmq.entity.AkkaAktorEntity;
|
import se.saasta.rabbitmq.entity.AkkaAktorEntity;
|
||||||
|
import se.saasta.rabbitmq.model.AkkaData;
|
||||||
import se.saasta.rabbitmq.repository.AkkaRepository;
|
import se.saasta.rabbitmq.repository.AkkaRepository;
|
||||||
|
|
||||||
@ApplicationScoped
|
@ApplicationScoped
|
||||||
|
|
@ -13,7 +14,9 @@ public class AkkaService {
|
||||||
private AkkaRepository repository;
|
private AkkaRepository repository;
|
||||||
|
|
||||||
@Transactional
|
@Transactional
|
||||||
public void saveAktor(AkkaAktorEntity entity) {
|
public void saveAktor(AkkaData data) {
|
||||||
|
AkkaAktorEntity entity = new AkkaAktorEntity();
|
||||||
|
entity.setOrgnr(data.getOrgnr());
|
||||||
repository.persist(entity);
|
repository.persist(entity);
|
||||||
System.out.println("Saved entity to database: " + entity.toString());
|
System.out.println("Saved entity to database: " + entity.toString());
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -20,19 +20,13 @@ import java.util.UUID;
|
||||||
@Path("/stara")
|
@Path("/stara")
|
||||||
public class StaraResource {
|
public class StaraResource {
|
||||||
|
|
||||||
private static long id = 0;
|
|
||||||
|
|
||||||
@Channel("stara")
|
@Channel("stara")
|
||||||
Emitter<StaraData> quoteRequestEmitter;
|
Emitter<StaraData> emitter;
|
||||||
|
|
||||||
|
|
||||||
@Channel("aver")
|
@Channel("aver")
|
||||||
Multi<StaraData> quotes;
|
Multi<StaraData> quotes;
|
||||||
|
|
||||||
/**
|
|
||||||
* Endpoint to generate a new quote request id and send it to "quote-requests" channel (which
|
|
||||||
* maps to the "quote-requests" RabbitMQ exchange) using the emitter.
|
|
||||||
*/
|
|
||||||
@POST
|
@POST
|
||||||
@Produces(MediaType.APPLICATION_JSON)
|
@Produces(MediaType.APPLICATION_JSON)
|
||||||
@Consumes(MediaType.APPLICATION_JSON)
|
@Consumes(MediaType.APPLICATION_JSON)
|
||||||
|
|
@ -43,18 +37,8 @@ public class StaraResource {
|
||||||
orgnrs.add(uuid.toString());
|
orgnrs.add(uuid.toString());
|
||||||
data.setOrgnr(orgnrs);
|
data.setOrgnr(orgnrs);
|
||||||
data.setAmount(random.nextInt(1000 + 1));
|
data.setAmount(random.nextInt(1000 + 1));
|
||||||
quoteRequestEmitter.send(data);
|
emitter.send(data);
|
||||||
return data;
|
return data;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Endpoint retrieving the "quotes" queue and sending the items to a server sent event.
|
|
||||||
*/
|
|
||||||
// @GET
|
|
||||||
// @Produces(MediaType.SERVER_SENT_EVENTS)
|
|
||||||
// public Multi<StaraData> stream() {
|
|
||||||
// return Multi.createFrom().item(new StaraData()).map(l -> new StaraData());
|
|
||||||
//// return quotes;
|
|
||||||
// }
|
|
||||||
}
|
}
|
||||||
Loading…
Reference in a new issue