lyquor/
lyquor.rs

1use std::path::Path;
2use std::str::FromStr;
3use std::sync::Arc;
4
5use actix::prelude::*;
6use actix::{Actor, Addr};
7use alloy_node_bindings::{Anvil, AnvilInstance};
8use lyquor_primitives::{LyquidNumber, debug_struct_name};
9use tokio::signal::unix::{SignalKind, signal};
10use tokio::sync::Notify;
11
12use lyquor_api::{
13    actor::Stop,
14    anyhow::{self, Context as _Context},
15    kvstore::{KVStore, Key, PrefixedKVStore, ShadowKVStore},
16    subkey_builder,
17};
18use lyquor_db::{MemDB, RocksDB};
19use lyquor_jsonrpc::client::{ClientConfig, ClientHandle};
20use lyquor_lib::{api, lyquid, utils};
21use lyquor_primitives::{Address, Bytes, LyquidID, LyteLog, RegisterEvent};
22use lyquor_seq::{SequenceBackend, eth, fco::FCO, repo};
23use lyquor_vm::log::Log;
24
25subkey_builder!(RootSubKey(
26    ([0x01])-lyquid() => Key,
27    ([0x02])-fco() => Key,
28    ([0x03])-log() => Key,
29    ([0x04])-event_store() => Key,
30));
31
32fn subkey() -> &'static RootSubKey {
33    static KEY: std::sync::OnceLock<RootSubKey> = std::sync::OnceLock::new();
34    KEY.get_or_init(|| RootSubKey::new(Bytes::new().into()))
35}
36
/// Directory (relative to the working directory) under which the node keeps its on-disk state.
const LYQUOR_DB_ROOT: &str = "./lyquor_db";
/// File name used for the dumped anvil chain state.
const ANVIL_STATE_FILENAME: &str = "anvil.state";

/// PID of the anvil child process, if this node spawned one.
///
/// Written once in `main` right after anvil is started and read by the panic
/// hook so the child can be signalled when the node panics.
static ANVIL_PID: std::sync::OnceLock<u32> = std::sync::OnceLock::new();
41
42#[derive(Debug, Message)]
43#[rtype(result = "()")]
44struct LoadLyquid {
45    id: LyquidID,
46    deps: Vec<LyquidID>,
47}
48
49#[derive(Debug, Message)]
50#[rtype(result = "()")]
51struct SetPool {
52    pool: Addr<lyquid::LyquidPool>,
53}
54
55struct LyquidDiscovery {
56    pool: Option<Addr<lyquid::LyquidPool>>,
57    register: Arc<Notify>,
58    bartender_id: LyquidID,
59    bartender_addr: Address,
60}
61
62lyquor_primitives::debug_struct_name!(LyquidDiscovery);
63
64impl Handler<lyquor_seq::eth::GetContract> for LyquidDiscovery {
65    type Result = ResponseFuture<Address>;
66    #[tracing::instrument(level = "trace", skip(self, msg, _ctx))]
67    fn handle(&mut self, msg: lyquor_seq::eth::GetContract, _ctx: &mut Context<Self>) -> Self::Result {
68        let lyquor_seq::eth::GetContract { id, idx } = msg;
69        if id == self.bartender_id && idx == Some(0) {
70            return Box::pin(std::future::ready(self.bartender_addr));
71        }
72        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
73        let reg = self.register.clone();
74        Box::pin(async move {
75            loop {
76                // FIXME: improve the efficiency of this by filtering the reigster event for
77                // the specific lyquid in bartender
78                // idx=None means latest; else nth deployment
79                if let Some(n) = idx {
80                    match pool.send(crate::lyquid::GetLyquidDeploymentInfo { id, idx: n }).await {
81                        Ok(Some(info)) => return info.contract,
82                        Ok(None) => reg.notified().await,
83                        Err(e) => {
84                            tracing::error!("Failed to get lyquid deployment info: {}", e);
85                            // Stall forever for safety
86                            std::future::pending::<()>().await;
87                        }
88                    }
89                } else {
90                    match pool.send(crate::lyquid::GetLatestLyquidInfo { id }).await {
91                        Ok(Some(info)) => return info.contract,
92                        Ok(None) => reg.notified().await,
93                        Err(e) => {
94                            tracing::error!("Failed to get latest lyquid info: {}", e);
95                            std::future::pending::<()>().await;
96                        }
97                    }
98                }
99            }
100        })
101    }
102}
103
104// latest variant handled in GetContract via idx=None
105
106impl Handler<lyquor_vm::log::LogItem> for LyquidDiscovery {
107    type Result = ();
108
109    #[tracing::instrument(level = "trace", skip(msg))]
110    fn handle(&mut self, msg: lyquor_vm::log::LogItem, ctx: &mut Context<Self>) -> Self::Result {
111        let event: Option<RegisterEvent> = lyquor_primitives::decode_object(&msg.0.data);
112        if let Some(event) = event {
113            ctx.spawn(self.load(event.id, event.deps).into_actor(self));
114        }
115    }
116}
117
118impl Handler<LoadLyquid> for LyquidDiscovery {
119    type Result = ResponseFuture<()>;
120
121    #[tracing::instrument(level = "trace")]
122    fn handle(&mut self, msg: LoadLyquid, _ctx: &mut Context<Self>) -> Self::Result {
123        Box::pin(self.load(msg.id, msg.deps))
124    }
125}
126
127impl Handler<SetPool> for LyquidDiscovery {
128    type Result = ();
129
130    #[tracing::instrument(level = "trace")]
131    fn handle(&mut self, msg: SetPool, ctx: &mut Context<Self>) -> Self::Result {
132        if self.pool.is_some() {
133            return
134        }
135        let pool = msg.pool;
136        let me = ctx.address();
137        self.pool = Some(pool.clone());
138        ctx.spawn(
139            async move {
140                // TODO: right now we just subscribe to every lyquid known in history, need to change this
141                // to user-configured set (or groups)
142                let lyquids_with_deps = match pool.send(crate::lyquid::GetRegisteredLyquidList).await.ok().flatten() {
143                    None => {
144                        // FIXME: this could be triggered when bartender has only executed one slot
145                        // upon recovery. There is a race that before bartender repeat the
146                        // execution of the first slot, its state is without image so this call
147                        // will fail.
148                        tracing::error!("Failed to obtain the registered Lyquid list.");
149                        return
150                    }
151                    Some(lyquids_with_deps) => lyquids_with_deps,
152                };
153                // Now we have the actual dependency list from the bartender
154                for (id, deps) in lyquids_with_deps.into_iter() {
155                    if let Err(e) = me.send(LoadLyquid { id, deps }).await {
156                        tracing::error!("Failed to send LoadLyquid message: {}", e);
157                    }
158                }
159            }
160            .into_actor(self),
161        );
162    }
163}
164
165impl LyquidDiscovery {
166    fn load(&mut self, id: LyquidID, deps: Vec<LyquidID>) -> impl Future<Output = ()> + 'static {
167        let pool = self.pool.clone().expect("LyquidDiscovery: Pool not set.");
168        let register = self.register.clone();
169        async move {
170            match pool.send(crate::lyquid::LoadLyquid { id, deps }).await {
171                Ok(result) => match result {
172                    Ok(already_exists) => {
173                        if !already_exists {
174                            tracing::info!("Discovered {}", id);
175                        }
176                    }
177                    Err(e) => tracing::error!("Failed to register discovered Lyquid: {e}"),
178                },
179                Err(e) => tracing::error!("Failed to send LoadLyquid message: {}", e),
180            }
181            register.notify_waiters();
182        }
183    }
184}
185
186impl Handler<Stop> for LyquidDiscovery {
187    type Result = ();
188
189    #[tracing::instrument(level = "trace")]
190    fn handle(&mut self, _: Stop, ctx: &mut Context<Self>) -> Self::Result {
191        ctx.stop();
192    }
193}
194
195impl Actor for LyquidDiscovery {
196    type Context = actix::Context<Self>;
197
198    #[tracing::instrument(level = "trace", skip(_ctx))]
199    fn started(&mut self, _ctx: &mut Self::Context) {}
200
201    #[tracing::instrument(level = "trace", skip(_ctx))]
202    fn stopped(&mut self, _ctx: &mut Self::Context) {}
203}
204
205struct DBFlusher {
206    last_bn: u64,
207    store: Arc<ShadowKVStore<Box<dyn KVStore>>>,
208    fco: Addr<FCO>,
209    anvil_chain: Option<ClientHandle>,
210    flush_interval_secs: u64,
211}
212
213lyquor_primitives::debug_struct_name!(DBFlusher);
214
215impl DBFlusher {
216    fn prepare_flush(&self) -> impl Future<Output = anyhow::Result<()>> + 'static {
217        let anvil_chain = self.anvil_chain.clone();
218        let fco = self.fco.clone();
219        async move {
220            fco.send(lyquor_seq::fco::Commit)
221                .await
222                .map_err(anyhow::Error::from)
223                .and_then(|e| e.map_err(anyhow::Error::from))?;
224
225            if let Some(client) = &anvil_chain {
226                // If running devnet (anvil), dump chain state.
227                //
228                // We dump the anvil chain state *after* the commit of FCO, so FCO's progress will
229                // not outpace the dumped chain state (i.e., includes execution caused by the
230                // blocks later). Of course, this somehow assume Anvil's API handling has an
231                // ordering (the dump chain state is more recent than that handled previous API
232                // responses).
233                use lyquor_jsonrpc::types::{AnvilDumpState, AnvilDumpStateResp};
234                let dump = client
235                    .request::<AnvilDumpState, AnvilDumpStateResp>(AnvilDumpState)
236                    .await
237                    .map(|r| r.0);
238
239                match dump {
240                    Ok(state) => {
241                        // Write the anvil state to file
242                        let devnet_path = format!("{}/devnet", LYQUOR_DB_ROOT);
243                        std::fs::create_dir_all(&devnet_path)?;
244                        std::fs::write(&format!("{}/{}", devnet_path, ANVIL_STATE_FILENAME), &state)?;
245                    }
246                    Err(_) => {
247                        tracing::error!(
248                            "DBFlusher: Failed to dump state from anvil. This could cause issue due to the loss of local chain state upon next startup."
249                        );
250                    }
251                }
252            }
253
254            Ok(())
255        }
256    }
257
258    fn flush(&mut self, res: anyhow::Result<()>) {
259        if let Err(e) = res.and_then(|_| self.store.flush().map_err(|e| e.into())) {
260            tracing::error!("Failed to write: {e}");
261        } else {
262            tracing::info!("Written to the storage backend (block={}).", self.last_bn);
263        }
264    }
265}
266
267#[derive(Message, Debug)]
268#[rtype(result = "()")]
269struct FlushDB;
270
271impl Handler<FlushDB> for DBFlusher {
272    type Result = ResponseActFuture<Self, ()>;
273
274    #[tracing::instrument(level = "trace", skip(self, _ctx))]
275    fn handle(&mut self, _: FlushDB, _ctx: &mut actix::Context<Self>) -> Self::Result {
276        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
277            actor.flush(res);
278        }))
279    }
280}
281
282impl Handler<Stop> for DBFlusher {
283    type Result = ResponseActFuture<Self, ()>;
284
285    #[tracing::instrument(level = "trace")]
286    fn handle(&mut self, _: Stop, _ctx: &mut actix::Context<Self>) -> Self::Result {
287        Box::pin(self.prepare_flush().into_actor(self).map(move |res, actor, _ctx| {
288            actor.flush(res);
289        }))
290    }
291}
292
293impl Actor for DBFlusher {
294    type Context = actix::Context<Self>;
295
296    #[tracing::instrument(level = "trace", skip_all)]
297    fn started(&mut self, ctx: &mut Self::Context) {
298        if self.flush_interval_secs > 0 {
299            let periodic_write = tokio::time::Duration::from_secs(self.flush_interval_secs);
300            ctx.run_interval(periodic_write, |_act, ctx| {
301                ctx.address().do_send(FlushDB);
302            });
303        }
304    }
305
306    #[tracing::instrument(level = "trace", skip_all)]
307    fn stopped(&mut self, _ctx: &mut Self::Context) {}
308}
309
310async fn setup_eth_backend(
311    profile: &utils::LyquorProfile, reg: lyquor_seq::eth::ContractRegistry, jsonrpc_client: ClientHandle,
312) -> anyhow::Result<eth::Backend> {
313    let finality_tag: lyquor_jsonrpc::types::BlockNumber =
314        serde_json::from_str(profile.finality()).context("Invalid finality tag.")?;
315
316    Ok(eth::Backend::new(
317        eth::Config::builder()
318            .finality_tag(finality_tag)
319            .registry(reg)
320            .blocks_per_request(std::num::NonZeroUsize::new(10).context("Block per request should be >0.")?)
321            .build(),
322        jsonrpc_client,
323    ))
324}
325
326#[derive(Default)]
327struct Subsystems {
328    fco: Option<Addr<FCO>>,
329    log: Option<Addr<Log>>,
330    pool: Option<Addr<lyquid::LyquidPool>>,
331    api: Option<Addr<api::EthAPIServer>>,
332    db: Option<Addr<DBFlusher>>,
333    dis: Option<Addr<LyquidDiscovery>>,
334    client: Option<ClientHandle>,
335    eth_backend: Option<lyquor_seq::eth::Backend>,
336}
337debug_struct_name!(Subsystems);
338
339impl Subsystems {
340    #[tracing::instrument(level = "trace")]
341    async fn stop(&mut self) {
342        if let Some(api) = self.api.take() {
343            api.send(Stop).await.ok();
344        }
345        if let Some(eth_backend) = self.eth_backend.take() {
346            eth_backend.stop().await;
347        }
348        if let Some(log) = self.log.take() {
349            log.send(Stop).await.ok();
350        }
351        if let Some(dis) = self.dis.take() {
352            dis.send(Stop).await.ok();
353        }
354        if let Some(pool) = self.pool.take() {
355            pool.send(Stop).await.ok();
356        }
357        if let Some(db) = self.db.take() {
358            db.send(Stop).await.ok();
359        }
360        if let Some(dis) = self.dis.take() {
361            dis.send(Stop).await.ok();
362        }
363        if let Some(fco) = self.fco.take() {
364            fco.send(Stop).await.ok();
365        }
366    }
367}
368
369struct NodeConfig {
370    mem_db: bool,
371    seq_eth_url: String,
372    api_url: String,
373    profile: utils::LyquorProfile,
374    started_anvil: bool,
375    flush_interval_secs: u64,
376}
377
378/// TODO: refactor this with actix's Actor model.
379struct Node {
380    sys: Subsystems,
381    config: Arc<NodeConfig>,
382    network: Option<lyquor_net::hub::Hub>,
383}
384
385lyquor_primitives::debug_struct_name!(Node);
386
387impl Handler<Stop> for Node {
388    type Result = AtomicResponse<Self, ()>;
389
390    #[tracing::instrument(level = "trace")]
391    fn handle(&mut self, _: Stop, _ctx: &mut Context<Self>) -> Self::Result {
392        let mut sys = std::mem::replace(&mut self.sys, Subsystems::default());
393        AtomicResponse::new(Box::pin(
394            async move {
395                sys.stop().await;
396            }
397            .into_actor(self)
398            .map(|_, _act, ctx| {
399                ctx.stop();
400            }),
401        ))
402    }
403}
404
405impl Actor for Node {
406    type Context = Context<Self>;
407
408    #[tracing::instrument(level = "trace", skip(ctx))]
409    fn started(&mut self, ctx: &mut Self::Context) {
410        // NOTE: use spawn here instead of wait so Stop can be handled
411        ctx.spawn(
412            Self::init(
413                self.config.clone(),
414                self.network.take().expect("Node: Network not found."),
415            )
416            .into_actor(self)
417            .map(|sys, act, ctx| {
418                match sys {
419                    Ok(sys) => {
420                        act.sys = sys;
421                    }
422                    Err(e) => {
423                        tracing::error!("Failed to initialize node: {e:?}");
424                        ctx.stop();
425                    }
426                };
427            }),
428        );
429    }
430
431    #[tracing::instrument(level = "trace", skip(_ctx))]
432    fn stopped(&mut self, _ctx: &mut Self::Context) {}
433}
434
435impl Node {
436    #[tracing::instrument(level = "trace", skip_all)]
437    async fn init(config: Arc<NodeConfig>, network: lyquor_net::hub::Hub) -> anyhow::Result<Subsystems> {
438        let mut sys = Subsystems::default();
439
440        //// 1. DB & Storage
441        let store: Arc<ShadowKVStore<Box<dyn KVStore>>>;
442        let lyq_store: Arc<dyn utils::LVMStoreFactory> = if config.mem_db {
443            tracing::warn!("using MemDB, all data will be lost upon exit");
444            store = Arc::new(ShadowKVStore::new(Box::new(MemDB::new())));
445            Arc::new(utils::LVMMemStore)
446        } else {
447            store = Arc::new(ShadowKVStore::new(Box::new(RocksDB::new(
448                &Path::new(LYQUOR_DB_ROOT).join(config.profile.id()),
449            )?)));
450            Arc::new(utils::LVMDBStore(Arc::new(PrefixedKVStore::new(
451                store.clone(),
452                subkey().lyquid(),
453            ))))
454        };
455        ////
456
457        //// 2. Fate-Constrainted Ordering Subsystem.
458        // Start discovering Lyquid and sequencing contracts.
459        let dis = LyquidDiscovery {
460            pool: None,
461            register: Arc::new(Notify::new()),
462            bartender_id: *config.profile.bartender_id(),
463            bartender_addr: Address::from_str(config.profile.bartender_addr())
464                .context("Invalid bartender address in profile.")?
465                .into(),
466        }
467        .start();
468        sys.dis = Some(dis.clone());
469
470        // Set up Ethereum sequence backend.
471        let jsonrpc_client = ClientConfig::builder()
472            .url(config.seq_eth_url.clone().parse()?)
473            .build()
474            .into_client();
475        sys.client = Some(jsonrpc_client.clone());
476
477        // Load anvil state if we started our own anvil and state file exists
478        let anvil_chain = if config.started_anvil && !config.mem_db {
479            let state_file = format!("{}/{}/{}", LYQUOR_DB_ROOT, config.profile.id(), ANVIL_STATE_FILENAME);
480            if std::path::Path::new(&state_file).exists() {
481                use lyquor_jsonrpc::types::{AnvilLoadState, AnvilLoadStateResp};
482                let state_data = std::fs::read(&state_file)?;
483                let load_result = jsonrpc_client
484                    .request::<AnvilLoadState, AnvilLoadStateResp>(AnvilLoadState(state_data.into()))
485                    .await?;
486                tracing::info!("Loaded anvil state from {}: {}.", state_file, load_result.0);
487            } else {
488                tracing::info!(
489                    "No existing anvil state file found at {}, starting with fresh state.",
490                    state_file
491                );
492            }
493            Some(jsonrpc_client.clone())
494        } else {
495            None
496        };
497
498        let eth_backend = setup_eth_backend(&config.profile, dis.clone().recipient(), jsonrpc_client.clone()).await?;
499
500        let fco_store = PrefixedKVStore::new(store.clone(), subkey().fco());
501        let fco = FCO::new(
502            eth_backend.clone(),
503            Arc::new(fco_store),
504            Some(config.profile.init_chain_position()),
505        )?
506        .start();
507        sys.fco = Some(fco.clone());
508        // Keep a separate reference for shutdown management
509        sys.eth_backend = Some(eth_backend);
510        ////
511
512        //// 3. Lyquor Virtual Machine Subsystem.
513        // Configure Log subsystem.
514        let log_store = PrefixedKVStore::new(store.clone(), subkey().log());
515        let log = Log::new(Arc::new(lyquor_state::DBSimpleStateStore::new(log_store))).start();
516        sys.log = Some(log.clone());
517
518        // Create the top-level LVM object
519        let lvm = lyquid::LVM::new(
520            log.clone(),
521            network,
522            10,
523            Some(lyquor_vm::InterOps {
524                inter: fco.clone().recipient(),
525                submit: fco.clone().recipient(),
526            }),
527        );
528        let api_subs = api::Subscriptions::new().start();
529        let event_store = Arc::new(lyquor_seq::event_store::EventStore::new(
530            Arc::new(PrefixedKVStore::new(store.clone(), subkey().event_store())),
531            1000,
532        ));
533        ////
534
535        //// 4. Pool of all Lyquids
536        let image_repo_store = RocksDB::new(Path::new("./lyquid_image_db"))?;
537        let image_repo: Arc<dyn repo::LyquidImageRepo> = Arc::new(repo::KVStoreRepo::new(image_repo_store));
538        let bartender_id = *config.profile.bartender_id();
539        let pool = lyquid::LyquidPool::new(
540            bartender_id,
541            lvm,
542            lyquid::LyquidPoolSetup {
543                fco: fco.clone(),
544                store_factory: lyq_store,
545                image_repo,
546                event_store,
547                api_subs: api_subs.clone(),
548                config: lyquid::LyquidConfig::builder().build(),
549            },
550        )
551        .await?
552        .start();
553        sys.pool = Some(pool.clone());
554        ////
555
556        // Configure bartender.
557        let bartender_setup = {
558            let register_topic = LyteLog::tagged_value_topic("Register");
559            match log
560                .send(lyquor_vm::log::GetNumberOfRecords {
561                    id: bartender_id,
562                    topic: register_topic,
563                })
564                .await
565            {
566                Ok(count) if count > 0 => None,
567                Ok(_) => Some(log.send(lyquor_vm::log::WaitNext {
568                    id: bartender_id,
569                    topic: register_topic,
570                })),
571                Err(e) => return Err(e).context("Failed to get number of records during bartender setup."),
572            }
573        };
574
575        // Start the API server - use the same jsonrpc client as sequencer
576        let api = lyquor_lib::api::EthAPIServer::new(
577            &config.api_url,
578            jsonrpc_client.clone(),
579            api_subs,
580            fco.clone(),
581            pool.clone(),
582        )
583        .await?;
584        sys.api = Some(api.start());
585
586        // Wait for bartender to be registered.
587        if let Some(reg) = bartender_setup {
588            tracing::info!("Waiting for bartender to be registered.");
589            reg.await.context("Failed to wait for bartender registration.")?;
590        }
591        // Bartender, as a normal Lyquid, may have some re-execution, let's make sure it is done.
592        let bartender_ins = pool
593            .send(lyquid::GetBartender)
594            .await
595            .expect("Failed to obtain bartender.");
596        let bartender_ln = bartender_ins
597            .instance()
598            .read()
599            .max_number()
600            .unwrap_or(LyquidNumber::ZERO);
601        tracing::debug!("Waiting for bartender to reach {bartender_ln}.");
602        bartender_ins
603            .wait_until(bartender_ln)
604            .await
605            .context("Failed to wait for the initial re-execution of bartender.")?;
606
607        // Finish up LyquidDiscovery.
608        log.send(lyquor_vm::log::Subscribe {
609            id: bartender_id,
610            topic: LyteLog::tagged_value_topic("Register"),
611            subscriber: dis.clone().recipient(),
612        })
613        .await
614        .context("Failed to subscribe to log system for LyquidDiscovery.")?;
615        dis.send(SetPool { pool: pool.clone() })
616            .await
617            .context("Failed to set pool in LyquidDiscovery.")?;
618
619        tracing::info!("INIT COMPLETE (bartender={bartender_id})");
620
621        // Start a DB writer that persist when node is idle.
622        let db = DBFlusher {
623            last_bn: 0,
624            store,
625            fco,
626            anvil_chain,
627            flush_interval_secs: config.flush_interval_secs,
628        }
629        .start();
630        sys.db = Some(db);
631
632        Ok(sys)
633    }
634}
635
636fn generate_unused_port() -> anyhow::Result<u16> {
637    let listener = std::net::TcpListener::bind("127.0.0.1:0").context("Failed to bind to local address.")?;
638    let port = listener.local_addr().context("Failed to get local address.")?.port();
639    Ok(port)
640}
641
642fn start_devnet_anvil() -> anyhow::Result<AnvilInstance> {
643    let anvil_port = generate_unused_port()?;
644    // Mask the SIGINT signal so anvil process won't terminate on SIGINT (it'll only
645    // terminate when dropped)
646    use nix::sys::signal::{self, SigSet, SigmaskHow, Signal};
647    let mut sigset = SigSet::empty();
648    sigset.add(Signal::SIGINT);
649    let mut old_mask = SigSet::empty();
650    signal::sigprocmask(SigmaskHow::SIG_BLOCK, Some(&sigset), Some(&mut old_mask))
651        .context("Failed to mask SIGINT for anvil.")?;
652
653    let mut anvil = Anvil::new().port(anvil_port).keep_stdout().try_spawn()?;
654
655    let stdout_reader = std::io::BufReader::new(
656        anvil
657            .child_mut()
658            .stdout
659            .take()
660            .context("Failed to read from anvil stdout.")?,
661    );
662
663    tokio::task::spawn_blocking(move || {
664        use std::io::BufRead;
665        let mut stdout_lines = stdout_reader.lines();
666        while let Some(line) = stdout_lines.next() {
667            tracing::debug!(target: "lyquor_anvil", "{}", line.unwrap_or_else(|_| "".to_string()));
668        }
669    });
670
671    tracing::info!("Anvil started at port {}.", anvil_port);
672    signal::sigprocmask(SigmaskHow::SIG_SETMASK, Some(&old_mask), None)
673        .context("Failed to restore old signal mask for anvil.")?;
674
675    Ok(anvil)
676}
677
678#[actix::main]
679async fn main() -> anyhow::Result<()> {
680    // Install panic hook to cleanup anvil on panic
681    std::panic::set_hook(Box::new(|_| {
682        if let Some(&pid) = ANVIL_PID.get() {
683            tracing::warn!("Panic detected, killing anvil process {}", pid);
684            use nix::sys::signal::{Signal, kill};
685            let _ = kill(nix::unistd::Pid::from_raw(pid as i32), Signal::SIGTERM);
686        }
687    }));
688
689    lyquor_cli::setup_tracing()?;
690
691    println!("{}", lyquor_cli::format_logo_banner());
692
693    let matches = clap::command!()
694        .propagate_version(true)
695        .arg(
696            clap::arg!(--"mem-db" "Use in-memory DB simulation (for debugging or some benchmarks).")
697                .required(false)
698                .action(clap::ArgAction::SetTrue),
699        )
700        .arg(
701            clap::arg!(--"api-eth" <IP_AND_PORT> "Serving address for Ethereum API.")
702                .required(false)
703                .default_value("localhost:10087"),
704        )
705        .arg(
706            clap::arg!(--"upc-addr" <IP_AND_PORT> "Serving address for UPC.")
707                .required(false)
708                .default_value("localhost:10080"),
709        )
710        .arg(
711            clap::arg!(--network <NETWORK> "Select the network to run. (Available: devnet/testnet/mainnet)")
712                .required(false)
713                .default_value("devnet"),
714        )
715        .arg(
716            clap::arg!(--"force-seq-eth" <WS_URL> "Override the WebSocket url for Ethereum API Sequencer.")
717                .required(false),
718        )
719        .arg(
720            clap::arg!(--"flush-interval" <SECONDS> "Interval in seconds for database flushing (0 to disable).")
721                .required(false)
722                .value_parser(clap::value_parser!(u64)),
723        )
724        .get_matches();
725
726    // TODO: configure these parameters properly
727    let (_, tls_config) = lyquor_tls::generator::single_node_config();
728    let addr = matches.get_one::<String>("upc-addr").unwrap();
729    let mut network = lyquor_net::hub::HubBuilder::new(tls_config)
730        .listen_addr(addr.clone())
731        .build()
732        .context("Failed to setup network.")?;
733    network.start().await.context("Failed to start network.")?;
734
735    let mut _anvil: Option<AnvilInstance> = None;
736    let network_name = matches.get_one::<String>("network").map(|s| s.as_str());
737    let profile: utils::LyquorProfile = match network_name {
738        Some("devnet") => {
739            let ws_url = match matches.get_one::<String>("force-seq-eth") {
740                None => {
741                    let anvil = start_devnet_anvil()?;
742                    // Store anvil PID for panic hook
743                    ANVIL_PID.set(anvil.child().id()).ok();
744                    let ep = anvil.ws_endpoint();
745                    _anvil = Some(anvil);
746                    ep
747                }
748                Some(url) => url.to_string(),
749            };
750
751            utils::Devnet::new(ws_url).into()
752        }
753        _ => return Err(anyhow::anyhow!("Network is not supported.")),
754    };
755    let seq_eth_url = matches
756        .get_one::<String>("force-seq-eth")
757        .cloned()
758        .unwrap_or_else(|| profile.sequencer().to_string());
759    let api_url = matches.get_one::<String>("api-eth").cloned().unwrap();
760    let mem_db = matches.get_flag("mem-db");
761    let flush_interval_secs = matches
762        .get_one::<u64>("flush-interval")
763        .copied()
764        .unwrap_or_else(|| match network_name {
765            Some("devnet") => 0,
766            _ => 10,
767        });
768
769    // Signal handlers should be registered before starting any actors, so they'll be stopped
770    // gracefully.
771    let mut sigint = signal(SignalKind::interrupt()).context("Failed to register SIGINT handler")?;
772    let mut sigterm = signal(SignalKind::terminate()).context("Failed to register SIGTERM handler")?;
773
774    let node = Node {
775        sys: Subsystems::default(),
776        config: Arc::new(NodeConfig {
777            mem_db,
778            seq_eth_url,
779            api_url,
780            profile,
781            started_anvil: _anvil.is_some(),
782            flush_interval_secs,
783        }),
784        network: Some(network),
785    }
786    .start();
787
788    tokio::select! {
789        _ = sigint.recv() => {
790            tracing::info!("received SIGINT");
791        }
792        _ = sigterm.recv() => {
793            tracing::info!("received SIGTERM");
794        }
795    }
796
797    Ok(node.send(Stop).await?)
798}